How to use send_config method in yandex-tank

Best Python code snippet using yandex-tank

ZumoTest.py

Source:ZumoTest.py Github

copy

Full Screen

...56 if(topic == "/team20/debug"):57 on_msg_debug(decoded)58 elif(topic == "/team20/state"):59 on_msg_state(decoded)60def send_config(msgType, msgValue):61 global ID62 topic = "/team20/config"63 data = {"ID": ID[topic],64 "Type": msgType,65 "Value": msgValue}66 package = json.dumps(data)67 client.publish(topic,package)68 ID[topic] += 169 print("Sent",package,"to",topic,"@",round(time.time() - starttime,2))70def resetConfig():71 send_config(3, 0) #set speed72 time.sleep(1)73 send_config(8, 0) #disable movement74 send_config(7, 0) #disable pixy75 send_config(6, 0) #disable sensor76 send_config(2, 0) #disable PID77 78 79def test_pause():80 global tests81 print("Running test: pause @",round(time.time() - starttime,2))82 check1 = False83 check2 = False84 send_config(8, 1) #enable movement85 send_config(7, 0) #disable pixy86 send_config(6, 0) #disable sensor87 send_config(2, 0) #disable PID88 send_config(3, 100)#set speed89 time.sleep(2)90 send_config(1, 2) #rover loading91 time.sleep(2)92 if statePaused == 1:93 check1 = True94 send_config(1, 1) #rover moving95 time.sleep(2)96 if statePaused == 0:97 check2 = True98 if check1 and check2:99 tests["pause"] = True100def test_PID_straight():101 print("Running test: PID straight @",round(time.time() - starttime,2))102 send_config(8, 1) #enable movement103 send_config(7, 0) #disable pixy104 send_config(6, 0) #disable sensor105 send_config(2, 1) #enable PID106 send_config(3, 50)#set speed107 time.sleep(1)108 PIDLeftVals.clear()109 PIDRightVals.clear()110 time.sleep(5)111 send_config(3, 100)#set speed112 time.sleep(5)113 send_config(3, 150)#set speed114 time.sleep(5)115 send_config(3, 200)#set speed116 time.sleep(5)117 send_config(3, 255)#set speed118 time.sleep(5)119 send_config(3, 0)#set speed120 plt.plot(PIDLeftVals)121 plt.plot(PIDRightVals)122 plt.ylabel('PID adjustment')123 plt.xlabel('trial')124 plt.show()125def test_PID_turning():126 print("Running test: PID turning @",round(time.time() - starttime,2))127 send_config(8, 1) #enable movement128 send_config(7, 0) #disable pixy129 send_config(6, 0) #disable sensor130 send_config(2, 1) #enable PID131 send_config(5, 50)#turn left132 time.sleep(1)133 PIDLeftVals.clear()134 PIDRightVals.clear()135 time.sleep(5)136 send_config(5, 100)#set speed137 time.sleep(5)138 send_config(5, 150)#set speed139 time.sleep(5)140 send_config(5, 200)#set speed141 time.sleep(5)142 send_config(5, 255)#set speed143 time.sleep(5)144 send_config(5, 0)#set speed145 plt.plot(PIDLeftVals)146 plt.plot(PIDRightVals)147 plt.ylabel('PID adjustment')148 plt.xlabel('trial')149 plt.show()150def test_distance():151 global tests152 print("Running test: distance @",round(time.time() - starttime,2))153 check1 = False154 check2 = False155 check3 = False156 send_config(3, 0) #set speed157 send_config(7, 0) #disable pixy158 send_config(6, 0) #disable sensor159 send_config(2, 0) #disable PID160 send_config(8, 1) #disable movement161 input("distance: press enter to check 20 inches")162 sensorVals.clear()163 time.sleep(2)164 if 18 <= statistics.mean(sensorVals) <= 22:165 check1 = True166 print("Average for 20 inches was:",statistics.mean(sensorVals))167 input("distance: press enter to check 12 inches") 168 sensorVals.clear()169 time.sleep(2)170 if 10 <= statistics.mean(sensorVals) <= 14:171 check2 = True172 print("Average for 12 inches was:",statistics.mean(sensorVals))173 input("distance: press enter to check 6 inches")174 sensorVals.clear()175 time.sleep(2)176 if 4 <= statistics.mean(sensorVals) <= 8:177 check3 = True178 print("Average for 6 inches was:",statistics.mean(sensorVals)) 179 if check1 and check2 and check3:180 tests["distance"] = True181def test_capture():182 global tests183 print("Running test: capture @",round(time.time() - starttime,2))184 check1 = False185 check2 = False186 check3 = False187 send_config(8, 1) #enable movement188 send_config(7, 0) #disable pixy189 send_config(6, 0) #disable sensor190 send_config(2, 0) #disable PID191 send_config(3, 100) #set speed192 time.sleep(1)193 capLeftVals.clear()194 capRightVals.clear()195 time.sleep(5.5)196 print(capLeftVals)197 print(capRightVals)198 if len(capLeftVals) >= 12 and len(capRightVals) >= 12:199 check1 = True200 if 4400 < statistics.mean(capLeftVals) < 7000:201 check2 = True202 if 4400 < statistics.mean(capRightVals) < 7000:203 check3 = True204 if check1 and check2 and check3:205 tests["capture"] = True206def test_pantilt():207 print("Running test: pan/tilt @",round(time.time() - starttime,2))208 send_config(8, 1) #enable movement209 send_config(7, 0) #disable pixy210 send_config(6, 0) #disable sensor211 send_config(2, 0) #disable PID212 time.sleep(10)213 if stateTracking == 1: #found it214 tests["pantilt"] = True215def test_turning():216 print("Running test: turning @",round(time.time() - starttime,2))217 send_config(8, 1) #enable movement218 send_config(7, 1) #enable pixy219 send_config(6, 0) #disable sensor220 send_config(2, 0) #disable PID221 time.sleep(20)222def test_movement():223 global tests224 print("Running test: movement @",round(time.time() - starttime,2))225 send_config(8, 1) #enable movement226 send_config(7, 0) #disable pixy227 send_config(6, 1) #enable sensor228 send_config(2, 0) #disable PID229 send_config(3, 255) #set speed230 time.sleep(20)231 sensorVals.clear()232 time.sleep(1)233 if 6 <= statistics.mean(sensorVals) <= 12:234 tests["movement"] = True235def test_sync():236 global tests237 print("Running test: track @",round(time.time() - starttime,2))238 send_config(8, 0) #disable movement239 send_config(7, 0) #disable pixy240 send_config(6, 0) #disable sensor241 send_config(2, 0) #disable PID242 while(stateTracking != 0):243 pass #wait for pixy to lose target244 print("sync: target lost")245 while(stateTracking != 1):246 pass #wait for pixy to find target247 if stateTracking == 1: #found it248 tests["sync"] = True249def test_approach():250 global tests251 print("Running test: approach @",round(time.time() - starttime,2))252 send_config(8, 1) #enable movement253 send_config(7, 1) #enable pixy254 send_config(6, 1) #enable sensor255 send_config(2, 1) #enable PID256 time.sleep(15)257 sensorVals.clear()258 time.sleep(2)259 tests["approach"] = True260def run_tests():261 print("Thread started: run_tests")262 waiting = 0263 while(not connected):264 time.sleep(1)265 print("Waiting for connection:", waiting)266 waiting+=1267 run_tests = ["pantilt", "distance", "movement", "sync", "turning", "pause", "capture", "PID_straight", "PID_turning", "approach"]268 for func in run_tests:269 input("Press Enter to continue to test: " + func)...

Full Screen

Full Screen

sender.py

Source:sender.py Github

copy

Full Screen

1import threading2import pickle3import socket4from Utils import PDU, Config, CRC5import os6import signal7import random8send_config = Config() #读取配置文件9event = threading.Event() #event flag10lock = threading.RLock() #递归锁11send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #客户端socket12send_socket.bind(send_config.send_addr)13send_file = open(send_config.send_file, 'rb') #以二进制方式打开待发送的文件,测试发送一份pdf文件14send_log = open(send_config.send_log, 'w') #创建日志15# 是否产生错误16def should_error():17 global host2_config18 # 产生一个1~error_rate的随机数,如果刚好等于error_rate返回True19 return True if random.randint(1, send_config.error_rate) == send_config.error_rate else False20# 填充pdu队列21def fill_pdu_q():22 global send_config23 global send_file24 while True:25 # 维护滑动窗口,当已发送的pdu与已收到的ack之差小于等于窗口大小时才生成新的pdu26 while send_config.pdu_to_send - send_config.acked_num <= send_config.sw_size:27 if send_config.pdu_to_send > send_config.pdu_sum + 1: #发送完文件后再发送一份空数据28 break29 lock.acquire() #加锁30 send_file.seek((send_config.pdu_to_send - 1) * send_config.data_size) #使文件指针指向当前应该读取的字段31 data = send_file.read(send_config.data_size) #读取指定大小的内容32 checksum = CRC().calculate(data) #计算checksum33 if should_error(): #模拟出错34 checksum += 135 pdu = PDU(num_to_send=send_config.num_to_send,36 pdu_to_send=send_config.pdu_to_send,37 status=('NEW' if send_config.pdu_to_send > send_config.pdu_to_resend else 'RT'),38 acked_num=send_config.acked_num,39 data=data,40 checksum='%d'%checksum)41 send_config.pdu_q.put(pdu) #加入队列42 send_config.num_to_send += 1 #发送次数+143 send_config.pdu_to_send += 1 #发送pdu数+144 lock.release() #释放锁45 event.set() #设置event为ture,唤醒send_frame线程46#发送pdu队列47def send_pdu():48 global send_config49 global send_socket50 global send_log51 print('%d pdus to be send' % send_config.pdu_sum)52 event.wait() #阻塞等待fill_pdu_q线程53 while True:54 lock.acquire() #加锁55 while not send_config.pdu_q.empty(): #pdu队列不空时发送56 pdu = send_config.pdu_q.get()57 send_socket.sendto(pickle.dumps(pdu.get_pdu()), send_config.recv_addr) #frame发送至服务端58 log = pdu.get_log() #获取日志59 send_log.write(log + '\n') #写入日志60 print(log)61 lock.release() #释放锁62 event.clear() #设置event为false63 event.wait() #阻塞等待fill_pud_q队列64#接收ack65def receive_ack():66 global host2_config67 global host2_socket68 global host2_send_file69 global host2_send_log70 while True:71 # print("等待接收%d" % (config.acked_num+1))72 send_socket.settimeout(send_config.timeout) # 设置阻塞接受时间73 try:74 ack = send_socket.recvfrom(1024)[0]75 except socket.timeout: # 超时重发76 threading.Thread(name='resend', target=resend).start()77 continue78 ack = pickle.loads(ack)79 lock.acquire()80 send_config.acked_num = ack # 修改已收到的ack81 print('receive ack: ', ack)82 if send_config.acked_num == send_config.pdu_sum + 1: #所有ack接受完毕,结束进程83 print('send complete')84 send_log.write('send complete\n')85 send_file.close()86 send_log.close()87 os.kill(os.getpid(), signal.SIGTERM)88 lock.release()89#开启重发90def resend():91 global host2_config92 lock.acquire()93 send_config.pdu_to_resend = send_config.pdu_to_send - 1 #设置需要重发的pdu序号94 send_config.pdu_to_send = send_config.acked_num + 1 #重新设置已经发送的pdu序号95 print('resend pdu from %d to %d' % (send_config.pdu_to_send, send_config.pdu_to_resend))96 lock.release()97if __name__ == '__main__':98 fill_pdu_q = threading.Thread(name='fill_pud_q', target=fill_pdu_q)99 send_pdu = threading.Thread(name='send_pdu', target=send_pdu)100 receiver_ack = threading.Thread(name='receive_ack', target=receive_ack)101 fill_pdu_q.start()102 send_pdu.start()...

Full Screen

Full Screen

send_email.py

Source:send_email.py Github

copy

Full Screen

1import smtplib2from email import encoders3from email.mime.multipart import MIMEMultipart4from email.mime.text import MIMEText5from email.mime.base import MIMEBase6def send(send_config):7 # extract from send_config8 from_address = send_config["from_address"]9 password = send_config["password"]10 from_name = send_config["from_name"]11 subject = send_config["subject"]12 dest_address = send_config["dest_address"]13 html_content = send_config["html_content"]14 opened_attachment = send_config["opened_attachment"]15 attachment_name = send_config["attachment_name"]16 # connect to server17 server = smtplib.SMTP("smtp.gmail.com", 587)18 server.starttls()19 server.login(from_address, password)20 # message21 msg = MIMEMultipart()22 msg['Subject'] = subject23 msg['From'] = from_name24 msg['To'] = dest_address25 # plain text can also be sent with "text" instead of "html"26 msg.attach(MIMEText(html_content, 'html'))27 # email attachment28 if send_config["opened_attachement"]:29 attachment = open(opened_attachment, "rb")30 p = MIMEBase("application", "octet-stream") # payload object31 p.set_payload(attachment.read())32 encoders.encode_base64(p)33 p.add_header("Content-Disposition", "attachment; filename=" + attachment_name)34 msg.attach(p)35 # send mail36 text = msg.as_string()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful