How to use startHTTPServer method in fMBT

Best Python code snippet using fMBT_python

tello_scratch_if.py

Source:tello_scratch_if.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
#
# Feel free to use this at your own risk.
# April 2020
# uma3san
#
"""Bridge between the Scratch editor (HTTP extension) and a Tello drone (UDP).

Threads:
  * SendCmd            - pops commands from ``cmdQue`` and sends them to the Tello.
  * ReceiveTelloState  - receives Tello state datagrams and pushes them to ``stateQue``.
  * StartHttpServer    - HTTP server that Scratch polls / sends commands to.
  * MyInput            - reads console lines without blocking the main loop.
The main thread shows the video stream and forwards console input as commands.
"""
### to scratch
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse
### to tello
import threading
import socket
import sys
import queue
import cv2
from time import sleep
from collections import deque

scratchAdrs = ('localhost', 8001)            # PC <-> Scratch on PC
telloCmdAdrs = ('192.168.10.1', 8889)        # Tello <-> PC ( 50602 )
pcAdrs2SendCmd2Tello = ('0.0.0.0', 50602)    # 50602: any port works as long as it is fixed.
                                             # (original note: "the above does not work")
                                             # The Tello sees the PC as 192.168.10.2.
rcvTelloStateAdrs = ('0.0.0.0', 8890)        # Tello -> PC
rcvTelloVideoURL = ('udp://@0.0.0.0:11111')  # Tello -> PC
INTERVAL = 0.2

# Shared queues. Defined at module level (not only under __main__) so the
# classes below also work when this file is imported.
stateQue = deque()  # tello state -> this program -> scratch
cmdQue = deque()    # tello cmd   <- this program <- scratch


class SendCmd(threading.Thread):
    """Thread that sends queued commands from the PC to the Tello.

    After construction ``result`` is True when the Tello acknowledged the
    initial ``command`` request (and the thread was started), else False.
    """

    def __init__(self):
        super().__init__(daemon=True)
        self.sendSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # socket for sending commands
        # Bind to a fixed local port so the Tello does not have to be
        # rebooted when this program is restarted.
        self.sendSock.bind(pcAdrs2SendCmd2Tello)
        self.sendSock.settimeout(10.0)
        self.finishSignal = False
        self.result = False
        if self.connect():
            self.start()
            self.result = True
        else:
            # Do not leak the bound socket when the drone is unreachable.
            self.sendSock.close()

    def connect(self):
        """Put the Tello into command mode.

        Makes at most 5 attempts; every attempt (timeout *or* unexpected
        reply) consumes one, so the loop always terminates.

        Returns:
            True if the Tello answered ``ok``, False otherwise (including Ctrl-C).
        """
        print("-> try to connect the tello", end="")
        for remaining in range(4, -1, -1):
            self.sendSock.sendto('command'.encode('utf-8'), telloCmdAdrs)  # switch the tello to command mode
            try:
                # recvfrom returns as soon as a datagram arrives
                # (the buffer does not have to be full).
                response = self.sendSock.recvfrom(1024)
            except socket.timeout:
                print('\n???? socket timeout : ', remaining, end="")
                continue
            except KeyboardInterrupt:
                print('~~CTRL-C')
                return False
            print(response)
            if response[0] == b'ok':
                print("-> connected : set tello to command mode")
                return True
        return False

    #
    # Thread of sending command from PC to Tello
    #
    def run(self):
        """Send queued commands until ``kill_thread`` is called."""
        while True:
            if self.finishSignal:
                self.sendSock.close()
                print(".... SendCmd : socket closed")
                return

            if not cmdQue:
                # Yield the CPU instead of busy-spinning on an empty queue.
                sleep(0.01)
                continue

            if 'emergency' in cmdQue:
                # Emergency stop: takes priority and discards everything else.
                cmd = 'emergency'
                cmdQue.clear()
            else:
                # Ordinary command.
                cmd = cmdQue.popleft()
            print('{} {} {}'.format("->", cmd, " ... "), end="")
            self.sendSock.sendto(cmd.encode('utf-8'), telloCmdAdrs)
            try:
                # e.g. (b'ok', ('192.168.10.1', 8889)) : (bytes, (str, int))
                data, (host, port) = self.sendSock.recvfrom(1024)
            except socket.timeout:
                print("???? send command error : recvfrom time out")
                continue
            # errors='replace': a malformed reply must not kill this thread.
            print(data.decode('utf-8', errors='replace'), host, port)

    def kill_thread(self):
        """Ask the thread to close its socket and exit."""
        self.finishSignal = True


class ReceiveTelloState(threading.Thread):
    """Thread that receives Tello state datagrams and queues them for Scratch."""

    def __init__(self):
        super().__init__(daemon=True)
        self.rcvSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # socket for receiving tello state
        self.rcvSock.bind(rcvTelloStateAdrs)
        self.rcvSock.settimeout(1.0)
        self.finishSignal = False
        self.start()

    #
    # The thread of receiving tello state
    #
    def run(self):
        """Receive state datagrams until ``kill_thread`` is called."""
        while True:
            if self.finishSignal:
                self.rcvSock.close()
                print(".... ReceiveTelloState : socket closed")
                return
            try:
                response, ip = self.rcvSock.recvfrom(1024)
            except socket.timeout as ex:
                # Seen while the tello is powered off.
                print("???? ReceiveTelloState : ", ex)
                continue
            if b'ok' in response:
                print("**ok")  # state datagrams are not expected to contain 'ok'
                continue
            # "key:value;key:value;...\r\n"  ->  "key value\nkey value\n..."
            out = response.rstrip(b'\r\n')   # drop the trailing line break
            out = out.replace(b';', b'\n')
            out = out.replace(b':', b' ')
            stateQue.append(out)             # queued for delivery to scratch

            sleep(INTERVAL)

    def kill_thread(self):
        """Ask the thread to close its socket and exit."""
        self.finishSignal = True


#
# http server for scratch editor
#
class MyHTTPRequestHandler(BaseHTTPRequestHandler):
    """Handles Scratch requests: ``/poll`` returns state, anything else is a command."""

    def do_GET(self):
        parsed_path = urlparse(self.path)
        if parsed_path.path == '/poll':
            #
            # The scratch editor polled: hand over the oldest queued state.
            #
            try:
                body = stateQue.popleft()
            except IndexError:
                body = b''
        else:
            #
            # The scratch editor sent a tello command:
            # "/up/50" -> " up 50" -> "up 50"
            #
            _com = parsed_path.path.replace('/', ' ')
            _com = _com[1:]
            cmdQue.append(_com)
            body = b''

        # Always answer with a well-formed HTTP response (status line and
        # headers) instead of writing a bare payload or nothing at all.
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)


class StartHttpServer(threading.Thread):
    """Thread running the HTTP server that the Scratch editor talks to."""

    def __init__(self):
        super().__init__(daemon=True)
        # Create the server before start() and keep it on the instance, so
        # kill_thread() can always reach it.
        self.scratchSvr = HTTPServer(scratchAdrs, MyHTTPRequestHandler)
        self.start()

    def run(self):
        # KeyboardInterrupt is only ever delivered to the main thread, so it
        # is not handled here; the loop ends when kill_thread() is called.
        try:
            self.scratchSvr.serve_forever()
        finally:
            self.scratchSvr.server_close()

    def kill_thread(self):
        """Stop ``serve_forever`` (blocks until the serving loop has exited)."""
        print(".... StartHttpServer : shutdown")
        self.scratchSvr.shutdown()


#
# There is no known way to stop this thread from outside, because it is
# blocked inside input(). It is a daemon thread, so it ends with the process.
#
class MyInput(threading.Thread):
    """Thread that reads console lines into a queue for non-blocking access."""

    def __init__(self):
        super().__init__(daemon=True)
        # Attributes must exist before start(), because run() reads them.
        self.finishSignal = False
        self.queue = deque()
        self.start()

    def run(self):
        while True:
            if self.finishSignal:
                print("....MyInput : close")
                return
            try:
                t = input()
            except Exception as ex:
                # Reached when stdin fails (observed by the author on Ctrl-C).
                print(".... MyInput : ctl-c")
                self.queue.append("!!!! ctl-c")  # tells the main loop to stop
                break

            self.queue.append(t)

    def input(self, block=True, timeout=None):
        """Return the oldest pending console line, or None if there is none.

        ``block`` and ``timeout`` are accepted for interface compatibility
        and are ignored: this call never blocks.
        """
        if len(self.queue) == 0:
            return None
        return self.queue.popleft()

    #
    # Not used; kept for reference.
    #
    def kill_thread(self):
        self.finishSignal = True


def kill_all_thread():
    """Signal the Tello sender and state-receiver threads to stop."""
    sendCmd.kill_thread()
    receiveTelloState.kill_thread()
    #cin.kill_thread()
    #startHttpServer.kill_thread()


if __name__ == "__main__":
    #
    # all thread start
    #
    sendCmd = SendCmd()
    if not sendCmd.result:
        print("\n???? error : can't connect to the tello")
        sys.exit()

    receiveTelloState = ReceiveTelloState()
    startHttpServer = StartHttpServer()
    cin = MyInput()
    cmdQue.append('command')
    cmdQue.append('streamon')
    VS_UDP_IP = '0.0.0.0'
    VS_UDP_PORT = 11111
    cap = cv2.VideoCapture(rcvTelloVideoURL)
    if not cap.isOpened():
        cap.open(rcvTelloVideoURL)

    try:
        while True:
            msg = cin.input()
            if msg is None:
                pass
            elif '!!!!' in msg:
                break
            elif 'quit' in msg:
                print('.... main : Exit by <quit>')
                break
            else:
                cmdQue.append(msg)

            ret, frame = cap.read()
            if ret:
                # cap.read() yields (False, None) until the stream delivers
                # a frame; imshow() would raise on None.
                cv2.imshow('frame', frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                print('.... main : Exit by <q> on the video window')
                break
    except KeyboardInterrupt:
        print("....main : Exit KeyboardInterrupt")
    finally:
        # Every exit path must stop the worker threads; otherwise the
        # join() calls below never return.
        kill_all_thread()
        cap.release()
        cv2.destroyAllWindows()

    sendCmd.join()
    print("<<<< join sendCmd")
    receiveTelloState.join()
    print("<<<< join receiveTelloState")
    startHttpServer.kill_thread()
    startHttpServer.join()
    print("<<<< join startHttpServer")
    # The input thread is blocked in input() and cannot be interrupted, so
    # bound the wait; being a daemon thread it ends with the process.
    cin.join(timeout=1.0)
    # ... (the remainder of the script is truncated in the source listing)

Full Screen

Full Screen

httpserver_test.py

Source:httpserver_test.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
"""
Created on Thu Jun 20 17:27:48 2019
@author: THP6
"""
# coding:utf-8
import threading
import time
import socket
import sys
from wsclient_test import startClient
#import turn_server
from turn_server import turnServer
from multiprocessing import Process


def handle_client(client_socket):
    """Handle one client request: read it, log it, answer with a fixed page.

    The socket is closed even if receiving or sending fails.
    """
    try:
        request_data = client_socket.recv(1024)
        print("request data:", request_data)
        # Build the response.
        response_start_line = "HTTP/1.1 200 OK\r\n"
        response_headers = "Server: My server\r\n"
        response_body = "<h1>Python HTTP Test</h1>"
        response = response_start_line + response_headers + "\r\n" + response_body
        # sendall: send() may transmit only part of the buffer.
        client_socket.sendall(bytes(response, "utf-8"))
    finally:
        # Close the client connection.
        client_socket.close()


def startHttpServer(name):
    """Accept connections on port 8000 forever, one child process per client.

    ``name`` is unused; it is kept so existing callers keep working.
    """
    print("startHttpServer")
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(("", 8000))
    server_socket.listen(128)
    while True:
        client_socket, client_address = server_socket.accept()
        print("[%s, %s]用户连接上了" % client_address)
        handle_client_process = Process(target=handle_client, args=(client_socket,))
        handle_client_process.start()
        # The child process owns its own copy; close the parent's handle.
        client_socket.close()


def startWebsocket(ts):
    """Run ``ts.startWebsocket()`` and log when it starts and returns."""
    print("startWebsocket")
    #ts=turnServer();
    ts.startWebsocket()
    print("startWebsocket end")


def startConnetServer(ws):
    """Connect ``ws`` and run it until interrupted; close it on Ctrl-C."""
    try:
        ws.connect()
        ws.run_forever()
    except KeyboardInterrupt:
        ws.close()


if __name__ == "__main__":
    ts = None
    try:
        print("start")
        ts = turnServer()
        # Disabled in the original:
        # t1 = threading.Thread(target=startHttpServer,args=('ws1', ))
        # t1.start()
        # print ("start1ed")

        # daemon=True so the "e" command below can actually end the process
        # while the websocket thread is still running.
        t2 = threading.Thread(target=startWebsocket, args=(ts,), daemon=True)
        t2.start()
        print("start2ed")

        #startClient()

        while True:
            msg = input(">> ").strip()
            if "e" == msg:
                print("exit")
                sys.exit(0)
            elif "2" == msg:
                print("live")
            else:
                print("cmd:", msg)

        # NOTE(review): the listing lost its indentation; this call is assumed
        # to follow the loop (and is therefore unreachable) - confirm.
        time.sleep(10000)

    except Exception:
        # Not a bare "except:": that also caught the SystemExit raised by
        # sys.exit(0) above and reported a normal exit as an error.
        print("Error: unable to start thread")
# ... (the remainder of the script is truncated in the source listing)

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

#!/usr/bin/python3
import time
import sys
import os
import random
import numpy as np
from execute.sequentialEvaluation import sequentialEvaluation #function
from execute.generateBatches import generateBatches #function
from execute.generateBehaviour import generateBehaviour #function
from execute.startHttpServer import startHttpServer #function
from execute.resultsVerification import resultsVerification #function
from batch.batch import Batch #class
from batchDefinition.aBatchDefinition import ABatchDefinition #class


def main2():
    """Run the sequential evaluation and print its wall-clock duration."""
    started_at = time.time()
    sequentialEvaluation()
    elapsed = time.time() - started_at
    print()
    print("Time: " + format(elapsed, '.5f') + " s")


def _only_argument_is(flag):
    """Return True when exactly one command-line argument was given and it equals *flag*."""
    return len(sys.argv) == 2 and sys.argv[1] == flag


if __name__ == "__main__":
    #generateBatches()
    # Each switch is checked independently, in the original order.
    if _only_argument_is("-generateBatches"):
        generateBatches()
    if _only_argument_is("-generateBehaviours"):
        generateBehaviour()
    if _only_argument_is("-startHttpServer"):
        startHttpServer()
    if _only_argument_is("-resultVerification"):
        resultsVerification()
    if len(sys.argv) == 1:
        ...  # body truncated in the source listing

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful