Best Python code snippet using localstack_python
test_state.py
Source:test_state.py  
...104    def test_timeout_reconnects(self):105        messages = []106        def initialize():107            engine.initialized = True108        def collect_messages(*args):109            messages.append(args)110        engine = self.TestRedis(initialize, collect_messages, channel=str(self))111        engine.exception = redis.TimeoutError112        with self.record(pubsub_engines.logger) as records:113            engine.start()114            while not engine.ready:115                time.sleep(0.1)116        self.assertTrue(engine.initialized)117        self.assertEqual(len(records), 3)118        first_record, second_record, third_record = records119        self.assertEqual(first_record.levelno, logging.WARNING)120        self.assertEqual(first_record.msg, 'Unexpected connection timeout.')121        self.assertEqual(second_record.levelno, logging.INFO)122        self.assertEqual(second_record.msg, 'Attempting to reconnect.')123        self.assertEqual(third_record.levelno, logging.INFO)124        self.assertEqual(third_record.msg, 'Successfully reconnected.')125        engine.publish('foo', 'bar')126        while len(messages) == 0:127            time.sleep(0.1)128        self.assertEqual(messages, [('foo', 'bar')])129        engine.stop()130    def test_disconnect_reconnects(self):131        messages = []132        def initialize():133            engine.initialized = True134        def collect_messages(*args):135            messages.append(args)136        engine = self.TestRedis(initialize, collect_messages, channel=str(self))137        engine.exception = redis.ConnectionError138        with self.record(pubsub_engines.logger) as records:139            engine.start()140            while not engine.ready:141                time.sleep(0.1)142        self.assertTrue(engine.initialized)143        self.assertEqual(len(records), 3)144        first_record, second_record, third_record = records145        self.assertEqual(first_record.levelno, logging.WARNING)146        self.assertEqual(first_record.msg, 'Connection error.')147        self.assertEqual(second_record.levelno, logging.INFO)148        self.assertEqual(second_record.msg, 'Attempting to reconnect.')...data_collector.py
Source:data_collector.py  
2from slack_sdk.errors import SlackApiError3from slack_sdk.web.async_client import AsyncWebClient4from utils.abstractcontroller import AbstractController5from utils.message_filters import message_contain_russian, translate_message6async def collect_messages(channel_id: str, start_time: float, method, logger, **kwargs):7    """8        method: method to call:9            client.conversation_history10            client.conversations.replies11        kwargs: method args12    """13    cursor = None14    while True:15        payload = {"channel": channel_id, "oldest": str(start_time), "limit": 1000, **kwargs}16        if cursor:  # using pagination17            payload['cursor'] = cursor18        try:19            data = await method(**payload)20            raw_data = data.data21            if raw_data['messages']:22                yield raw_data['messages']23            cursor = raw_data['response_metadata']['next_cursor'] if raw_data['has_more'] else None24        except SlackApiError as e:25            if e.response['error'] == 'ratelimited':26                pause = float(e.response.headers['retry-after'])27                logger.info(f'Rate limit exceeded, sleep for {pause} s.')28                await asyncio.sleep(pause)29            else:30                logger.error(f'SlackApiError {e.response.status_code}:\n {e.response}')31        if not cursor:32            break  # DO-WHILE33def translate_messages(messages: list):34    for i in range(len(messages)):35        if message_contain_russian(messages[i]):36            messages[i] = translate_message(messages[i])37    return messages38class DataCollector:39    def __init__(self, controller: AbstractController):40        self.controller = controller41        self.following_channel_ids = []  # Cached42        self.collecting_running = False43        self.data_collected = False44    async def collect_messages(self, client: AsyncWebClient, logger):45        self.collecting_running = True46        self.following_channel_ids = await self.get_following_channels_ids()47        for channel_id in self.following_channel_ids:48            last_update = self.controller.get_latest_timestamp(channel_id) + 1e-6  # web_api includes oldest value49            msg_generator = collect_messages(channel_id, last_update, client.conversations_history, logger)50            async for messages in msg_generator:51                translate_messages(messages)52                self.controller.add_parent_messages(messages, channel_id)53                for message in messages:54                    if 'thread_ts' in message:55                        chd_message_generator = collect_messages(channel_id, last_update,56                                                                 client.conversations_replies, logger,57                                                                 ts=message['thread_ts'])58                        async for child_messages in chd_message_generator:59                            self.controller.add_child_messages(child_messages, channel_id, message['thread_ts'])60            logger.info(f'Channel ID: {channel_id} was scanned')61        self.data_collected = True62        self.collecting_running = False63    async def set_channels(self, checkbox_action: dict):64        channels = dict(65            (checkbox['value'], checkbox['text']['text']) for checkbox in checkbox_action['selected_options'])66        old_following = await self.get_following_channels_ids()67        for channel in channels.keys():68            if channel not in old_following:69                self.data_collected = False...eliot_demo.py
Source:eliot_demo.py  
2import requests3from eliot import start_action, add_destinations, log_call4from eliottree import render_tasks, tasks_from_iterable5messages = []6def collect_messages(message):7    messages.append(message)8add_destinations(collect_messages)9@log_call10def get_some_data(url):11    requests.get(url)12try:13    with start_action(action_type='SomeOuterAction', x=123) as action:14        action.log(message_type='my_dummy_message', text='something is about to happen')15        url = 'http://4ut23y74283ty872y3t47823t.com/'16        with start_action(action_type='SomeInnerAction', url=url):17            get_some_data(url)18except:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
