minitrino.core.exec.host module#
Executes commands on the host via subprocess.
- class minitrino.core.exec.host.HostCommandExecutor(ctx: MinitrinoContext)[source][source]#
Bases:
objectExecutes commands on the host via subprocess.
- execute(command: list[str], interactive: bool = False, **kwargs: Any) CommandResult[source][source]#
Execute a command on the host via subprocess.
- stream_execute(command: list[str], interactive: bool = False, **kwargs: Any) Iterator[str][source][source]#
Stream output lines from a subprocess.
- stream_execute_with_result(command: list[str], **kwargs: Any) tuple[Iterator[str], Event, Callable[[], CommandResult]][source][source]#
Stream output lines from a subprocess with access to exit code.
Returns a tuple of: - Iterator[str]: Yields output lines as they are produced - threading.Event: Signals when the process has completed - Callable[[], CommandResult]: Returns the final CommandResult
This method enables fast failure detection by providing both streaming output and immediate access to process completion status and exit code.
- Parameters:
command (list[str]) – The command to execute.
**kwargs (Any) – Additional keyword arguments for subprocess execution.
- Returns:
A tuple containing the output iterator, completion event, and result callable.
- Return type:
Tuple[Iterator[str], threading.Event, Callable[[], CommandResult]]