Source code for minitrino.core.exec.utils

"""Command execution utilities for Minitrino clusters."""

from __future__ import annotations

import time
from typing import TYPE_CHECKING

from minitrino.core.cluster.resource import MinitrinoContainer
from minitrino.core.errors import MinitrinoError

if TYPE_CHECKING:
    from minitrino.core.context import MinitrinoContext


[docs] def detect_container_shell( ctx: MinitrinoContext, container: MinitrinoContainer | str, user: str = "root" ) -> str: """Detect the shell in the container. Waits up to 10 seconds for the container to accept commands. Parameters ---------- ctx : MinitrinoContext The Minitrino context. container : MinitrinoContainer | str The container (or fully qualified container name) to detect the shell for. user : str, optional The user to execute shell detection as (default is "root"). Raises ------ MinitrinoError If the container does not accept commands within 10 seconds, or no shell is found. """ timeout = 10.0 poll_interval = 0.2 start = time.monotonic() last_e: Exception | None = None fqcn = container if isinstance(container, str) else container.name while time.monotonic() - start < timeout: try: container_obj = _get_container(ctx, container) except Exception as e: last_e = e time.sleep(poll_interval) continue if container_obj.status != "running": time.sleep(poll_interval) continue try: return _check_shell(ctx, container_obj, user) except Exception as e: last_e = e time.sleep(poll_interval) continue raise MinitrinoError( f"Container '{fqcn}' could not accept commands or does not have a shell " f"installed within {timeout:.1f} seconds." ) from last_e
def _get_container( ctx: MinitrinoContext, container: MinitrinoContainer | str ) -> MinitrinoContainer: """Get the container object.""" if isinstance(container, str): container_obj = ctx.cluster.resource.container(container) else: container_obj = ctx.cluster.resource.container(container.name) return container_obj def _check_shell( ctx: MinitrinoContext, container: MinitrinoContainer, user: str ) -> str: """Check for and return a working shell in the container.""" last_e: Exception shells = [ "bash", "/bin/bash", "/usr/bin/bash", "sh", "/bin/sh", "/usr/bin/sh", ] for shell in shells: try: ctx.logger.debug( f"Checking for shell '{shell}' in container '{container.name}'" ) result = container.exec_run([shell, "-c", "echo ok"], user=user) if result.exit_code == 0 and b"ok" in result.output: return shell raise MinitrinoError( f"Shell '{shell}' does not exist or is not executable. " f"Command output: {result.output.decode('utf-8')}", ) except Exception as e: last_e = e continue raise last_e