Best Python code snippet using yandex-tank
ServerProvide.py
Source:ServerProvide.py  
...27        if hasattr(self, cmd):28            func = getattr(self, cmd)29            func(self, datas[1:])30        else:31            self.__send_status(CMD_NOT_EXIST)32    def cd(self, *args):33        print(args)34    def download(self, *args):35        print(args, 'download')36        filename = args[1][0]37        data = self.request.recv(1024)38        data = data.decode("utf8")39        d = json.loads(data)40        filesize = d['filesize']41        if filename not in os.listdir():42            self.__send_status(DOWNLOAD_FILE_NOT_EXIST)43            return44        else:45            if filesize != 0:46                # æç¹ç»ä¼ 47                self.__send_status(DOWNLOAD_FILE_TRUNCATE)48                size = os.path.getsize(filename)49                l = struct.pack('i', size)50                self.request.sendall(l)51                f = open(filename, 'rb')52                f.seek(filesize)53                size -= filesize54                p = ProcessBar.ProcessBar(size)55                now_size = 056                while size > now_size:57                    self.request.sendall(f.read(1024))58                    now_size += 102459                    p.show_process(now_size)60                f.close()61            else:62                self.__send_status(DOWNLOAD_READY)63                size = os.path.getsize(filename)64                l = struct.pack('i', size)65                self.request.sendall(l)66                f = open(filename, 'rb')67                p = ProcessBar.ProcessBar(size)68                now_size = 069                while size > now_size:70                    self.request.sendall(f.read(1024))71                    now_size += 102472                    p.show_process(now_size)73                f.close()74    def upload(self, *args):75        print(args, 'upload')76        filename = args[1][0]77        data = self.request.recv(1024)78        data = data.decode("utf8")79        d = json.loads(data)80        filesize = d['filesize']81        if self.home_size < filesize:82            self.__send_status(UPLOAD_OVER_HOME_SIZE)83            return84        self.home_size -= filesize85        if filename in os.listdir():86            # æç¹ä¸ä¼ 87            self.__send_status(UPLOAD_FILE_TRUNCATE)88            size = os.path.getsize(filename)89            s = struct.pack('i', size)90            self.request.sendall(s)91            f = open(filename, 'ab')92            filesize -= size93            p = ProcessBar.ProcessBar(filesize)94            now_size = 095            while filesize > now_size:96                data = self.request.recv(1024)97                f.write(data)98                now_size += len(data)99                p.show_process(now_size)100            f.close()101        else:102            self.__send_status(UPLOAD_READY)103            f = open(filename, 'wb')104            p = ProcessBar.ProcessBar(filesize)105            now_size = 0106            while filesize > now_size:107                data = self.request.recv(1024)108                f.write(data)109                now_size += len(data)110                p.show_process(now_size)111            f.close()112    def ls(self, *args):113        if len(args[1]) > 1:114            self.__send_status(CMD_NOT_EXIST)115            return116        s = subprocess.Popen('ls', shell=True, stdout=subprocess.PIPE)117        self.__send_status(CMD_SUCCESS)118        sdata = s.stdout.read()119        if sdata == b'':120            sdata = b' '121        self.request.sendall(sdata)122    def auth(self):123        data = self.request.recv(1024)124        data = data.decode('utf8')125        d = json.loads(data)126        c = configparser.ConfigParser()127        user_db = os.path.join(BASE_DIR, 'db', 'user.cfg')128        c.read(user_db)129        if c.has_section(d['username']):130            if c.get(d['username'], 'password') == d['password']:131                self.__send_status(NORMAL_STATUS)132                print('éªè¯æå')133                self.username = d['username']134                self.home_size = HOME_MAX_SIZE135                self.home_dir = os.path.join(BASE_DIR, d['username'])136                return True137            else:138                self.__send_status(AUTH_PASSWORD_NO_CORRECT)139                print('å¯ç ä¸å¯¹')140        else:141            self.__send_status(AUTH_USER_NO_EXIST)142            print('ä¸åå¨ç¨æ·å')143    def init_home(self):144        if not os.path.exists(self.home_dir):145            os.mkdir(self.home_dir)146        os.chdir(self.home_dir)147    def __send_status(self, status):148        data = struct.pack('b', status)149        self.request.sendall(data)150        if status in [AUTH_PASSWORD_NO_CORRECT, AUTH_USER_NO_EXIST]:...bridge.py
Source:bridge.py  
...63                                '   unlisted\n'64                                '   public\n')65        if self.mastodon_character_limit <= 0:66            raise MastodonError('Mastodon character limit cannot be 0 or less.')67    def __send_status(self, text: List[str], media: Optional[str | bytes] = None, mime: Optional[str] = None) -> None:68        media_id = self.mastodon.media_post(69            media,70            mime_type=mime if mime else None71        ) if media else None72        recent_post = self.mastodon.status_post(73            status=text[0],74            media_ids=media_id,75            visibility=self.mastodon_visibility76        )77        for t in text[1:]:78            time.sleep(1)79            current_post = self.mastodon.status_post(80                status=t,81                visibility=self.mastodon_visibility,82                in_reply_to_id=recent_post.get('id'), # type: ignore83            )84            recent_post = current_post85    def __prepare_media(self, fileID: str, caption: List[str]) -> None:86        info: telebot.types.File = self.telegram.get_file(fileID)87        mime, _ = mimetypes.guess_type(info.file_path)88        if not mime:89            logger.error(f"No mime type can be guessed for {info.file_id} ({info.file_path})")90            logger.info(f"Trying to save file into /tmp/{info.file_path}")91            path: str = ''.join(["/tmp/", info.file_path])92            file: bytes = self.telegram.download_file(info.file_path)93            with open(path, "wb") as f:94                f.write(file)95            self.__send_status(caption, media=path, mime=None)96        else:97            file: bytes = self.telegram.download_file(info.file_path)98            self.__send_status(caption, media=file, mime=mime)99    def channel_post_photo(self, message: telebot.types.Message) -> None:100        '''101        NOTE: telebot passes only one image at a time.102            There might be a way to gather all images in one post103            and send them immediately.104            But i'm at a loss here105        '''106        if not message.photo: return107        caption = self.footer_helper.caption(message, self.mastodon_character_limit)108        fileID = message.photo[-1].file_id109        self.__prepare_media(fileID, caption)110    def channel_post_video(self, message: telebot.types.Message) -> None:111        if not message.video: return112        caption = self.footer_helper.caption(message, self.mastodon_character_limit)113        fileID = message.video.file_id114        self.__prepare_media(fileID, caption)115    def channel_post_text(self, message: telebot.types.Message) -> None:116        status_text = self.footer_helper.text(message, self.mastodon_character_limit)...StatusManager.py
Source:StatusManager.py  
...21        self.loop.call_soon(self.__send_status)22        self.loop.run_forever()23    def open_media_player(self):24        self.media_player = True25    def __send_status(self):26        self.bus.send(can.Message(arbitration_id=0x3E7, data=self.__create_status_frame(), extended_id=False))27        self.loop.call_later(1, self.__send_status)28    def __create_status_frame(self):29        data = self.__get_network_name_bytes()30        data += self.__get_status_byte()31        data += self.__get_media_player_byte()32        return data33    def __get_network_name_bytes(self):34        # if self.network_name is None:35        return bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00])36        # else:37        #     text = self.network_name[0 + 6]38        #     frames = TextEncoder.encode(text)[0]39        #     for i in range(0, 7):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
