Best Python code snippet using yandex-tank
ZumoTest.py
Source:ZumoTest.py  
# ... (the listing starts mid-function; the enclosing definition is not
# visible, so the fragment is preserved verbatim below rather than guessed at)
#     if(topic == "/team20/debug"):
#         on_msg_debug(decoded)
#     elif(topic == "/team20/state"):
#         on_msg_state(decoded)


def send_config(msgType, msgValue):
    """Publish one config message to the "/team20/config" topic.

    The payload is a JSON object with the keys "ID", "Type" and "Value".
    "ID" is the current per-topic counter from the module-level ``ID`` dict,
    which is incremented after publishing.

    Args:
        msgType: value sent as "Type". Going by the call-site comments in
            this file: 1 = rover state, 2 = PID, 3 = speed, 5 = turn,
            6 = sensor, 7 = pixy, 8 = movement (TODO confirm against firmware).
        msgValue: value sent as "Value".
    """
    topic = "/team20/config"
    package = json.dumps({"ID": ID[topic],
                          "Type": msgType,
                          "Value": msgValue})
    client.publish(topic, package)
    ID[topic] += 1
    print("Sent",package,"to",topic,"@",round(time.time() - starttime,2))


def _apply_mode(movement, pixy, sensor, pid):
    """Send the movement/pixy/sensor/PID switches in the order 8, 7, 6, 2."""
    send_config(8, movement)
    send_config(7, pixy)
    send_config(6, sensor)
    send_config(2, pid)


def _mean_or_none(samples):
    """Return the mean of *samples*, or None when no samples were collected.

    ``statistics.mean`` raises ``StatisticsError`` on empty data, which would
    abort the whole test run if no readings arrived during the wait window.
    A snapshot is taken first because the lists are module-level and are
    presumably appended to from a message callback (verify against caller).
    """
    snapshot = list(samples)
    if not snapshot:
        return None
    return statistics.mean(snapshot)


def _run_pid_sweep(msgType):
    """Step config *msgType* through 50..255, then 0, and plot PID values."""
    send_config(msgType, 50)
    time.sleep(1)
    # Drop whatever was recorded before the first setpoint settled.
    PIDLeftVals.clear()
    PIDRightVals.clear()
    time.sleep(5)
    for level in (100, 150, 200, 255):
        send_config(msgType, level)
        time.sleep(5)
    send_config(msgType, 0)
    plt.plot(PIDLeftVals)
    plt.plot(PIDRightVals)
    plt.ylabel('PID adjustment')
    plt.xlabel('trial')
    plt.show()


def resetConfig():
    """Set speed to 0, wait one second, then switch every subsystem off."""
    send_config(3, 0)  # set speed
    time.sleep(1)
    _apply_mode(movement=0, pixy=0, sensor=0, pid=0)


def test_pause():
    """Check that ``statePaused`` follows the rover-state config.

    Passes when ``statePaused`` is 1 two seconds after sending state 2
    ("rover loading") and 0 two seconds after sending state 1 ("rover moving").
    """
    print("Running test: pause @",round(time.time() - starttime,2))
    _apply_mode(movement=1, pixy=0, sensor=0, pid=0)
    send_config(3, 100)  # set speed
    time.sleep(2)
    send_config(1, 2)  # rover loading
    time.sleep(2)
    paused_while_loading = statePaused == 1
    send_config(1, 1)  # rover moving
    time.sleep(2)
    resumed_when_moving = statePaused == 0
    if paused_while_loading and resumed_when_moving:
        tests["pause"] = True


def test_PID_straight():
    """Sweep the speed setting with PID enabled and plot the PID values.

    This is a visual test: it shows a plot and records no pass/fail result.
    """
    print("Running test: PID straight @",round(time.time() - starttime,2))
    _apply_mode(movement=1, pixy=0, sensor=0, pid=1)
    _run_pid_sweep(3)  # type 3: set speed


def test_PID_turning():
    """Sweep the turn setting with PID enabled and plot the PID values.

    This is a visual test: it shows a plot and records no pass/fail result.
    """
    print("Running test: PID turning @",round(time.time() - starttime,2))
    _apply_mode(movement=1, pixy=0, sensor=0, pid=1)
    _run_pid_sweep(5)  # type 5: turn left


def test_distance():
    """Interactively check the sensor average at 20, 12 and 6 inches.

    For each distance the operator presses Enter, readings are collected for
    two seconds, and the average must lie within +/- 2 of the target. All
    three distances are always measured; the test passes only if all are in
    range. A window with no readings counts as a failure.
    """
    print("Running test: distance @",round(time.time() - starttime,2))
    send_config(3, 0)  # set speed
    send_config(7, 0)  # disable pixy
    send_config(6, 0)  # disable sensor
    send_config(2, 0)  # disable PID
    # NOTE(review): the original comment here read "disable movement", but
    # every other call site treats (8, 1) as "enable movement" - confirm intent.
    send_config(8, 1)

    all_in_range = True
    for inches in (20, 12, 6):
        input(f"distance: press enter to check {inches} inches")
        sensorVals.clear()
        time.sleep(2)
        average = _mean_or_none(sensorVals)
        if average is None or not (inches - 2 <= average <= inches + 2):
            all_in_range = False
        print(f"Average for {inches} inches was:", average)
    if all_in_range:
        tests["distance"] = True


def test_capture():
    """Check the capture readings collected while driving at speed 100.

    Passes when both lists hold at least 12 values after 5.5 seconds and both
    averages lie strictly between 4400 and 7000.
    """
    print("Running test: capture @",round(time.time() - starttime,2))
    _apply_mode(movement=1, pixy=0, sensor=0, pid=0)
    send_config(3, 100)  # set speed
    time.sleep(1)
    capLeftVals.clear()
    capRightVals.clear()
    time.sleep(5.5)
    print(capLeftVals)
    print(capRightVals)
    enough_samples = len(capLeftVals) >= 12 and len(capRightVals) >= 12
    left_mean = _mean_or_none(capLeftVals)
    right_mean = _mean_or_none(capRightVals)
    left_ok = left_mean is not None and 4400 < left_mean < 7000
    right_ok = right_mean is not None and 4400 < right_mean < 7000
    if enough_samples and left_ok and right_ok:
        tests["capture"] = True


def test_pantilt():
    """Pass when ``stateTracking`` is 1 ten seconds after configuring."""
    print("Running test: pan/tilt @",round(time.time() - starttime,2))
    _apply_mode(movement=1, pixy=0, sensor=0, pid=0)
    time.sleep(10)
    if stateTracking == 1:  # found it
        tests["pantilt"] = True


def test_turning():
    """Enable movement and pixy for 20 seconds; records no pass/fail result."""
    print("Running test: turning @",round(time.time() - starttime,2))
    _apply_mode(movement=1, pixy=1, sensor=0, pid=0)
    time.sleep(20)


def test_movement():
    """Drive at speed 255 with the sensor enabled and check the final reading.

    After 20 seconds, one more second of sensor readings is collected; the
    test passes when their average is between 6 and 12 inclusive. No readings
    in that window counts as a failure.
    """
    print("Running test: movement @",round(time.time() - starttime,2))
    _apply_mode(movement=1, pixy=0, sensor=1, pid=0)
    send_config(3, 255)  # set speed
    time.sleep(20)
    sensorVals.clear()
    time.sleep(1)
    average = _mean_or_none(sensorVals)
    if average is not None and 6 <= average <= 12:
        tests["movement"] = True


def test_sync():
    """Wait for ``stateTracking`` to become 0 and then 1 again.

    Blocks without a timeout until both transitions have been observed.
    """
    # NOTE(review): the banner says "track" although the test is "sync".
    print("Running test: track @",round(time.time() - starttime,2))
    _apply_mode(movement=0, pixy=0, sensor=0, pid=0)
    while stateTracking != 0:
        time.sleep(0.01)  # wait for pixy to lose target without spinning
    print("sync: target lost")
    while stateTracking != 1:
        time.sleep(0.01)  # wait for pixy to find target without spinning
    if stateTracking == 1:  # found it
        tests["sync"] = True


def test_approach():
    """Enable every subsystem, wait, and mark the test as passed.

    The sensor list is cleared and refilled for two seconds, but the visible
    code does not evaluate it; the result is set to True unconditionally.
    """
    print("Running test: approach @",round(time.time() - starttime,2))
    _apply_mode(movement=1, pixy=1, sensor=1, pid=1)
    time.sleep(15)
    sensorVals.clear()
    time.sleep(2)
    tests["approach"] = True


def run_tests():
    """Wait for the connection, then step through the tests interactively.

    Only the start of this function is visible in the listing; the visible
    part is kept as-is.
    """
    print("Thread started: run_tests")
    waiting = 0
    while(not connected):
        time.sleep(1)
        print("Waiting for connection:", waiting)
        waiting+=1
    # NOTE(review): this local shadows the function's own name.
    run_tests = ["pantilt", "distance", "movement", "sync", "turning", "pause", "capture", "PID_straight", "PID_turning", "approach"]
    for func in run_tests:
        input("Press Enter to continue to test: " + func)
        # ... (the listing is truncated here in the source)

# sender.py
Source:sender.py  
import threading
import pickle
import socket
from Utils import PDU, Config, CRC
import os
import signal
import random

send_config = Config()  # load the configuration file
event = threading.Event()  # event flag
lock = threading.RLock()  # reentrant lock
send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # client socket
send_socket.bind(send_config.send_addr)
# Open the file to be sent in binary mode (the test sends a PDF file).
send_file = open(send_config.send_file, 'rb')
send_log = open(send_config.send_log, 'w')  # create the log


def should_error():
    """Decide whether the next PDU should carry a deliberately bad checksum.

    Draws a random integer in 1..error_rate and returns True only when it
    equals error_rate.
    """
    return random.randint(1, send_config.error_rate) == send_config.error_rate


def fill_pdu_q():
    """Keep the PDU queue filled while the sliding window has room.

    Runs forever. A new PDU is produced only while the gap between the next
    PDU number and the last acknowledged number is at most the window size.
    """
    while True:
        # Maintain the sliding window.
        while send_config.pdu_to_send - send_config.acked_num <= send_config.sw_size:
            # After the file has been sent, one extra empty PDU is sent.
            if send_config.pdu_to_send > send_config.pdu_sum + 1:
                break
            with lock:
                # Move the file pointer to the segment this PDU should carry.
                send_file.seek((send_config.pdu_to_send - 1) * send_config.data_size)
                data = send_file.read(send_config.data_size)
                checksum = CRC().calculate(data)
                if should_error():  # simulate a transmission error
                    checksum += 1
                pdu = PDU(num_to_send=send_config.num_to_send,
                          pdu_to_send=send_config.pdu_to_send,
                          status=('NEW' if send_config.pdu_to_send > send_config.pdu_to_resend else 'RT'),
                          acked_num=send_config.acked_num,
                          data=data,
                          checksum='%d'%checksum)
                send_config.pdu_q.put(pdu)  # enqueue
                send_config.num_to_send += 1  # number of sends + 1
                send_config.pdu_to_send += 1  # number of PDUs sent + 1
        event.set()  # wake the sending thread


def send_pdu():
    """Drain the PDU queue to the receiver and log every PDU sent.

    Runs forever; blocks on ``event`` until ``fill_pdu_q`` signals.
    """
    print('%d pdus to be send' % send_config.pdu_sum)
    event.wait()  # block until fill_pdu_q has signalled
    while True:
        with lock:
            while not send_config.pdu_q.empty():  # send while the queue is non-empty
                pdu = send_config.pdu_q.get()
                # Send the frame to the server side.
                send_socket.sendto(pickle.dumps(pdu.get_pdu()), send_config.recv_addr)
                log = pdu.get_log()
                send_log.write(log + '\n')
                print(log)
        event.clear()
        event.wait()  # block until fill_pdu_q signals again


def receive_ack():
    """Receive ACKs, start a resend on timeout, and exit when all are in.

    Runs forever. When the ACK number reaches ``pdu_sum + 1`` the file and
    the log are closed and the process sends itself SIGTERM.
    """
    while True:
        send_socket.settimeout(send_config.timeout)  # blocking receive timeout
        try:
            ack = send_socket.recvfrom(1024)[0]
        except socket.timeout:  # timed out: resend
            threading.Thread(name='resend', target=resend).start()
            continue
        # SECURITY: pickle.loads on data read from a socket can execute
        # arbitrary code if the peer is not trusted.
        ack = pickle.loads(ack)
        with lock:
            send_config.acked_num = ack  # record the received ACK
            print('receive ack: ', ack)
            # All ACKs received: finish the process.
            if send_config.acked_num == send_config.pdu_sum + 1:
                print('send complete')
                send_log.write('send complete\n')
                send_file.close()
                send_log.close()
                os.kill(os.getpid(), signal.SIGTERM)


def resend():
    """Rewind the send position to just after the last acknowledged PDU."""
    with lock:
        # Highest PDU number that now has to be retransmitted.
        send_config.pdu_to_resend = send_config.pdu_to_send - 1
        # Restart sending right after the last acknowledged PDU.
        send_config.pdu_to_send = send_config.acked_num + 1
        print('resend pdu from %d to %d' % (send_config.pdu_to_send, send_config.pdu_to_resend))


if __name__ == '__main__':
    # NOTE(review): these names rebind the functions above to Thread objects.
    fill_pdu_q = threading.Thread(name='fill_pud_q', target=fill_pdu_q)
    send_pdu = threading.Thread(name='send_pdu', target=send_pdu)
    receiver_ack = threading.Thread(name='receive_ack', target=receive_ack)
    fill_pdu_q.start()
    send_pdu.start()
    # ... (the listing is truncated here in the source)

# send_email.py
Source:send_email.py  
import smtplib
from email import encoders
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase


def send(send_config):
    """Build an HTML e-mail, optionally with one attachment, for Gmail SMTP.

    Args:
        send_config: mapping with the keys "from_address", "password",
            "from_name", "subject", "dest_address", "html_content",
            "opened_attachment" and "attachment_name". "opened_attachment" is
            passed to ``open``; a falsy value means no attachment.

    Raises:
        KeyError: if any of the keys above is missing.
    """
    # extract from send_config
    from_address = send_config["from_address"]
    password = send_config["password"]
    from_name = send_config["from_name"]
    subject = send_config["subject"]
    dest_address = send_config["dest_address"]
    html_content = send_config["html_content"]
    opened_attachment = send_config["opened_attachment"]
    attachment_name = send_config["attachment_name"]
    # connect to server
    server = smtplib.SMTP("smtp.gmail.com", 587)
    server.starttls()
    server.login(from_address, password)
    # message
    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = from_name
    msg['To'] = dest_address
    # plain text can also be sent with "text" instead of "html"
    msg.attach(MIMEText(html_content, 'html'))
    # email attachment
    # The original tested send_config["opened_attachement"] (misspelled), which
    # raises KeyError; the value extracted above is used instead.
    if opened_attachment:
        # The context manager closes the file once the payload has been read.
        with open(opened_attachment, "rb") as attachment:
            payload = attachment.read()
        p = MIMEBase("application", "octet-stream")  # payload object
        p.set_payload(payload)
        encoders.encode_base64(p)
        # The keyword form lets the email package quote the file name.
        p.add_header("Content-Disposition", "attachment", filename=attachment_name)
        msg.attach(p)
    # send mail
    text = msg.as_string()
    # ... (the listing is truncated here in the source)

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
