Best Python code snippet using avocado_python
output_logger.py
Source:output_logger.py  
1import asyncio2import logging3import os4import sys5from contextlib import contextmanager, redirect_stderr, redirect_stdout, suppress6import click7from async_generator import asynccontextmanager8from .utils import capture_subprocess_output, remove_ansi_escape_sequences9class OutputLogger:10    def __init__(self, file):11        self.file = file12        self.stdout = sys.stdout13        self.stderr = sys.stderr14        self.outs = {}15        self._max_name_length = None16    def out(self, name, stream=None, color=None):17        if stream in (None, sys.stderr):18            stream = self.stderr19        elif stream is sys.stdout:20            stream = self.stdout21        out = Out(self, name, file=self.file, stream=stream, color=color)22        self.outs[name] = out23        self._max_name_length = None24        return out25    @property26    def max_name_length(self):27        if self._max_name_length is None:28            name_lengths = [len(name) for name in self.outs.keys()]29            self._max_name_length = max(name_lengths, default=0)30        return self._max_name_length31class LineWriter:32    NEWLINE = "\n"33    def __init__(self, out):34        self.__out = out35        self.__skip_next_newline = False36    def __getattr__(self, name):37        return getattr(self.__out, name)38    def write(self, line):39        # Pre 3.7, `logging.StreamHandler.emit` will40        # write the message and terminator separately41        if self.__skip_next_newline:42            self.__skip_next_newline = False43            if line == self.NEWLINE:44                return45        if not line.endswith(self.NEWLINE):46            line = line + self.NEWLINE47            self.__skip_next_newline = True48        self.__out.writeline(line)49class FileDescriptorWriter:50    def __init__(self, out, fd):51        self.__out = out52        self.__writer = os.fdopen(fd, "w")53    def __getattr__(self, name):54        return getattr(self.__writer, name)55    def isatty(self):56        return self.__out.isatty()57class Out:58    """59    Simple Out class to log anything written in a stream to a file60     and then also write it to the stream.61    """62    def __init__(self, output_logger, name, file, stream, color="white"):63        self.output_logger = output_logger64        self.name = name65        self.color = color66        self.file = file67        self.stream = stream68        self.last_line = ""69        self._prefix = None70        self._max_name_length = None71    @property72    def prefix(self):73        max_name_length = self.output_logger.max_name_length74        if self._prefix is None or self._max_name_length != max_name_length:75            self._max_name_length = max_name_length76            padding = max(max_name_length, 6)77            padded_name = self.name.ljust(padding)78            self._prefix = click.style(f"{padded_name} | ", fg=self.color)79        return self._prefix80    @contextmanager81    def line_writer(self):82        yield LineWriter(self)83    @contextmanager84    def redirect_logging(self, format=None, ignore_errors=()):85        logger = logging.getLogger()86        original_log_handlers = logger.handlers87        line_writer = LineWriter(self)88        handler = logging.StreamHandler(line_writer)89        if not format:90            if logger.getEffectiveLevel() == logging.DEBUG:91                format = "%(levelname)s %(message)s"92            else:93                format = "%(message)s"94        formatter = logging.Formatter(fmt=format)95        handler.setFormatter(formatter)96        logger.handlers = [handler]97        try:98            yield99        except (KeyboardInterrupt, asyncio.CancelledError, *ignore_errors):100            raise101        except Exception as err:102            logger.error(str(err), exc_info=True)103            raise104        finally:105            logger.handlers = original_log_handlers106    @asynccontextmanager107    async def writer(self):108        read_fd, write_fd = os.pipe()109        reader = asyncio.ensure_future(self._read_from_fd(read_fd))110        writer = FileDescriptorWriter(self, write_fd)111        try:112            yield writer113        finally:114            writer.close()115            with suppress(asyncio.CancelledError):116                await reader117    @asynccontextmanager118    async def redirect_stdout(self):119        async with self.writer() as stdout:120            with redirect_stdout(stdout):121                yield122    @asynccontextmanager123    async def redirect_stderr(self):124        async with self.writer() as stderr:125            with redirect_stderr(stderr):126                yield127    def writeline(self, line):128        self.last_line = line129        click.echo(self.prefix + line, nl=False, file=self)130        self.flush()131    def write(self, data):132        self.stream.write(data)133        self.file.write(remove_ansi_escape_sequences(data))134        # always flush the file to keep it up to date with the stream135        self.file.flush()136    def flush(self):137        self.stream.flush()138        self.file.flush()139    def isatty(self):140        # Explicitly claim we're connected to a TTY to stop Click141        # from stripping ANSI codes142        return self.stream.isatty()143    async def _read_from_fd(self, read_fd):144        # Since we're redirecting our own stdout and stderr output,145        # the line length limit can be arbitrarily large.146        line_length_limit = 1024 * 1024 * 1024  # 1 GiB147        reader = asyncio.StreamReader(limit=line_length_limit)148        read_protocol = asyncio.StreamReaderProtocol(reader)149        loop = asyncio.get_event_loop()150        read_transport, _ = await loop.connect_read_pipe(151            lambda: read_protocol, os.fdopen(read_fd)152        )...util.py
Source:util.py  
1"""2General utility non-I/O code for gpu_specter.3"""4import time, datetime5import os, logging6import numpy as np7try:8    import cupy as cp9except ImportError:10    pass11def gather_ndarray(sendbuf, comm, root=0):12    """Gather multidimensional ndarray objects to one process from all other processes in a group.13    Args:14        sendbuf: multidimensional ndarray15        comm: mpi communicator16        root: rank of receiving process17    Returns:18        recvbuf: A stacked multidemsional ndarray if comm.rank == root, otherwise None.19    """20    rank = comm.rank21    # Save shape and flatten input array22    sendbuf = np.array(sendbuf)23    shape = sendbuf.shape24    sendbuf = sendbuf.ravel()25    # Collect local array sizes using the high-level mpi4py gather26    sendcounts = np.array(comm.gather(len(sendbuf), root))27    if rank == root:28        recvbuf = np.empty(sum(sendcounts), dtype=sendbuf.dtype)29    else:30        recvbuf = None31    comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendcounts), root=root)32    if rank == root:33        # Reshape output before returning34        recvbuf = recvbuf.reshape((-1,) + shape[1:])35    return recvbuf36def get_array_module(x):37    """Returns the array module for arguments.38    This function is used to implement CPU/GPU generic code. If the argument39    is a :class:`cupy.ndarray` object, the :mod:`cupy` module is returned.40    For more details see: https://docs-cupy.chainer.org/en/stable/reference/generated/cupy.get_array_module.html41    Args:42        args: array to determine whether NumPy or CuPy should be used.43    Returns:44        module: :mod:`cupy` or :mod:`numpy` is returned based on the types of45        the arguments.46    """47    try:48        return cp.get_array_module(x)49    except NameError:50        # If the cupy module is unavailble, default to numpy51        return np52#- subset of desiutil.log.get_logger, to avoid desiutil dependency53_loggers = dict()54def get_logger(level=None):55    if level is None:56        level = os.getenv('DESI_LOGLEVEL', 'INFO')57    level = level.upper()58    if level == 'DEBUG':59        loglevel = logging.DEBUG60    elif level == 'INFO':61        loglevel = logging.INFO62    elif level == 'WARN' or level == 'WARNING':63        loglevel = logging.WARNING64    elif level == 'ERROR':65        loglevel = logging.ERROR66    elif level == 'FATAL' or level == 'CRITICAL':67        loglevel = logging.CRITICAL68    else:69        raise ValueError('Unknown log level {}; should be DEBUG/INFO/WARNING/ERROR/CRITICAL'.format(level))70    if level not in _loggers:71        logger = logging.getLogger('desimeter.'+level)72        logger.setLevel(loglevel)73        #- handler and formatter code adapted from74        #- https://docs.python.org/3/howto/logging.html#configuring-logging75        # create console handler and set level to debug76        ch = logging.StreamHandler()77        ch.setLevel(loglevel)78        # create formatter79        formatter = logging.Formatter('%(levelname)s:%(filename)s:%(lineno)s:%(funcName)s:%(message)s')80        # add formatter to ch81        ch.setFormatter(formatter)82        # add ch to logger83        logger.addHandler(ch)84        _loggers[level] = logger85    return _loggers[level]86class Timer(object):87    def __init__(self):88        """A helper class for capturing timing splits.89        The start time is set on instantiation.90        """91        self.start = self.time()92        self.splits = list()93        self._max_name_length = 594    def split(self, name):95        """Capture timing split since start or previous split.96        Args:97            name: name to use for the captured interval98        """99        split = (name, self.time())100        self._max_name_length = max(len(name), self._max_name_length)101        self.splits.append(split)102    def time(self):103        """Returns the number of seconds since start of unix epoch.104        """105        return time.time()106    def _gen_split_summary(self):107        """Split summary generator.108        """109        start_iso = datetime.datetime.utcfromtimestamp(self.start).isoformat()110        yield '{name:>{n}s}:{time}'.format(name='start', time=start_iso, n=self._max_name_length)111        last = self.start112        fmt = '{name:>{n}s}:{delta:>22.2f}'113        for name, time in self.splits:114            delta = time - last115            yield fmt.format(name=name, delta=delta, n=self._max_name_length)116            last = time117        yield fmt.format(name='total', delta=last-self.start, n=self._max_name_length)118    def log_splits(self, log):119        """Logs the timer's split summary as INFO120        Args:121            log: a logger object122        """123        for line in self._gen_split_summary():124            log.info(line)125    def print_splits(self):126        """Prints the timer's split summary127        """128        for line in self._gen_split_summary():...test.py
Source:test.py  
1class Person:2    _min_length = 23    _max_name_length = 154    def __init__(self, name, age):5        self.name = name6        self.age = age7    @property8    def name(self):9        return self.__name10    @name.setter11    def name(self, value):12        self._validate_name(value)13        self.__name = value14    @classmethod15    def _validate_name(cls, value):16        value_len = len(value)17        if value_len < cls._min_length or cls._max_name_length < value_len:18            raise ValueError(f"The length of the name must be between {cls._min_length} and {cls._max_name_length}")19class Student(Person):20    attr1 = "Student"21    _min_length = 1022    # _max_name_length = 19...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
