Best Python code snippet using fMBT_python
tello_scratch_if.py
Source:tello_scratch_if.py  
1# -*- coding: utf-8 -*-2#3# èªå·±è²¬ä»»ã§ããèªç±ã«ã使ããã ããã4#   2020å¹´4æåæ¥5#   uma3san6#7### to scratch8from http.server import BaseHTTPRequestHandler, HTTPServer9from urllib.parse import parse_qs, urlparse10### to tello11import threading12import socket13import sys14import queue15import cv216from time import sleep17from collections import deque18scratchAdrs          = ( 'localhost',    8001 )     #             PC <-> Scratch on PC19telloCmdAdrs         = ( '192.168.10.1', 8889 )     #  Tello <->  PC ( 50602 )20pcAdrs2SendCmd2Tello = ( '0.0.0.0',      50602 )    #  50602: 決ãã¦ããã°ä½ã§ãå¯ãä¸ã¯åããªãã21                                                    #  tello 㯠PCã(192.168.10.2)ã¨ãã¦ãã22rcvTelloStateAdrs    = ( '0.0.0.0',      8890 )     #  Tello  ->  PC23rcvTelloVideoURL     = ( 'udp://@0.0.0.0:11111' )   #  Tello  ->  PC24INTERVAL = 0.225class SendCmd(threading.Thread):26    def __init__(self):27        self.sendSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # socket for receiving tello state28        self.sendSock.bind( pcAdrs2SendCmd2Tello )                # program ãåèµ·åããæãtello ã®åèµ·åãé¿ããããã«è¨å®ã29        self.sendSock.settimeout(10.0)30        self.finishSignal = False31        self.result = False32        if self.connect() == True:33            super().__init__(daemon=True)34            self.start()35            self.result = True36    37    def connect(self):38        print("-> try to connect the tello", end="")39        counter = 540        while counter > 0:41            self.sendSock.sendto('command'.encode('utf-8'), telloCmdAdrs) # tello ã command ã¢ã¼ãã«ãã42            try:43                response = self.sendSock.recvfrom(1024)                  # ctl-c ã§çµäºããã¨ãtelloãåèµ·åããªãã¨åããªã -> client.bind ã§åèµ·åã¯ä¸è¦ã«44                                                                         # recvfrom ã¯ããã¼ã¿ãåãåãã¨æ»ã£ã¦ããã(buffer ã䏿¯ã§ãªãã¦ã)45                print(response) 46                if b'ok' in response:47                    print("-> connected : set tello to command mode")48                    return True49            except socket.timeout:50                print('\n???? socket timeout : ', counter-1, end="" )51                counter -= 152            except KeyboardInterrupt:53                print('~~CTRL-C')54                return False55        else:56            return False57    #58    # Thread of sending command from PC to Tello59    #60    def run(self):61        while True:62            if self.finishSignal == True:63                self.sendSock.close()64                print(".... SendCmd : socket closed")65                return66                67            if len(cmdQue) != 0:68                if 'emergency' in cmdQue:69                    # é£è¡åæ¢ã³ãã³ã70                    cmd = 'emergency'71                    cmdQue.clear()72                else:73                    # é常ã®ã³ãã³ã74                    cmd = cmdQue.popleft()75                print('{} {} {}'.format("->", cmd, " ... "), end="")76                self.sendSock.sendto(cmd.encode('utf-8'), telloCmdAdrs)77                try:78                    response = self.sendSock.recvfrom(1024)  # (b'ok', ('192.168.10.1', 8889)) 79                                                             #  byte    str             int     ã® tuple80                    if b'ok' in response:81                       print( response[0].decode('utf-8'), response[1][0], response[1][1])82                       pass 83                    elif b'error' in response:84                       print( response[0].decode('utf-8'), response[1][0], response[1][1])85                       # sys.exit()86                except socket.timeout:87                    print("???? send command error : recvfrom time out")88                    continue89            else:90                pass91    def kill_thread(self):92        self.finishSignal = True93class ReceiveTelloState(threading.Thread):94    def __init__(self):95        self.rcvSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # socket for receiving tello state96        self.rcvSock.bind(rcvTelloStateAdrs)97        self.rcvSock.settimeout(1.0)98        self.finishSignal = False99        super().__init__(daemon=True)100        self.start()101    #102    #  The thread of receiving tello state103    #104    def run(self):105        while True:106            if self.finishSignal == True:107                self.rcvSock.close()108                print(".... ReceiveTelloState : socket closed")109                return110            try:111                response, ip = self.rcvSock.recvfrom(1024)112            except socket.timeout as ex:113                print("???? ReceiveTelloState : ", ex)      # tello ã®é»æºãåã£ã¦ããæããã® thread ã®åªå
度ãé«ãã®ãï¼114                continue115            if b'ok' in response:116                print("**ok")                               # state ã¯ãok ãè¿ããªãã117                continue118            #print("state = ", response)119            out = response.rstrip(b'\r\n')                  # æå¾ã®æ¹è¡æåãåé¤120            out = out.replace( b';', b'\n')121            out = out.replace( b':', b' ' )122            #print(out)123            stateQue.append(out)                            # scratch ã«éãããã« queue ã«ç©ã124            125            sleep(INTERVAL)126    def kill_thread(self):127        self.finishSignal = True128#129# http server for scratch editor130#131class MyHTTPRequestHandler(BaseHTTPRequestHandler):132    def do_GET(self):133        parsed_path = urlparse(self.path)134        #print(type(parsed_path))135        if parsed_path.path == '/poll':136            #137            # scratch editor ã polling ãã138            #139            if len(stateQue) == 0:140                # print("  empty")141                pass142            else:143                state = stateQue.popleft()144                self.wfile.write( state )                   # state ã scrach ã«145                # print( state )146            return147            148        #print(type(self.path))149        #print(parsed_path.path)150        #print('path = {}'.format(self.path))151        #print('parsed: path = {}, query = {}'.format(parsed_path.path, parse_qs(parsed_path.query)))152        #153        # scratch editor ã tello command ãéã£ã154        #155        _com = parsed_path.path.replace('/', ' ')156        _com = _com[1:]157        cmdQue.append(_com)158class StartHttpServer(threading.Thread):159    def __init__(self):160        super().__init__(daemon=True)161        self.start()162        #self.finishSignal = False163    def run(self):164        scratchSvr = HTTPServer(scratchAdrs, MyHTTPRequestHandler)165        try:166            scratchSvr.serve_forever()167        except KeyboardInterrupt:168            print(".... startHttpServer : Exit startHttpServer by KeyboardInterrupt")169    def kill_thread(self):170        print(".... StartHttpServer : shutdown")171        self.scratchSvr.shutdown()172    173#174# å¤é¨ãããã® thread ã忢ããæ¹æ³ã¯åãããªã175# ctl-c ã§ã¬ããã¯ãã ãã176#            177class MyInput(threading.Thread):178    def __init__(self):179        self.finishSignal = False   # self.start() ã®å¾ã«æ¸ã㨠run() ã® self.finish ãå
ã«åç
§ããã¦ãno attribute ã® error ãåºãããã 180        self.queue = deque()181        super().__init__(daemon=True)182        self.start()183    def run(self):184        while True:185            if self.finishSignal == True:186                #187                # ãããå®è¡ããããã¨ã¯ãªãã£ãã188                #189                print("....MyInput : close")190                return191            try:192                t = input()193            except Exception as ex:                   # ctl-c ã§ãã®ä¾å¤ãçºçãã¦ãããã°ã©ã ãçµäºãããããã 194                print(".... MyInput : ctl-c")         # ãã®è¡ã¯è¡¨ç¤ºããã195                self.queue.append("!!!! ctl-c")       # ããã°ã©ã ãæ¢ã¾ãã¾ã§æéãããã196                break197                198            if t != None:199                self.queue.append(t)200            201    def input(self, block=True, timeout=None):202        if len(self.queue) == 0:203            return None204        else:205            return self.queue.popleft()206    #207    # ãã®é¢æ°ã¯ä½¿ããªãããå¾ã®åèã®ããã«æ®ãã¦ãã208    #209    def kill_thread(self):210        self.finishSignal = True211def kill_all_thread():212    sendCmd.kill_thread()213    receiveTelloState.kill_thread()214    #cin.kill_thread()215    #startHttpServer.kill_thread()216if __name__ == "__main__":217    #218    # ããã«æ¸ãã¦ããä¸è¨ class ã®ä¸ã§åç
§ã§ãã219    #220    stateQue = deque()         # tello state -> ãã®ããã°ã©ã  -> scratch221    cmdQue   = deque()         # tell cmd    <- ãã®ããã°ã©ã  <- scratch222 223    #224    # all thread start225    #226    sendCmd = SendCmd()        # class ãªã®ã§ () ã¯å¿
è¦ã ã¤ã³ã¹ã¿ã³ã¹ãªã() ã¯ä¸è¦ã¨æãããã227    if sendCmd.result == False:228        print( "\n???? error : can't connect to the tello")229        sys.exit()230        231    receiveTelloState = ReceiveTelloState()232    startHttpServer = StartHttpServer()233    cin = MyInput()234    cmdQue.append('command')235    cmdQue.append('streamon')236    VS_UDP_IP = '0.0.0.0'237    VS_UDP_PORT = 11111238    #udp_video_address = 'udp://@' + VS_UDP_IP + ':' + str(VS_UDP_PORT)239    #cap = cv2.VideoCapture(udp_video_address)240    cap = cv2.VideoCapture(rcvTelloVideoURL)241    #cap.open(udp_video_address)242    cap.open(rcvTelloVideoURL)243    #objects = [ sendCmd, receiveTelloState, startHttpServer, cin ]244    while True:245        try:246            msg = cin.input()247        except KeyboardInterrupt:248            print("....main : Exit KeyboardInterrupt")249            kill_all_thread()                               # ã©ã¡ãã250            # map( lambda func:func.kill_thread(), objects )251            break        252        if msg == None:253            pass254            255        elif '!!!!' in msg:256            break257   258        else:259            if 'quit' in msg:260                print ('.... main : Exit by <quit>')261                kill_all_thread()262                break        263            else:264                cmdQue.append(msg)265        ret, frame = cap.read()266        cv2.imshow('frame', frame)267            268        if cv2.waitKey(1) & 0xFF == ord('q'):269            print ('.... main : Exit by <q> on the video window')270            kill_all_thread()271            break        272    cap.release()273    cv2.destroyAllWindows()274    sendCmd.join()275    print("<<<< join sendCmd")276    receiveTelloState.join()277    print("<<<< join receiveTelloState")278    startHttpServer.kill_thread()279    startHttpServer.join()                 # ãããå®è¡ãããã¨ããã°ã©ã ãçµäºãã280    print("<<<< join startHttpServer")     # ããããä¸ãå®è¡ããããã¨ã¯ãªãã£ã281    cin.join()...httpserver_test.py
Source:httpserver_test.py  
1p# -*- coding: utf-8 -*-2"""3Created on Thu Jun 20 17:27:48 20194@author: THP65"""6# coding:utf-87import threading8import time9import socket10import sys11from wsclient_test import startClient12#import turn_server13from turn_server import turnServer14from multiprocessing import Process15def handle_client(client_socket):16    """17    å¤ç客æ·ç«¯è¯·æ±18    """19    request_data = client_socket.recv(1024)20    print("request data:", request_data)21    # æé ååºæ°æ®22    response_start_line = "HTTP/1.1 200 OK\r\n"23    response_headers = "Server: My server\r\n"24    response_body = "<h1>Python HTTP Test</h1>"25    response = response_start_line + response_headers + "\r\n" + response_body26    # å客æ·ç«¯è¿åååºæ°æ®27    client_socket.send(bytes(response, "utf-8"))28    # å
³é客æ·ç«¯è¿æ¥29    client_socket.close()30def startHttpServer(name):31    print ("startHttpServer")32    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)33    server_socket.bind(("", 8000))34    server_socket.listen(128)35    while True:36        client_socket, client_address = server_socket.accept()37        print("[%s, %s]ç¨æ·è¿æ¥ä¸äº" % client_address)38        handle_client_process = Process(target=handle_client, args=(client_socket,))39        handle_client_process.start()40        client_socket.close()41        42def startWebsocket(ts):43    print ("startWebsocket")44    #ts=turnServer();45    ts.startWebsocket()    46    print ("startWebsocket end")47def startConnetServer(ws):48    try:49        ws.connect()50        ws.run_forever()51    except KeyboardInterrupt:52        ws.close()53        54if __name__ == "__main__":55    ts=None56    try:57        print ("start") 58        ts=turnServer();59        '''60        t1 = threading.Thread(target=startHttpServer,args=('ws1', ))61        t1.start()62        print ("start1ed")63        '''64        t2 = threading.Thread(target=startWebsocket,args=(ts, ))65        t2.start()66        print ("start2ed")67        68        69        #startClient()70        71        while True:72            msg = input(">> ").strip()73            if("e"==msg):74                print("exit")75                sys.exit(0)76            elif("2"==msg):77                print("live")78            else:79                print("cmd:",msg)80        81        time.sleep(10000)82        83    except:84        print ("Error: unable to start thread")85        86    87        88        89        90        ...main.py
Source:main.py  
1#!/usr/bin/python32import time3import sys4import os5import random6import numpy as np7from execute.sequentialEvaluation import sequentialEvaluation #function8from execute.generateBatches import generateBatches #function9from execute.generateBehaviour import generateBehaviour #function10from execute.startHttpServer import startHttpServer #function11from execute.resultsVerification import resultsVerification #function12from batch.batch import Batch #class13from batchDefinition.aBatchDefinition import ABatchDefinition #class14def main2():15  start = time.time()16  sequentialEvaluation()17  end = time.time()18  print()19  print("Time: " + format(end - start, '.5f') + " s")20if __name__ == "__main__":21  #generateBatches()22  if len(sys.argv) == 2 and sys.argv[1] == "-generateBatches":23      generateBatches()24  if len(sys.argv) == 2 and sys.argv[1] == "-generateBehaviours":25      generateBehaviour()26  if len(sys.argv) == 2 and sys.argv[1] == "-startHttpServer":27      startHttpServer()28  if len(sys.argv) == 2 and sys.argv[1] == "-resultVerification":29      resultsVerification()30  if len(sys.argv) == 1:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
