Source code for minitrino.core.logging.sink

"""Logging sink for Minitrino logger."""

import logging

from minitrino.ansi import strip_ansi


[docs] class SinkCollector: """Collect log messages and metadata from the log sink.""" MAX_BUFFER_BYTES = 100 * 1024 * 1024 # 100 MB def __init__(self) -> None: self.buffer: list[tuple[str, str, bool]] = [] self._buffer_size_bytes = 0 def __call__(self, msg: str, stream: str, is_spinner: bool): """Collect a log message and add it to the buffer.""" msg_size = len(msg.encode("utf-8")) + len(stream.encode("utf-8")) + 1 self._buffer_size_bytes += msg_size self.buffer.append((msg, stream, is_spinner)) if self._buffer_size_bytes > self.MAX_BUFFER_BYTES: self._trim_buffer() def _trim_buffer(self): """Trim the buffer by discarding the oldest 50% of entries.""" half = len(self.buffer) // 2 self.buffer = self.buffer[half:] self._buffer_size_bytes = sum( len(msg.encode("utf-8")) + len(stream.encode("utf-8")) + 1 for msg, stream, _ in self.buffer )
[docs] def clear(self): """Clear the buffer.""" self.buffer.clear() self._buffer_size_bytes = 0
@property def size(self) -> int: """Return the size of the buffer in bytes.""" return self._buffer_size_bytes
[docs] class SinkOnlyHandler(logging.Handler): """Handler that sends all logs to the sink, regardless of level.""" def __init__(self, sink: SinkCollector, formatter: logging.Formatter) -> None: super().__init__(level=logging.NOTSET) self.sink = sink self.setFormatter(formatter)
[docs] def emit(self, record: logging.LogRecord) -> None: """Emit a log record.""" try: msg = self.format(record) msg = strip_ansi(msg) stream = "stderr" if record.levelno >= logging.ERROR else "stdout" self.sink(msg, stream, False) except Exception: self.handleError(record)