How to use the __send_status method in yandex-tank

Best Python code snippet using yandex-tank

ServerProvide.py

Source:ServerProvide.py Github

copy

Full Screen

...27 if hasattr(self, cmd):28 func = getattr(self, cmd)29 func(self, datas[1:])30 else:31 self.__send_status(CMD_NOT_EXIST)32 def cd(self, *args):33 print(args)34 def download(self, *args):35 print(args, 'download')36 filename = args[1][0]37 data = self.request.recv(1024)38 data = data.decode("utf8")39 d = json.loads(data)40 filesize = d['filesize']41 if filename not in os.listdir():42 self.__send_status(DOWNLOAD_FILE_NOT_EXIST)43 return44 else:45 if filesize != 0:46 # 断点续传47 self.__send_status(DOWNLOAD_FILE_TRUNCATE)48 size = os.path.getsize(filename)49 l = struct.pack('i', size)50 self.request.sendall(l)51 f = open(filename, 'rb')52 f.seek(filesize)53 size -= filesize54 p = ProcessBar.ProcessBar(size)55 now_size = 056 while size > now_size:57 self.request.sendall(f.read(1024))58 now_size += 102459 p.show_process(now_size)60 f.close()61 else:62 self.__send_status(DOWNLOAD_READY)63 size = os.path.getsize(filename)64 l = struct.pack('i', size)65 self.request.sendall(l)66 f = open(filename, 'rb')67 p = ProcessBar.ProcessBar(size)68 now_size = 069 while size > now_size:70 self.request.sendall(f.read(1024))71 now_size += 102472 p.show_process(now_size)73 f.close()74 def upload(self, *args):75 print(args, 'upload')76 filename = args[1][0]77 data = self.request.recv(1024)78 data = data.decode("utf8")79 d = json.loads(data)80 filesize = d['filesize']81 if self.home_size < filesize:82 self.__send_status(UPLOAD_OVER_HOME_SIZE)83 return84 self.home_size -= filesize85 if filename in os.listdir():86 # 断点上传87 self.__send_status(UPLOAD_FILE_TRUNCATE)88 size = os.path.getsize(filename)89 s = struct.pack('i', size)90 self.request.sendall(s)91 f = open(filename, 'ab')92 filesize -= size93 p = ProcessBar.ProcessBar(filesize)94 now_size = 095 while filesize > now_size:96 data = self.request.recv(1024)97 f.write(data)98 now_size += len(data)99 p.show_process(now_size)100 f.close()101 else:102 self.__send_status(UPLOAD_READY)103 f = open(filename, 'wb')104 p = 
ProcessBar.ProcessBar(filesize)105 now_size = 0106 while filesize > now_size:107 data = self.request.recv(1024)108 f.write(data)109 now_size += len(data)110 p.show_process(now_size)111 f.close()112 def ls(self, *args):113 if len(args[1]) > 1:114 self.__send_status(CMD_NOT_EXIST)115 return116 s = subprocess.Popen('ls', shell=True, stdout=subprocess.PIPE)117 self.__send_status(CMD_SUCCESS)118 sdata = s.stdout.read()119 if sdata == b'':120 sdata = b' '121 self.request.sendall(sdata)122 def auth(self):123 data = self.request.recv(1024)124 data = data.decode('utf8')125 d = json.loads(data)126 c = configparser.ConfigParser()127 user_db = os.path.join(BASE_DIR, 'db', 'user.cfg')128 c.read(user_db)129 if c.has_section(d['username']):130 if c.get(d['username'], 'password') == d['password']:131 self.__send_status(NORMAL_STATUS)132 print('验证成功')133 self.username = d['username']134 self.home_size = HOME_MAX_SIZE135 self.home_dir = os.path.join(BASE_DIR, d['username'])136 return True137 else:138 self.__send_status(AUTH_PASSWORD_NO_CORRECT)139 print('密码不对')140 else:141 self.__send_status(AUTH_USER_NO_EXIST)142 print('不存在用户名')143 def init_home(self):144 if not os.path.exists(self.home_dir):145 os.mkdir(self.home_dir)146 os.chdir(self.home_dir)147 def __send_status(self, status):148 data = struct.pack('b', status)149 self.request.sendall(data)150 if status in [AUTH_PASSWORD_NO_CORRECT, AUTH_USER_NO_EXIST]:...

Full Screen

Full Screen

bridge.py

Source:bridge.py Github

copy

Full Screen

...63 ' unlisted\n'64 ' public\n')65 if self.mastodon_character_limit <= 0:66 raise MastodonError('Mastodon character limit cannot be 0 or less.')67 def __send_status(self, text: List[str], media: Optional[str | bytes] = None, mime: Optional[str] = None) -> None:68 media_id = self.mastodon.media_post(69 media,70 mime_type=mime if mime else None71 ) if media else None72 recent_post = self.mastodon.status_post(73 status=text[0],74 media_ids=media_id,75 visibility=self.mastodon_visibility76 )77 for t in text[1:]:78 time.sleep(1)79 current_post = self.mastodon.status_post(80 status=t,81 visibility=self.mastodon_visibility,82 in_reply_to_id=recent_post.get('id'), # type: ignore83 )84 recent_post = current_post85 def __prepare_media(self, fileID: str, caption: List[str]) -> None:86 info: telebot.types.File = self.telegram.get_file(fileID)87 mime, _ = mimetypes.guess_type(info.file_path)88 if not mime:89 logger.error(f"No mime type can be guessed for {info.file_id} ({info.file_path})")90 logger.info(f"Trying to save file into /tmp/{info.file_path}")91 path: str = ''.join(["/tmp/", info.file_path])92 file: bytes = self.telegram.download_file(info.file_path)93 with open(path, "wb") as f:94 f.write(file)95 self.__send_status(caption, media=path, mime=None)96 else:97 file: bytes = self.telegram.download_file(info.file_path)98 self.__send_status(caption, media=file, mime=mime)99 def channel_post_photo(self, message: telebot.types.Message) -> None:100 '''101 NOTE: telebot passes only one image at a time.102 There might be a way to gather all images in one post103 and send them immediately.104 But i'm at a loss here105 '''106 if not message.photo: return107 caption = self.footer_helper.caption(message, self.mastodon_character_limit)108 fileID = message.photo[-1].file_id109 self.__prepare_media(fileID, caption)110 def channel_post_video(self, message: telebot.types.Message) -> None:111 if not message.video: return112 caption = self.footer_helper.caption(message, 
self.mastodon_character_limit)113 fileID = message.video.file_id114 self.__prepare_media(fileID, caption)115 def channel_post_text(self, message: telebot.types.Message) -> None:116 status_text = self.footer_helper.text(message, self.mastodon_character_limit)...

Full Screen

Full Screen

StatusManager.py

Source:StatusManager.py Github

copy

Full Screen

...21 self.loop.call_soon(self.__send_status)22 self.loop.run_forever()23 def open_media_player(self):24 self.media_player = True25 def __send_status(self):26 self.bus.send(can.Message(arbitration_id=0x3E7, data=self.__create_status_frame(), extended_id=False))27 self.loop.call_later(1, self.__send_status)28 def __create_status_frame(self):29 data = self.__get_network_name_bytes()30 data += self.__get_status_byte()31 data += self.__get_media_player_byte()32 return data33 def __get_network_name_bytes(self):34 # if self.network_name is None:35 return bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00])36 # else:37 # text = self.network_name[0 + 6]38 # frames = TextEncoder.encode(text)[0]39 # for i in range(0, 7):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful