Best Python code snippet using avocado_python
test_consumer.py
Source:test_consumer.py  
...46        app.consumer._drain_messages.side_effect = asyncio.CancelledError47        await fetcher._fetcher(fetcher)48        app.consumer._drain_messages.assert_called_once_with(fetcher)49    @pytest.mark.asyncio50    async def test_on_stop__no_drainer(self, *, fetcher):51        fetcher._drainer = None52        await fetcher.on_stop()53    @pytest.mark.asyncio54    async def test_on_stop__drainer_done(self, *, fetcher):55        fetcher._drainer = Mock(done=Mock(return_value=True))56        await fetcher.on_stop()57    @pytest.mark.asyncio58    async def test_on_stop_drainer__drainer_done2(self, *, fetcher):59        fetcher._drainer = Mock(done=Mock(return_value=False))60        with patch("asyncio.wait_for", AsyncMock()) as wait_for:61            wait_for.coro.return_value = None62            await fetcher.on_stop()63        fetcher._drainer.cancel.assert_called_once_with()64        assert wait_for.call_count...request.py
Source:request.py  
...41            data = await self.read()42            if not data:43                break44            yield data45    async def _drainer(self) -> bytes:46        while True:47            data = await self.read(parse=False)48            if not data:49                break50            yield data51    def on_header(self, name: bytes, value: bytes):52        value = value.decode()53        if value:54            name = name.decode().title()55            if name in self.request.headers:56                self.request.headers[name] += ', {}'.format(value)57            else:58                self.request.headers[name] = value59    def on_body(self, data: bytes):60        self.request.body += data61    def on_message_begin(self):62        self.complete = False63        self.request = Request(self.socket, self.reader)64    def on_message_complete(self):65        self.complete = True66    def on_url(self, url: bytes):67        self.request.url = url68        parsed = parse_url(url)69        self.request.path = unquote(parsed.path.decode())70        self.request.query_string = (parsed.query or b'').decode()71    def on_headers_complete(self):72        self.request.keep_alive = self.parser.should_keep_alive()73        self.request.method = self.parser.get_method().decode().upper()74        self.headers_complete = True75    async def __aiter__(self):76        keep_alive = True77        while keep_alive:78            data = await self.read()79            if data is None:80                break81            if self.headers_complete:82                yield self.request83                keep_alive = self.request.keep_alive84                if keep_alive:85                    if not self.complete:86                        await self.reader.aclose()87                        # We drain if there's an uncomplete request.88                        async for _ in self._drainer():89                            pass90                    self.request = None91                    self.complete 
= False92                    self.headers_complete = False93                    self.reader = self._reader()94class Request(dict):95    __slots__ = (96        '_cookies',97        '_query',98        '_reader',99        'body',100        'files',101        'form',102        'headers',..._file.py
Source:_file.py  
1from logging.handlers import WatchedFileHandler2from api.logs._drainer import LogsDrainer3__all__ = ['FileHandler']4class FileHandler(WatchedFileHandler):5    def __init__(self, *args, **kwargs):6        super().__init__(*args, **kwargs)7        self._logs_drainer = LogsDrainer(8            path_to_logs_file='api/logs/logs.log',9            directory_to_upload='/rabbit/api/logs'10        )11    12    def handle(self, record):13        self._logs_drainer.check()...Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
