Best Python code snippet using avocado_python
qbit_downloader.py
Source:qbit_downloader.py  
...32        self.client = get_client()33        try:34            if ospath.exists(link):35                is_file = True36                self.ext_hash = _get_hash_file(link)37            else:38                is_file = False39                self.ext_hash = _get_hash_magnet(link)40            if is_file:41                op = self.client.torrents_add(torrent_files=[link], save_path=path)42            else:43                op = self.client.torrents_add(link, save_path=path)44            sleep(0.3)45            if op.lower() == "ok.":46                tor_info = self.client.torrents_info(torrent_hashes=self.ext_hash)47                if len(tor_info) == 0:48                    if is_file:49                        self.ext_hash = _get_hash_file(link, True)50                    while True:51                        tor_info = self.client.torrents_info(torrent_hashes=self.ext_hash)52                        if len(tor_info) > 0:53                            break54                        elif time() - self.__stalled_time >= 30:55                            ermsg = "The Torrent was not added. 
Report when you see this error"56                            sendMessage(ermsg, self.__listener.bot, self.__listener.message)57                            self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)58                            self.client.auth_log_out()59                            return60            else:61                sendMessage("This is an unsupported/invalid link.", self.__listener.bot, self.__listener.message)62                self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)63                self.client.auth_log_out()64                if is_file:65                    osremove(link)66                return67            if is_file:68                osremove(link)69            tor_info = tor_info[0]70            self.ext_hash = tor_info.hash71            gid = self.ext_hash[:12]72            if getDownloadByGid(gid) is not None:73                sendMessage("This Torrent is already in list.", self.__listener.bot, self.__listener.message)74                self.client.auth_log_out()75                return76            with download_dict_lock:77                download_dict[self.__listener.uid] = QbDownloadStatus(self.__listener, self)78            LOGGER.info(f"QbitDownload started: {tor_info.name} - Hash: {self.ext_hash}")79            self.periodic = setInterval(self.POLLING_INTERVAL, self.__qb_listener)80            if BASE_URL is not None and select:81                if not is_file:82                    metamsg = "Downloading Metadata, wait then you can select files or mirror torrent file"83                    meta = sendMessage(metamsg, self.__listener.bot, self.__listener.message)84                    while True:85                        tor_info = self.client.torrents_info(torrent_hashes=self.ext_hash)86                        if len(tor_info) == 0:87                            return deleteMessage(self.__listener.bot, meta)88                        try:89                            tor_info 
= tor_info[0]90                            if tor_info.state not in ["metaDL", "checkingResumeData", "pausedDL"]:91                                deleteMessage(self.__listener.bot, meta)92                                break93                        except:94                            return deleteMessage(self.__listener.bot, meta)95                self.client.torrents_pause(torrent_hashes=self.ext_hash)96                pincode = ""97                for n in str(self.ext_hash):98                    if n.isdigit():99                        pincode += str(n)100                    if len(pincode) == 4:101                        break102                buttons = button_build.ButtonMaker()103                if WEB_PINCODE:104                    buttons.buildbutton("Select Files", f"{BASE_URL}/app/files/{self.ext_hash}")105                    buttons.sbutton("Pincode", f"qbs pin {gid} {pincode}")106                else:107                    buttons.buildbutton("Select Files", f"{BASE_URL}/app/files/{self.ext_hash}?pin_code={pincode}")108                buttons.sbutton("Done Selecting", f"qbs done {gid} {self.ext_hash}")109                QBBUTTONS = InlineKeyboardMarkup(buttons.build_menu(2))110                msg = "Your download paused. 
Choose files then press Done Selecting button to start downloading."111                sendMarkup(msg, self.__listener.bot, self.__listener.message, QBBUTTONS)112            else:113                sendStatusMessage(self.__listener.message, self.__listener.bot)114        except Exception as e:115            sendMessage(str(e), self.__listener.bot, self.__listener.message)116            self.client.auth_log_out()117    def __qb_listener(self):118        try:119            tor_info = self.client.torrents_info(torrent_hashes=self.ext_hash)120            if len(tor_info) == 0:121                return122            tor_info = tor_info[0]123            if tor_info.state == "metaDL":124                self.__stalled_time = time()125                if TORRENT_TIMEOUT is not None and time() - tor_info.added_on >= TORRENT_TIMEOUT:126                    self.__onDownloadError("Dead Torrent!")127            elif tor_info.state == "downloading":128                self.__stalled_time = time()129                if not self.__dupChecked and STOP_DUPLICATE and ospath.isdir(f'{self.__path}') and not self.__listener.isLeech:130                    LOGGER.info('Checking File/Folder if already in Drive')131                    qbname = str(listdir(f'{self.__path}')[-1])132                    if qbname.endswith('.!qB'):133                        qbname = ospath.splitext(qbname)[0]134                    if self.__listener.isZip:135                        qbname = qbname + ".zip"136                    elif self.__listener.extract:137                        try:138                           qbname = get_base_name(qbname)139                        except:140                            qbname = None141                    if qbname is not None:142                        qbmsg, button = GoogleDriveHelper().drive_list(qbname, True)143                        if qbmsg:144                            self.__onDownloadError("File/Folder is already available in Drive.")145                            
sendMarkup("Here are the search results:", self.__listener.bot, self.__listener.message, button)146                    self.__dupChecked = True147                if not self.__sizeChecked:148                    size = tor_info.size149                    arch = any([self.__listener.isZip, self.__listener.extract])150                    if STORAGE_THRESHOLD is not None:151                        acpt = check_storage_threshold(size, arch)152                        if not acpt:153                            msg = f'You must leave {STORAGE_THRESHOLD}GB free storage.'154                            msg += f'\nYour File/Folder size is {get_readable_file_size(size)}'155                            self.__onDownloadError(msg)156                            return157                    limit = None158                    if ZIP_UNZIP_LIMIT is not None and arch:159                        mssg = f'Zip/Unzip limit is {ZIP_UNZIP_LIMIT}GB'160                        limit = ZIP_UNZIP_LIMIT161                    elif TORRENT_DIRECT_LIMIT is not None:162                        mssg = f'Torrent limit is {TORRENT_DIRECT_LIMIT}GB'163                        limit = TORRENT_DIRECT_LIMIT164                    if limit is not None:165                        LOGGER.info('Checking File/Folder Size...')166                        if size > limit * 1024**3:167                            fmsg = f"{mssg}.\nYour File/Folder size is {get_readable_file_size(size)}"168                            self.__onDownloadError(fmsg)169                    self.__sizeChecked = True170            elif tor_info.state == "stalledDL":171                if not self.__rechecked and 0.99989999999999999 < tor_info.progress < 1:172                    msg = f"Force recheck - Name: {tor_info.name} Hash: "173                    msg += f"{self.ext_hash} Downloaded Bytes: {tor_info.downloaded} "174                    msg += f"Size: {tor_info.size} Total Size: {tor_info.total_size}"175                    LOGGER.info(msg)176       
             self.client.torrents_recheck(torrent_hashes=self.ext_hash)177                    self.__rechecked = True178                elif TORRENT_TIMEOUT is not None and time() - self.__stalled_time >= TORRENT_TIMEOUT:179                    self.__onDownloadError("Dead Torrent!")180            elif tor_info.state == "missingFiles":181                self.client.torrents_recheck(torrent_hashes=self.ext_hash)182            elif tor_info.state == "error":183                self.__onDownloadError("No enough space for this torrent on device")184            elif (tor_info.state.lower().endswith("up") or tor_info.state == "uploading") and \185                 not self.__uploaded and len(listdir(self.__path)) != 0:186                self.__uploaded = True187                if not QB_SEED:188                    self.client.torrents_pause(torrent_hashes=self.ext_hash)189                if self.select:190                    clean_unwanted(self.__path)191                self.__listener.onDownloadComplete()192                if QB_SEED and not self.__listener.isLeech and not self.__listener.extract:193                    with download_dict_lock:194                        if self.__listener.uid not in list(download_dict.keys()):195                            self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)196                            self.client.auth_log_out()197                            self.periodic.cancel()198                            return199                        download_dict[self.__listener.uid] = QbDownloadStatus(self.__listener, self)200                    update_all_messages()201                    LOGGER.info(f"Seeding started: {tor_info.name}")202                else:203                    self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)204                    self.client.auth_log_out()205                    self.periodic.cancel()206            elif tor_info.state == 'pausedUP' and QB_SEED:207          
      self.__listener.onUploadError(f"Seeding stopped with Ratio: {round(tor_info.ratio, 3)} and Time: {get_readable_time(tor_info.seeding_time)}")208                self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)209                self.client.auth_log_out()210                self.periodic.cancel()211        except Exception as e:212            LOGGER.error(str(e))213    def __onDownloadError(self, err):214        self.client.torrents_pause(torrent_hashes=self.ext_hash)215        sleep(0.3)216        self.__listener.onDownloadError(err)217        self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)218        self.client.auth_log_out()219        self.periodic.cancel()220def get_confirm(update, context):221    query = update.callback_query222    user_id = query.from_user.id223    data = query.data224    data = data.split(" ")225    qbdl = getDownloadByGid(data[2])226    if not qbdl:227        query.answer(text="This task has been cancelled!", show_alert=True)228        query.message.delete()229    elif user_id != qbdl.listener().message.from_user.id:230        query.answer(text="This task is not for you!", show_alert=True)231    elif data[1] == "pin":232        query.answer(text=data[3], show_alert=True)233    elif data[1] == "done":234        query.answer()235        qbdl.client().torrents_resume(torrent_hashes=data[3])236        sendStatusMessage(qbdl.listener().message, qbdl.listener().bot)237        query.message.delete()238def _get_hash_magnet(mgt: str):239    if 'xt=urn:btmh:' in mgt:240        return re_search(r'(?<=xt=urn:btmh:)[a-zA-Z0-9]+', mgt).group(0)241    else:242        return re_search(r'(?<=xt=urn:btih:)[a-zA-Z0-9]+', mgt).group(0)243def _get_hash_file(path, v2=False):244    with open(path, "rb") as f:245        decodedDict = bdecode(f.read())246    if v2:247        hash_ = sha256(bencode(decodedDict[b'info'])).hexdigest()248    else:249        hash_ = 
sha1(bencode(decodedDict[b'info'])).hexdigest()250    return str(hash_)251qbs_handler = CallbackQueryHandler(get_confirm, pattern="qbs", run_async=True)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
