"""Utility functions for Minitrino CLI and core operations."""
from __future__ import annotations
import difflib
import logging
import os
import sys
import traceback
from functools import wraps
from importlib.metadata import version
from inspect import signature
from typing import TYPE_CHECKING, Any
import docker
from click import echo, make_pass_decorator
from docker.models.containers import Container
from minitrino.core.docker.socket import resolve_docker_socket
from minitrino.core.docker.wrappers import MinitrinoContainer
from minitrino.core.errors import MinitrinoError, UserError
from minitrino.core.logging.levels import LogLevel
from minitrino.core.logging.utils import configure_logging
from minitrino.shutdown import shutdown_event
if TYPE_CHECKING:
from minitrino.core.context import MinitrinoContext
# ----------------------------------------------------------------------
# CLI Decorators & Exception Handling
# ----------------------------------------------------------------------
[docs]
def pass_environment() -> Any:
"""Return a Click pass decorator for the MinitrinoContext.
Returns
-------
Any
A decorator that passes the MinitrinoContext instance.
"""
from minitrino.core.context import MinitrinoContext
return make_pass_decorator(MinitrinoContext, ensure=True)
[docs]
def handle_exception(
error: BaseException,
ctx: Any | None = None,
additional_msg: str = "",
skip_traceback: bool = False,
) -> None:
"""Handle a single exception.
Parameters
----------
error : BaseException
The exception object.
ctx : Optional[Any]
Optional CLI context object with logger.
additional_msg : str
Additional message to log, if any.
skip_traceback : bool
If True, suppresses traceback output unless overridden by error
type.
Raises
------
SystemExit
Exits the program with the appropriate exit code.
"""
# Set the shutdown event to signal to any running threads to exit
shutdown_event.set()
if isinstance(error, UserError):
error_msg = error.msg
exit_code = error.exit_code
skip_traceback = True
elif isinstance(error, MinitrinoError):
error_msg = error.msg
exit_code = error.exit_code
else:
error_msg = str(error)
exit_code = 1
tb = error.__traceback__
while tb and tb.tb_next:
tb = tb.tb_next
if tb:
frame = tb.tb_frame
filename = os.path.basename(frame.f_code.co_filename)
lineno = tb.tb_lineno
module = frame.f_globals.get("__name__", "")
origin = f"{module}:{filename}:{lineno}"
else:
origin = "unknown:unknown:0"
logger = getattr(ctx, "logger", logging.getLogger("minitrino"))
logger.error(f"[Origin: {origin}]{additional_msg} {error_msg}")
if not skip_traceback:
echo() # Force a newline
echo(f"{traceback.format_exc()}", err=True)
else:
sys.tracebacklimit = 0
sys.exit(exit_code)
[docs]
def exception_handler(func: Any) -> Any:
"""Handle unhandled exceptions.
Parameters
----------
func : Callable
The function to wrap.
Returns
-------
Callable
The wrapped function with exception handling.
"""
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
sig = signature(func)
ctx = None
if "ctx" in sig.parameters:
ctx = kwargs.get("ctx")
if ctx is None:
try:
ctx_index = list(sig.parameters).index("ctx")
if len(args) > ctx_index:
ctx = args[ctx_index]
except Exception:
ctx = None
try:
return func(*args, **kwargs)
except Exception as e:
handle_exception(e, ctx)
return wrapper
# ----------------------------------------------------------------------
# Docker/Container Utilities
# ----------------------------------------------------------------------
[docs]
def check_daemon(docker_client: Any) -> None:
"""Check if the Docker daemon is running.
Parameters
----------
docker_client : Any
Docker client instance.
Raises
------
UserError
If the Docker daemon is not running or cannot be pinged.
"""
try:
docker.DockerClient(base_url=resolve_docker_socket()).ping()
except Exception as e:
raise UserError(
f"Error when pinging the Docker server. Is the Docker daemon running?\n"
f"Error from Docker: {str(e)}",
"You may need to initialize your Docker daemon. If Docker is already "
"running, check whether you are using the intended Docker context "
"(e.g. Colima or OrbStack). You can view existing contexts with `docker "
"context ls` and switch with `docker context use <context>`.",
) from e
[docs]
def check_lib(ctx: MinitrinoContext) -> None:
"""Check if a Minitrino library exists.
Parameters
----------
ctx : MinitrinoContext
Context object containing library directory information.
"""
if not ctx:
raise ValueError("MinitrinoContext must be provided for library version check")
if not ctx.lib_dir:
ctx.library_manager.auto_install_or_update()
[docs]
def container_user_and_id(
ctx: MinitrinoContext | None = None,
container: Container | MinitrinoContainer | str = "",
) -> tuple[str, str]:
"""Return the build user and build user ID for a cluster container.
Parameters
----------
ctx : MinitrinoContext
Context object containing cluster information.
container : Container | MinitrinoContainer | str
Container object or container name.
Returns
-------
tuple[str, str]
Tuple of build user and build user ID.
Raises
------
ValueError
If container is not provided.
Notes
-----
Commands executed in coordinator/worker containers tend to rely on
environment variables set during the build process. This function
returns the build user and build user ID for a cluster container,
which can then be used to execute commands in the container with the
correct UID to ensure environment variables resolve correctly.
Examples
--------
>>> _, uid = container_user_and_id("minintrino-default")
>>> cmd = ["cat /etc/${CLUSTER_DIST}/config.properties"]
>>> cmd_executor.execute(cmd, container="minintrino-default", user=uid)
"""
if not ctx: # External call site, e.g. pytest
from minitrino.core.context import MinitrinoContext
ctx = MinitrinoContext()
configure_logging(LogLevel.DEBUG)
ctx.initialize()
if not container:
raise MinitrinoError("Container object or container name must be provided")
if isinstance(container, str):
container = ctx.cluster.resource.container(container)
usr = ctx.cmd_executor.execute(["echo ${SERVICE_USER}"], container=container)[
0
].output.strip()
uid = ctx.cmd_executor.execute([f"id -u {usr}"], container=container)[
0
].output.strip()
return usr, uid
# ----------------------------------------------------------------------
# Miscellaneous
# ----------------------------------------------------------------------
[docs]
def generate_identifier(identifiers: dict[str, Any] | None = None) -> str:
"""Return an object identifier string used for creating log messages.
Parameters
----------
identifiers : Optional[Dict[str, Any]], optional
Dictionary of "identifier_key": "identifier_value" pairs, by
default None.
Returns
-------
str
Formatted string with identifiers enclosed in brackets.
Examples
--------
>>> generate_identifier({"cluster": "default", "module": "test"})
>>> '[cluster: default] [module: test]'
"""
if identifiers is None:
identifiers = {}
identifier = []
for key, value in identifiers.items():
identifier.append(f"[{key}: {value}]")
return " ".join(identifier)
# ----------------------------------------------------------------------
# Parsing & Validation Utilities
# ----------------------------------------------------------------------
[docs]
def parse_key_value_pair(
ctx: MinitrinoContext, pair: str, hard_fail: bool = False
) -> tuple[str, str]:
"""Parse a key-value pair from a string.
Parameters
----------
pair : str
Key-value pair to parse.
hard_fail : bool, optional
Whether to raise an error if the key-value pair is invalid,
by default `False`.
Returns
-------
tuple[str, str]
Tuple of key and value.
"""
pair = pair.strip()
if "=" not in pair and hard_fail:
raise UserError(f"Invalid key-value pair: {pair}")
key, value = pair.split("=", 1)
if (not key or not value) and hard_fail:
raise UserError(f"Invalid key-value pair: {pair}")
return key, value
[docs]
def closest_match_or_error(
name: str, valid_names: list[str], context: str = "item"
) -> str:
"""Return the name or fail with a closest match suggestion.
Parameters
----------
name : str
The user-provided name to validate.
valid_names : list[str]
List of valid names to check against.
context : str, optional
Context string for error message (default: "item").
Returns
-------
str
The valid name (if found).
Raises
------
UserError
If the name is not valid, with a suggestion if available.
Examples
--------
>>> closest_match_or_error('ressources', ['resources', 'remove'])
UserError: Item 'ressources' not found. Did you mean 'resources'?
"""
if name in valid_names:
return name
suggestion = difflib.get_close_matches(name, valid_names, n=1)
suggestion_msg = f" Did you mean '{suggestion[0]}'?" if suggestion else ""
raise UserError(f"{context.capitalize()} '{name}' not found.{suggestion_msg}")
[docs]
def validate_yes(value: str) -> bool:
"""Validate if the input is an affirmative response.
Parameters
----------
value : str
Value to validate.
Returns
-------
bool
`True` if the input is 'y' or 'yes' (case-insensitive), `False`
otherwise.
"""
response = value.replace(" ", "")
return bool(response.lower() == "y" or response.lower() == "yes")
# ----------------------------------------------------------------------
# Version Helpers
# ----------------------------------------------------------------------
[docs]
def cli_ver() -> str:
"""Return the CLI version.
Returns
-------
str
CLI version.
"""
return version("Minitrino")
[docs]
def lib_ver(ctx: MinitrinoContext | None = None, lib_path: str = "") -> str:
"""Return the library version.
Returns
-------
str
Library version.
"""
if ctx is None and not lib_path:
raise MinitrinoError("lib_path must be provided if ctx is None")
if ctx is not None and not lib_path:
lib_path = ctx.lib_dir
version_file = os.path.join(lib_path, "version")
try:
with open(version_file) as f:
return next((line.strip() for line in f if line.strip()), "NOT INSTALLED")
except Exception:
return "NOT INSTALLED"