# Source code for minitrino.core.exec.cmd
"""Command execution utilities for Minitrino clusters."""
from __future__ import annotations
import threading
from collections.abc import Callable, Iterator
from typing import TYPE_CHECKING, Any
from minitrino.core.errors import MinitrinoError
from minitrino.core.exec.container import ContainerCommandExecutor
from minitrino.core.exec.host import HostCommandExecutor
from minitrino.core.exec.result import CommandResult
if TYPE_CHECKING:
from minitrino.core.context import MinitrinoContext
[docs]
class CommandExecutor:
    """Execute commands in a subprocess or within a Docker container.

    This is a thin dispatcher that delegates to HostCommandExecutor or
    ContainerCommandExecutor based on the presence of the 'container' kwarg.
    A fresh delegate executor is constructed for every dispatched command.
    """

    def __init__(self, ctx: MinitrinoContext) -> None:
        """Store the context that is handed to every delegate executor.

        Parameters
        ----------
        ctx : MinitrinoContext
            Context passed to HostCommandExecutor / ContainerCommandExecutor
            when they are constructed.
        """
        self._ctx = ctx

    @staticmethod
    def _require_list_command(command: Any) -> None:
        """Reject a host command that is not a list of arguments.

        Shared by the streaming entry points so the check and its error
        message live in exactly one place.

        Parameters
        ----------
        command : Any
            The command about to be dispatched to the host executor.

        Raises
        ------
        MinitrinoError
            If ``command`` is not a ``list``.
        """
        if not isinstance(command, list):
            raise MinitrinoError(
                "Host commands must be passed as a list of arguments. "
                f"Got: {command!r}"
            )

    def execute(
        self,
        *args: list[str],
        **kwargs: Any,
    ) -> list[CommandResult]:
        """Execute commands in a subprocess or within a container.

        Parameters
        ----------
        *args : list[str]
            The commands to run. They are executed one at a time, in the
            order given.
        **kwargs : dict
            Options forwarded to the delegate executor; see below.

        Keyword Arguments
        -----------------
        interactive : bool
            If True, runs the command in interactive mode. This option is
            removed from ``kwargs`` and forwarded only to the host executor;
            it is not passed to the container executor.
        container : MinitrinoContainer
            The container to run the command in. If not provided, the
            command will be run on the host via HostCommandExecutor.
        user : str
            The user to run the command as. If not provided, the command
            will be run as root.
        environment : dict
            The environment variables to pass to the command.
        suppress_output : bool
            If True, suppresses output from the command.
        trigger_error : bool
            If True, raises an error if the command fails. Defaults to
            False.
        timeout : float
            The timeout for the command.

        Returns
        -------
        list[CommandResult]
            One result per command, in the same order as ``args``. An empty
            list is returned when no commands are given.

        Notes
        -----
        Any ``Exception`` raised while a command is being dispatched is
        captured rather than propagated: the command is represented by a
        ``CommandResult`` with empty output, ``exit_code=-1``,
        ``duration=0.0`` and ``error`` set to the exception, and the
        remaining commands still run.
        """
        # ``interactive`` is only understood by the host executor, so strip it
        # from the kwargs that are shared with the container executor.
        interactive = kwargs.pop("interactive", False)
        results: list[CommandResult] = []
        for command in args:
            try:
                if kwargs.get("container"):
                    result = ContainerCommandExecutor(self._ctx).execute(
                        command, **kwargs
                    )
                else:
                    result = HostCommandExecutor(self._ctx).execute(
                        command,
                        interactive=interactive,
                        **kwargs,
                    )
            except Exception as error:
                # NOTE(review): this also captures errors a delegate raises
                # because of ``trigger_error=True`` -- confirm that callers
                # expect a failed CommandResult here instead of an exception.
                result = CommandResult(
                    command,
                    output="",
                    exit_code=-1,
                    duration=0.0,
                    error=error,
                )
            results.append(result)
        return results

    def stream_execute(
        self,
        *args: list[str],
        **kwargs: Any,
    ) -> Iterator[str]:
        """Stream output from subprocesses or commands inside containers.

        Commands are run one after another; the output of each is yielded
        in full before the next command is started.

        Parameters
        ----------
        *args : list[str]
            A list of arguments to pass to the subprocess or container.
        **kwargs : dict
            Keyword arguments to pass to the subprocess or container.
            ``interactive`` (default False) is removed from ``kwargs`` and
            forwarded only to the host executor.

        Yields
        ------
        str
            Output lines as they are produced by the command(s).

        Raises
        ------
        MinitrinoError
            If no container is given and a command is not a list. Because
            this is a generator, the check runs when iteration reaches that
            command, not when ``stream_execute`` is called.
        """
        interactive = kwargs.pop("interactive", False)
        for command in args:
            if kwargs.get("container"):
                yield from ContainerCommandExecutor(self._ctx).stream_execute(
                    command, **kwargs
                )
                continue
            self._require_list_command(command)
            yield from HostCommandExecutor(self._ctx).stream_execute(
                command,
                interactive=interactive,
                **kwargs,
            )

    def stream_execute_with_result(
        self,
        command: list[str],
        **kwargs: Any,
    ) -> tuple[Iterator[str], threading.Event, Callable[[], CommandResult]]:
        """Stream output with immediate access to exit code and completion status.

        This method enables fast failure detection by providing both streaming
        output and immediate access to process/command completion status and
        exit code.

        Parameters
        ----------
        command : list[str]
            The command to execute.
        **kwargs : dict
            Keyword arguments, forwarded unchanged to the delegate executor,
            including:
            - container: If provided, executes in container
            - user: User for container execution
            - environment: Environment variables
            - suppress_output: Whether to suppress logging

        Returns
        -------
        Tuple[Iterator[str], threading.Event, Callable[[], CommandResult]]
            A tuple containing:
            - Iterator[str]: Yields output lines as produced
            - threading.Event: Signals when command has completed
            - Callable[[], CommandResult]: Returns the final CommandResult

        Raises
        ------
        MinitrinoError
            If the command is not a list for host execution.
        """
        if kwargs.get("container"):
            return ContainerCommandExecutor(self._ctx).stream_execute_with_result(
                command, **kwargs
            )
        self._require_list_command(command)
        return HostCommandExecutor(self._ctx).stream_execute_with_result(
            command, **kwargs
        )