Best Python code snippet using ATX
async_driver.py
Source:async_driver.py  
...40            extra_params=extra_params,41        )42        self.loop = loop or get_event_loop()43        self._pool_executor = ThreadPoolExecutor(max_workers=1)44    async def _run_async(self, method, *args, **kwargs):45        try:46            logger.debug("Running async method {}".format(method.__name__))47            fut = self.loop.run_in_executor(48                self._pool_executor, partial(method, *args, **kwargs)49            )50            return await fut51        except CancelledError:52            fut.cancel()53            raise54    async def get_local_storage(self):55        return await self._run_async(self._driver.get_local_storage)56    async def set_local_storage(self, data):57        return await self._run_async(self._driver.set_local_storage, data)58    async def save_firefox_profile(self, remove_old=False):59        return await self._run_async(60            self._driver.save_firefox_profile, remove_old=remove_old61        )62    async def connect(self):63        return await self._run_async(self._driver.connect)64    async def wait_for_login(self, timeout=90):65        for _ in range(timeout // 2):66            try:67                return await self._run_async(self._driver.wait_for_login, timeout=1)68            except TimeoutException:69                await sleep(1)70        raise TimeoutException("Timeout: Not logged")71    async def get_qr(self):72        return await self._run_async(self._driver.get_qr)73    async def screenshot(self, filename):74        return await self._run_async(self._driver.screenshot, filename)75    async def get_contacts(self):76        return await self._run_async(self._driver.get_contacts)77    async def get_all_chats(self):78        for chat_id in await self.get_all_chat_ids():79            yield await self.get_chat_from_id(chat_id)80    async def get_all_chat_ids(self):81        return await self._run_async(self._driver.get_all_chat_ids)82    async def get_unread(83        self, include_me=False, 
include_notifications=False, use_unread_count=False84    ):85        return await self._run_async(86            self._driver.get_unread,87            include_me=include_me,88            include_notifications=include_notifications,89            use_unread_count=use_unread_count,90        )91    async def get_all_messages_in_chat(92        self, chat, include_me=False, include_notifications=False93    ):94        return await self._run_async(95            self._driver.get_all_messages_in_chat,96            chat=chat,97            include_me=include_me,98            include_notifications=include_notifications,99        )100    async def get_contact_from_id(self, contact_id):101        return await self._run_async(self._driver.get_contact_from_id, contact_id)102    async def get_chat_from_id(self, chat_id):103        return await self._run_async(self._driver.get_chat_from_id, chat_id)104    async def get_chat_from_phone_number(self, number):105        return await self._run_async(self._driver.get_chat_from_phone_number, number)106    async def reload_qr(self):107        return await self._run_async(self._driver.reload_qr)108    async def get_status(self):109        return await self._run_async(self._driver.get_status)110    async def check_number_status(self, number_id):111        return await self._run_async(self._driver.check_number_status, number_id)112    async def contact_get_common_groups(self, contact_id):113        groups = await self._run_async(114            self._driver.contact_get_common_groups(contact_id), contact_id115        )116        for group in groups:117            yield group118    async def chat_send_message(self, chat_id, message):119        return await self._run_async(120            self._driver.chat_send_message, chat_id=chat_id, message=message121        )122    async def chat_get_messages(123        self, chat, include_me=False, include_notifications=False124    ):125        async for msg_id in self.get_all_message_ids_in_chat(126           
 chat, include_me=include_me, include_notifications=include_notifications127        ):128            yield self.get_message_by_id(msg_id)129    async def get_all_message_ids_in_chat(130        self, chat, include_me=False, include_notifications=False131    ):132        message_ids = await self._run_async(133            self._driver.get_all_message_ids_in_chat,134            chat,135            include_me,136            include_notifications,137        )138        for i in message_ids:139            yield i140    async def get_message_by_id(self, message_id):141        return await self._run_async(self._driver.get_message_by_id, message_id)142    async def chat_load_earlier_messages(self, chat_id):143        return await self._run_async(self._driver.chat_load_earlier_messages, chat_id)144    async def chat_load_all_earlier_messages(self, chat_id):145        return await self._run_async(146            self._driver.chat_load_all_earlier_messages, chat_id147        )148    async def async_chat_load_all_earlier_messages(self, chat_id):149        return await self._run_async(150            self._driver.async_chat_load_all_earlier_messages, chat_id151        )152    async def are_all_messages_loaded(self, chat_id):153        return await self._run_async(self._driver.are_all_messages_loaded, chat_id)154    async def group_get_participants_ids(self, group_id):155        return await self._run_async(self._driver.group_get_participants_ids, group_id)156    async def group_get_participants(self, group_id):157        participant_ids = await self.group_get_participants_ids(group_id)158        for participant_id in participant_ids:159            yield await self.get_contact_from_id(participant_id)160    async def group_get_admin_ids(self, group_id):161        return await self._run_async(self._driver.group_get_admin_ids, group_id)162    async def group_get_admins(self, group_id):163        admin_ids = await self.group_get_admin_ids(group_id)164        for admin_id in 
admin_ids:165            yield await self.get_contact_from_id(admin_id)166    async def download_file(self, url):167        async with aiohttp.ClientSession() as session:168            async with session.get(url) as resp:169                return await resp.read()170    async def download_media(self, media_msg, force_download=False):171        if not force_download:172            try:173                if media_msg.content:174                    return BytesIO(b64decode(media_msg.content))175            except AttributeError:176                pass177        file_data = await self.download_file(media_msg.client_url)178        media_key = b64decode(media_msg.media_key)179        derivative = HKDFv3().deriveSecrets(180            media_key, binascii.unhexlify(media_msg.crypt_keys[media_msg.type]), 112181        )182        parts = ByteUtil.split(derivative, 16, 32)183        iv = parts[0]184        cipher_key = parts[1]185        e_file = file_data[:-10]186        cr_obj = Cipher(187            algorithms.AES(cipher_key), modes.CBC(iv), backend=default_backend()188        )189        decryptor = cr_obj.decryptor()190        return BytesIO(decryptor.update(e_file) + decryptor.finalize())191    async def quit(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
