How to use _run_async method in ATX

Best Python code snippet using ATX

async_driver.py

Source:async_driver.py Github

copy

Full Screen

...40 extra_params=extra_params,41 )42 self.loop = loop or get_event_loop()43 self._pool_executor = ThreadPoolExecutor(max_workers=1)44 async def _run_async(self, method, *args, **kwargs):45 try:46 logger.debug("Running async method {}".format(method.__name__))47 fut = self.loop.run_in_executor(48 self._pool_executor, partial(method, *args, **kwargs)49 )50 return await fut51 except CancelledError:52 fut.cancel()53 raise54 async def get_local_storage(self):55 return await self._run_async(self._driver.get_local_storage)56 async def set_local_storage(self, data):57 return await self._run_async(self._driver.set_local_storage, data)58 async def save_firefox_profile(self, remove_old=False):59 return await self._run_async(60 self._driver.save_firefox_profile, remove_old=remove_old61 )62 async def connect(self):63 return await self._run_async(self._driver.connect)64 async def wait_for_login(self, timeout=90):65 for _ in range(timeout // 2):66 try:67 return await self._run_async(self._driver.wait_for_login, timeout=1)68 except TimeoutException:69 await sleep(1)70 raise TimeoutException("Timeout: Not logged")71 async def get_qr(self):72 return await self._run_async(self._driver.get_qr)73 async def screenshot(self, filename):74 return await self._run_async(self._driver.screenshot, filename)75 async def get_contacts(self):76 return await self._run_async(self._driver.get_contacts)77 async def get_all_chats(self):78 for chat_id in await self.get_all_chat_ids():79 yield await self.get_chat_from_id(chat_id)80 async def get_all_chat_ids(self):81 return await self._run_async(self._driver.get_all_chat_ids)82 async def get_unread(83 self, include_me=False, include_notifications=False, use_unread_count=False84 ):85 return await self._run_async(86 self._driver.get_unread,87 include_me=include_me,88 include_notifications=include_notifications,89 use_unread_count=use_unread_count,90 )91 async def get_all_messages_in_chat(92 self, chat, include_me=False, include_notifications=False93 ):94 
return await self._run_async(95 self._driver.get_all_messages_in_chat,96 chat=chat,97 include_me=include_me,98 include_notifications=include_notifications,99 )100 async def get_contact_from_id(self, contact_id):101 return await self._run_async(self._driver.get_contact_from_id, contact_id)102 async def get_chat_from_id(self, chat_id):103 return await self._run_async(self._driver.get_chat_from_id, chat_id)104 async def get_chat_from_phone_number(self, number):105 return await self._run_async(self._driver.get_chat_from_phone_number, number)106 async def reload_qr(self):107 return await self._run_async(self._driver.reload_qr)108 async def get_status(self):109 return await self._run_async(self._driver.get_status)110 async def check_number_status(self, number_id):111 return await self._run_async(self._driver.check_number_status, number_id)112 async def contact_get_common_groups(self, contact_id):113 groups = await self._run_async(114 self._driver.contact_get_common_groups(contact_id), contact_id115 )116 for group in groups:117 yield group118 async def chat_send_message(self, chat_id, message):119 return await self._run_async(120 self._driver.chat_send_message, chat_id=chat_id, message=message121 )122 async def chat_get_messages(123 self, chat, include_me=False, include_notifications=False124 ):125 async for msg_id in self.get_all_message_ids_in_chat(126 chat, include_me=include_me, include_notifications=include_notifications127 ):128 yield self.get_message_by_id(msg_id)129 async def get_all_message_ids_in_chat(130 self, chat, include_me=False, include_notifications=False131 ):132 message_ids = await self._run_async(133 self._driver.get_all_message_ids_in_chat,134 chat,135 include_me,136 include_notifications,137 )138 for i in message_ids:139 yield i140 async def get_message_by_id(self, message_id):141 return await self._run_async(self._driver.get_message_by_id, message_id)142 async def chat_load_earlier_messages(self, chat_id):143 return await 
self._run_async(self._driver.chat_load_earlier_messages, chat_id)144 async def chat_load_all_earlier_messages(self, chat_id):145 return await self._run_async(146 self._driver.chat_load_all_earlier_messages, chat_id147 )148 async def async_chat_load_all_earlier_messages(self, chat_id):149 return await self._run_async(150 self._driver.async_chat_load_all_earlier_messages, chat_id151 )152 async def are_all_messages_loaded(self, chat_id):153 return await self._run_async(self._driver.are_all_messages_loaded, chat_id)154 async def group_get_participants_ids(self, group_id):155 return await self._run_async(self._driver.group_get_participants_ids, group_id)156 async def group_get_participants(self, group_id):157 participant_ids = await self.group_get_participants_ids(group_id)158 for participant_id in participant_ids:159 yield await self.get_contact_from_id(participant_id)160 async def group_get_admin_ids(self, group_id):161 return await self._run_async(self._driver.group_get_admin_ids, group_id)162 async def group_get_admins(self, group_id):163 admin_ids = await self.group_get_admin_ids(group_id)164 for admin_id in admin_ids:165 yield await self.get_contact_from_id(admin_id)166 async def download_file(self, url):167 async with aiohttp.ClientSession() as session:168 async with session.get(url) as resp:169 return await resp.read()170 async def download_media(self, media_msg, force_download=False):171 if not force_download:172 try:173 if media_msg.content:174 return BytesIO(b64decode(media_msg.content))175 except AttributeError:176 pass177 file_data = await self.download_file(media_msg.client_url)178 media_key = b64decode(media_msg.media_key)179 derivative = HKDFv3().deriveSecrets(180 media_key, binascii.unhexlify(media_msg.crypt_keys[media_msg.type]), 112181 )182 parts = ByteUtil.split(derivative, 16, 32)183 iv = parts[0]184 cipher_key = parts[1]185 e_file = file_data[:-10]186 cr_obj = Cipher(187 algorithms.AES(cipher_key), modes.CBC(iv), backend=default_backend()188 )189 
decryptor = cr_obj.decryptor()190 return BytesIO(decryptor.update(e_file) + decryptor.finalize())191 async def quit(self):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful