minitrino.core.exec.host module

Executes commands on the host via subprocess.

class minitrino.core.exec.host.HostCommandExecutor(ctx: MinitrinoContext)

Bases: object

Executes commands on the host via subprocess.

execute(command: list[str], interactive: bool = False, **kwargs: Any) → CommandResult

Execute a command on the host via subprocess.
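As a rough illustration of what running a host command to completion can look like, here is a minimal sketch built directly on the standard library. It is not Minitrino's implementation: `run_on_host` and `SketchResult` are hypothetical names, and the fields of the real `CommandResult` may differ.

```python
import subprocess
import sys
from dataclasses import dataclass


@dataclass
class SketchResult:
    """Hypothetical stand-in for CommandResult; the real fields may differ."""

    command: list[str]
    output: str
    exit_code: int


def run_on_host(command: list[str], **kwargs) -> SketchResult:
    """Run a command to completion and capture its combined output."""
    proc = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout
        text=True,
        **kwargs,  # forwarded to subprocess.run (e.g. cwd=, env=)
    )
    return SketchResult(command, proc.stdout, proc.returncode)


# Use the current interpreter as a portable child command.
result = run_on_host([sys.executable, "-c", "print('hello from the host')"])
print(result.exit_code, result.output.strip())
```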

stream_execute(command: list[str], interactive: bool = False, **kwargs: Any) → Iterator[str]

Stream output lines from a subprocess.
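Line-by-line streaming from a subprocess is commonly written as a generator over the child's stdout pipe. The sketch below shows that general pattern under stated assumptions; `stream_lines` is a hypothetical helper, not the body of `stream_execute`.

```python
import subprocess
import sys
from collections.abc import Iterator


def stream_lines(command: list[str], **kwargs) -> Iterator[str]:
    """Yield output lines as the child process produces them."""
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout
        text=True,
        bufsize=1,  # line-buffered in text mode
        **kwargs,
    ) as proc:
        assert proc.stdout is not None
        for line in proc.stdout:
            yield line.rstrip("\n")
    # Leaving the with-block closes the pipe and waits for the child.


child_code = "print('one'); print('two')"
lines = list(stream_lines([sys.executable, "-c", child_code]))
print(lines)
```

Because the function is a generator, the child process is not started until the first line is requested.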

stream_execute_with_result(command: list[str], **kwargs: Any) → tuple[Iterator[str], Event, Callable[[], CommandResult]]

Stream output lines from a subprocess with access to exit code.

Returns a tuple of:
  • Iterator[str] – Yields output lines as they are produced.

  • threading.Event – Signals when the process has completed.

  • Callable[[], CommandResult] – Returns the final CommandResult.

This method enables fast failure detection by providing both streaming output and immediate access to process completion status and exit code.

Parameters:
  • command (list[str]) – The command to execute.

  • **kwargs (Any) – Additional keyword arguments for subprocess execution.

Returns:

A tuple containing the output iterator, completion event, and result callable.

Return type:

Tuple[Iterator[str], threading.Event, Callable[[], CommandResult]]
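The three-part return value described above can be approximated with a background reader thread, a queue, and a `threading.Event`. This is a sketch of the pattern only, assuming a reader thread is an acceptable design; `stream_with_result` and `SketchResult` are hypothetical names, and Minitrino's actual implementation and `CommandResult` fields may differ.

```python
import queue
import subprocess
import sys
import threading
from collections.abc import Callable, Iterator
from dataclasses import dataclass


@dataclass
class SketchResult:
    """Hypothetical stand-in for CommandResult; the real fields may differ."""

    output: str
    exit_code: int


def stream_with_result(
    command: list[str], **kwargs
) -> tuple[Iterator[str], threading.Event, Callable[[], SketchResult]]:
    proc = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        **kwargs,
    )
    lines_q: queue.Queue = queue.Queue()
    done = threading.Event()
    captured: list[str] = []

    def pump() -> None:
        # Runs in a background thread so completion is detected even if
        # the caller is slow to consume the iterator.
        assert proc.stdout is not None
        for raw in proc.stdout:
            line = raw.rstrip("\n")
            captured.append(line)
            lines_q.put(line)
        proc.stdout.close()
        proc.wait()
        done.set()
        lines_q.put(None)  # sentinel: no more lines

    threading.Thread(target=pump, daemon=True).start()

    def lines() -> Iterator[str]:
        while (item := lines_q.get()) is not None:
            yield item

    def get_result() -> SketchResult:
        done.wait()  # block until the process has exited
        return SketchResult("\n".join(captured), proc.returncode)

    return lines(), done, get_result


# A child that prints one line and then fails with exit code 3.
child_code = "import sys; print('starting'); sys.exit(3)"
lines, done, get_result = stream_with_result([sys.executable, "-c", child_code])
for line in lines:
    print(line)
result = get_result()
print(done.is_set(), result.exit_code)
```

A caller that wants to fail fast can poll `done.is_set()` between other work and call `get_result()` as soon as the event fires, rather than waiting for a separate readiness check to time out.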