Source code for minitrino.core.exec.cmd

"""Command execution utilities for Minitrino clusters."""

from __future__ import annotations

import threading
from collections.abc import Callable, Iterator
from typing import TYPE_CHECKING, Any

from minitrino.core.errors import MinitrinoError
from minitrino.core.exec.container import ContainerCommandExecutor
from minitrino.core.exec.host import HostCommandExecutor
from minitrino.core.exec.result import CommandResult

if TYPE_CHECKING:
    from minitrino.core.context import MinitrinoContext


class CommandExecutor:
    """Execute commands in a subprocess or within a Docker container.

    This is a thin dispatcher that delegates to ``HostCommandExecutor`` or
    ``ContainerCommandExecutor`` based on the presence of a truthy
    ``container`` keyword argument.

    Parameters
    ----------
    ctx : MinitrinoContext
        Context object handed to every executor this dispatcher creates.
    """

    def __init__(self, ctx: MinitrinoContext) -> None:
        self._ctx = ctx

    @staticmethod
    def _require_list_command(command: Any) -> None:
        """Raise ``MinitrinoError`` unless a host command is a list."""
        if not isinstance(command, list):
            raise MinitrinoError(
                "Host commands must be passed as a list of arguments. "
                f"Got: {command!r}"
            )

    def execute(
        self,
        *args: list[str],
        **kwargs: Any,
    ) -> list[CommandResult]:
        """Execute commands in a subprocess or within a container.

        Each positional argument is one command. Commands run sequentially,
        in the order given, and a failure of one command does not prevent
        the following commands from running.

        Parameters
        ----------
        *args : list[str]
            The commands to execute, one command per positional argument.
        **kwargs : dict
            See "Keyword Arguments" below. Everything except ``interactive``
            is forwarded unchanged to the selected executor.

        Keyword Arguments
        -----------------
        interactive : bool
            If True, runs the command in interactive mode. Only forwarded
            to the host executor; it is removed from the keyword arguments
            before a container executor is called.
        container : MinitrinoContainer
            The container to run the command in. If not provided, the
            command will be run on the host via HostCommandExecutor.
        user : str
            The user to run the command as. If not provided, the command
            will be run as root.
        environment : dict
            The environment variables to pass to the command.
        suppress_output : bool
            If True, suppresses output from the command.
        trigger_error : bool
            If True, raises an error if the command fails. Defaults to
            False.
        timeout : float
            The timeout for the command.

        Returns
        -------
        list[CommandResult]
            One result per command, in call order. If running a command
            raises any ``Exception``, that command's entry is a
            ``CommandResult`` with empty output, ``exit_code=-1``,
            ``duration=0.0`` and ``error`` set to the caught exception.
        """
        interactive = kwargs.pop("interactive", False)
        results: list[CommandResult] = []
        for command in args:
            try:
                if kwargs.get("container"):
                    result = ContainerCommandExecutor(self._ctx).execute(
                        command, **kwargs
                    )
                else:
                    result = HostCommandExecutor(self._ctx).execute(
                        command,
                        interactive=interactive,
                        **kwargs,
                    )
            except Exception as error:
                # Deliberate catch-all: a failing command is reported as a
                # result instead of aborting the remaining commands.
                # NOTE(review): this also captures any error an executor
                # raises for trigger_error=True -- confirm callers inspect
                # CommandResult.error rather than expecting a raise.
                result = CommandResult(
                    command,
                    output="",
                    exit_code=-1,
                    duration=0.0,
                    error=error,
                )
            results.append(result)
        return results

    def stream_execute(
        self,
        *args: list[str],
        **kwargs: Any,
    ) -> Iterator[str]:
        """Stream output from subprocesses or commands inside containers.

        Commands are run one after another; the output of each command is
        yielded in full before the next command starts.

        Parameters
        ----------
        *args : list[str]
            The commands to execute, one command per positional argument.
        **kwargs : dict
            Keyword arguments to pass to the subprocess or container.
            ``interactive`` (default False) is only forwarded to the host
            executor; a truthy ``container`` selects container execution.

        Yields
        ------
        str
            Output lines as they are produced by the command(s).

        Raises
        ------
        MinitrinoError
            If a host command is not a list. Because this method is a
            generator, the error surfaces while iterating, when the
            offending command is reached, not when the method is called.
        """
        interactive = kwargs.pop("interactive", False)
        for command in args:
            if kwargs.get("container"):
                yield from ContainerCommandExecutor(self._ctx).stream_execute(
                    command, **kwargs
                )
            else:
                self._require_list_command(command)
                yield from HostCommandExecutor(self._ctx).stream_execute(
                    command,
                    interactive=interactive,
                    **kwargs,
                )

    def stream_execute_with_result(
        self,
        command: list[str],
        **kwargs: Any,
    ) -> tuple[Iterator[str], threading.Event, Callable[[], CommandResult]]:
        """Stream output with immediate access to exit code and completion status.

        This method enables fast failure detection by providing both
        streaming output and immediate access to process/command completion
        status and exit code.

        Parameters
        ----------
        command : list[str]
            The command to execute.
        **kwargs : dict
            Keyword arguments, forwarded unchanged to the selected
            executor, including:

            - container: If provided, executes in container
            - user: User for container execution
            - environment: Environment variables
            - suppress_output: Whether to suppress logging

        Returns
        -------
        tuple[Iterator[str], threading.Event, Callable[[], CommandResult]]
            A tuple containing:

            - Iterator[str]: Yields output lines as produced
            - threading.Event: Signals when command has completed
            - Callable[[], CommandResult]: Returns the final CommandResult

        Raises
        ------
        MinitrinoError
            If the command is not a list for host execution.
        """
        if kwargs.get("container"):
            return ContainerCommandExecutor(self._ctx).stream_execute_with_result(
                command, **kwargs
            )
        self._require_list_command(command)
        return HostCommandExecutor(self._ctx).stream_execute_with_result(
            command, **kwargs
        )