Source code for minitrino.core.logging.spinner

"""Spinner logging utility."""

from __future__ import annotations

import itertools
import logging
import sys
import threading
import time
from collections.abc import Callable
from contextlib import contextmanager
from typing import TYPE_CHECKING

from minitrino.core.logging.levels import LogLevel
from minitrino.shutdown import shutdown_event

if TYPE_CHECKING:
    from minitrino.core.logging.logger import MinitrinoLogger


class LogBuffer:
    """Accumulate log messages so they can be replayed later.

    Each entry is stored as a ``(msg, is_spinner_artifact, stream)`` tuple.
    On flush, entries flagged as spinner artifacts are discarded and the
    remaining messages are printed to the stream they were recorded for.
    """

    def __init__(self) -> None:
        self.buffer: list[tuple[str, bool, str]] = []

    def append(self, msg: str, is_spinner_artifact: bool, stream: str) -> None:
        """Record one message together with its artifact flag and stream name."""
        entry = (msg, is_spinner_artifact, stream)
        self.buffer.append(entry)

    def flush(self) -> None:
        """Print every non-artifact message, then empty the buffer.

        A stream name of ``"stderr"`` selects ``sys.stderr``; any other
        value falls back to ``sys.stdout``.
        """
        for text, artifact, stream_name in self.buffer:
            if artifact:
                # Spinner frames are transient terminal output; never replay.
                continue
            if stream_name == "stderr":
                target = sys.stderr
            else:
                target = sys.stdout
            print(text, file=target)
        self.buffer.clear()
class _SpinnerThread(threading.Thread):
    """Helper thread for displaying spinner animation.

    The thread is created as a daemon thread. It cycles through the frames
    ``\\ | / -`` roughly every 0.1 seconds until either ``spinner_done`` or
    the global ``shutdown_event`` is set.

    Parameters
    ----------
    prefix : str
        Prefix to display before spinner.
    message : str
        Message to display alongside spinner.
    output_lock : threading.RLock
        Lock to synchronize output.
    spinner_done : threading.Event
        Event to signal spinner completion.
    log_buffer : LogBuffer | None
        Optional buffer for spinner artifacts.
    """

    def __init__(
        self,
        prefix: str,
        message: str,
        output_lock: threading.RLock,
        spinner_done: threading.Event,
        log_buffer: LogBuffer | None = None,
    ) -> None:
        super().__init__(daemon=True)
        self.prefix = prefix
        self.message = message
        self.output_lock = output_lock
        self.spinner_done = spinner_done
        self.log_buffer = log_buffer

    def run(self) -> None:
        """Run the spinner thread.

        Each iteration first checks the stop conditions, then tries to take
        the output lock without blocking and, if successful, redraws the
        spinner line on stdout. Every drawn frame is also recorded in
        ``log_buffer`` (when provided) flagged as a spinner artifact.
        """
        for c in itertools.cycle(r"\|/-"):
            if shutdown_event.is_set():
                # Propagate a global shutdown to anyone waiting on this spinner.
                self.spinner_done.set()
                break
            if self.spinner_done.is_set():
                break

            # Non-blocking: if another writer holds the lock, skip this frame
            # rather than stall behind it.
            acquired = self.output_lock.acquire(blocking=False)
            if acquired:
                try:
                    spinner_msg = f"\r{self.prefix}{self.message} {c}"
                    sys.stdout.write(spinner_msg)
                    sys.stdout.flush()
                    if self.log_buffer is not None:
                        self.log_buffer.append(spinner_msg, True, "stdout")
                finally:
                    self.output_lock.release()

            # Wait on the event instead of sleeping so the thread wakes up as
            # soon as it is told to stop, instead of finishing a full 0.1 s
            # sleep first. The frame interval is unchanged when not signalled.
            self.spinner_done.wait(0.1)
class Spinner:
    """Spinner logging utility.

    Displays a spinner while a task is in progress. The spinner only
    appears if:

    - ``always_verbose`` is False
    - log level is not DEBUG
    - stdout is a TTY
    - the logger has no ``logging.FileHandler`` attached

    Parameters
    ----------
    logger : MinitrinoLogger
        The logger instance to use for spinner state.
    log_sink : Callable[[str, str, bool], None] | None
        The log sink to use for spinner state. It is only stored on the
        instance here; this class does not call it.
    always_verbose : bool
        If True, disables spinner and always streams logs directly.
    """

    def __init__(
        self,
        logger: MinitrinoLogger,
        log_sink: Callable[[str, str, bool], None] | None = None,
        always_verbose: bool = False,
    ) -> None:
        self.logger: MinitrinoLogger = logger
        self.log_sink: Callable[[str, str, bool], None] | None = log_sink
        # Thread-local flag: each thread sees its own "spinner active" state.
        self.spinner_active = threading.local()
        self.spinner_active.value = False
        # Re-entrant lock shared with the spinner thread to serialize output.
        self.output_lock = threading.RLock()
        self._spinner_thread: threading.Thread | None = None
        self.always_verbose = always_verbose

    @contextmanager
    def spinner(self, message: str = ""):
        """Display a spinner while a task is in progress.

        If not a TTY or logging to a file, disables spinner and buffering.
        In those cases (and when ``always_verbose`` is set or the log level
        is DEBUG) the body of the ``with`` block simply runs unchanged.

        Parameters
        ----------
        message : str
            Message to display alongside the spinner.
        """
        if self.always_verbose or self.logger._log_level == LogLevel.DEBUG:
            yield
            return
        if not sys.stdout.isatty():
            yield
            return
        if any(isinstance(h, logging.FileHandler) for h in self.logger.handlers):
            yield
            return

        log_buffer = LogBuffer()
        spinner_done: threading.Event | None = None
        try:
            self._set_spinner_active(True)
            try:
                # Started inside the guarded region so that a failure while
                # starting still resets the active flag in the finally below.
                spinner_done = self._start_spinner(message, log_buffer)
                yield
            finally:
                self._set_spinner_active(False)
                if spinner_done is not None:
                    self._stop_spinner(spinner_done)
        finally:
            self._clear_spinner_line()
            sys.stdout.flush()
            log_buffer.flush()

    def _start_spinner(
        self,
        message: str = "",
        log_buffer: LogBuffer | None = None,
    ) -> threading.Event:
        """Start the spinner animation.

        Creates and starts a spinner thread using the logger's styled
        prefix, and remembers it on ``self._spinner_thread``.

        Returns
        -------
        threading.Event
            The event that, once set, tells the spinner thread to stop.
        """
        spinner_done = threading.Event()
        prefix = self.logger.styled_prefix()
        self._spinner_thread = _SpinnerThread(
            prefix=prefix,
            message=message,
            output_lock=self.output_lock,
            spinner_done=spinner_done,
            log_buffer=log_buffer,
        )
        self._spinner_thread.start()
        return spinner_done

    def _stop_spinner(self, spinner_done: threading.Event, delay: float = 0.1) -> None:
        """Stop the spinner and clear the terminal line.

        Parameters
        ----------
        spinner_done : threading.Event
            The event returned by _start_spinner.
        delay : float
            Time to wait for the spinner thread to exit.
        """
        spinner_done.set()
        if self._spinner_thread is not None:
            self._spinner_thread.join(timeout=delay)
        with self.output_lock:
            sys.stdout.write("\033[2K\r")  # Only clear, no newline
            sys.stdout.flush()

    def _set_spinner_active(self, active: bool) -> None:
        """Set the calling thread's spinner-active flag."""
        self.spinner_active.value = active

    def _is_spinner_active(self) -> bool:
        """Return the calling thread's spinner-active flag.

        Threads that never set the flag have no ``value`` attribute on the
        thread-local object, so they report False.
        """
        return getattr(self.spinner_active, "value", False)

    def _clear_spinner_line(self) -> None:
        """Erase the current terminal line when stdout is a TTY."""
        # Used by SpinnerAwareHandler; always clear if TTY
        if sys.stdout.isatty():
            with self.output_lock:
                sys.stdout.write("\033[2K\r")
                sys.stdout.flush()