Best Python code snippet using fMBT_python
speed_detect_linked.py
Source:speed_detect_linked.py  
1# USAGE2# python speed-detect.py -ff <first File> -nf <No of files to be processed>3# import the necessary packages4import pytesseract5import argparse6import cv27import sys8import os9import time10import datetime11import csv12from paho.mqtt import client as mqtt_client13#__________________________GLOBAL VARIABLES_____________________________________________________________________________________________14broker = '103.10.133.104'   #---- -------------- IP Address of server15port = 8081            #------------------------ port number to access server16topic = "live_data"          #------------------- MQTT topic to be published17client_id = 'python-mqtt_data' #----------------- client-ID of code18username = 'surya'    #------------------------ MQTT username19password = 'Mqtt@itc'  #------------------------ MQTT password20Connect_Status = False #------------------------ Flag which sets "True" when connected21prev_time = 0          #------------------------ valriable to store disconnected time 22#__________________________________________________________________________________________________23#-----------------------FUNCTION TO BEE CALLED WHEN DISCONNECTED------------------------------------------------------------------------24   #--(funtion argumeents : bool (True/False))25   #-- whenever disconnected , store the live data into csv file -----26def Call_On_Disconnect(state):27    if(not state):28        "!!!!!!!!!!!!!!!!!!! need to change the file path with that in your PC !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"29        with open('/home/surya/Downloads/IOT-standalone/client_side/cache.csv', 'a') as f_object:  30            writer_object = csv.writer(f_object)31            writer_object.writerow([client_id, frame_speed["S.No"], frame_speed["Time Stamp"], frame_speed["Speed"]])32            f_object.close()33#----------------------------------------------------------------------------------------------------------------------------------------34#-----------------------FUNCTION TO CONNECT TO MQTT SERVER--------------------------------------------------------------------------------35def connect_mqtt():36    #------on_connect() function to be called when connected------------------37    def on_connect(client, userdata, flags, rc):38        global Connect_Status39        if rc == 0:    #---------------------- --RETURN CODE: 0 means connected, Turn Connect_Status to "True"40            print("Connected to Surya's MQTT!")41            Connect_Status = True42        else:43            print("Failed to connect, return code %d\n", rc)44            Connect_Status = False45            global prev_time        #------------when not connected store the time in prev_time46            prev_time = time.time()47    #--------------------------------------------------------------------------48    #------on_disconnect() function to be called when disconnected ------------49    def on_disconnect(client, userdata, rc):50        print("client disconnected")51        global Connect_Status52        Connect_Status = False53        global prev_time54        prev_time = time.time()55        if rc == 0:56            print("Unexpected MQTT disconnection. Will auto-reconnect")57        else:58            print("Failed to connect, return code %d, auto-reconnecting"%rc)                59    #---------------------------------------------------------------------------60    client = mqtt_client.Client(client_id)61    client.username_pw_set(username, password)62    # declare the functions to be called on connection and disconnection63    client.on_connect = on_connect64    client.on_disconnect = on_disconnect65    client.connect(broker, port)66    return client67#--------------------------------------------------------------------------------------------------------------------------------------68#------------------------ FUNCTIO TO PUBLISH DATA TO SERVER ----------------------------------------------------------------------------69def publish(client, client_ID, Serial_no, Time_stamp, Rand_str):70    #print(msg)71    msg = "{'client_id': '%s', 'serial_no': '%s', 'time': '%s', 'value': '%s'}"%(client_ID, Serial_no, Time_stamp, Rand_str)72    result = client.publish(topic, msg)73    # result: [0, 1]74    status = result[0]75    for count in range(0,10):   #------if failed to publish, retry upto 10 times76        if status == 0:77            print(f"Send `{msg}` to topic `{topic}`")78            break79        else:80            print(f"Failed to send message to topic {topic}")81            result = client.publish(topic, str(msg))82            status = result[0]83#-------------------------------------------------------------------------------------------------------------------------------------84def data_cleanup(data):85    data = data.strip("\n\x0c").replace(",", "")86    data = ''.join([i for i in data if i.isdigit()])87    return data88def data_validation(input, lastValue=0):  # using a default value of 089    # flag to check if the value is acceptable or not90    divisionCheck = False91    rangeCheck = False92    # chec for null value93    if input == "":94        print("Null check activated")95        return lastValue96    # speed should be a multiple of 1097    if int(input) % 10 == 0:98        print("Division by 10 dectedted")99        divisionCheck = True100    else:101        print("NO division by 10")102        divisionCheck = False103    # value should be between 0 and 18000104    if int(input) <= 18000 and int(input) >= 0:105        print("speed within limit")106        rangeCheck = True107    else:108        print("speed out of limit")109        rangeCheck = False110    # returning the correct value111    if divisionCheck and rangeCheck:112        print("input returned")113        return input114    else:115        print("last value returned")116        return lastValue117# construct the argument parser and parse the arguments118ap = argparse.ArgumentParser()119ap.add_argument("-ff", "--firstFile", required=True,120                help="name of first input image to be OCR'd")121ap.add_argument("-nf", "--numOfFiles", required=True,122                help="number of images to be OCR'd")123args = vars(ap.parse_args())124# Initializing variables to track no of files processed, OCR data of a frame and overall speed data stored in a dict125file_count = 0126frame_speed = {}127speed_data = []128lastSpeed = 0129#--------------------------- MAIN FUNCTION WHERE IMAGE PROCESSING RUNS ---------------------------------------------------------------130def main_func():131    global file_count132    global frame_speed133    global lastSpeed134    global speed_data135    start_time = datetime.datetime.now()136    # load the input image and convert it from BGR to RGB channel ordering137    read_filename = int(args["firstFile"][0:16]) + file_count138    filename = "{}.jpg".format(str(read_filename).zfill(16))139    print("[INFO] Reading Image No {}. File Name {}".format(file_count, filename))140    image = cv2.imread(filename)141    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)142    # extract the relevant segment from image carrying the speed data143    frame = image[240:340, 750:940]144    cv2.imshow("frame {}".format(read_filename), frame)145    # frame Pre processing before OCR146    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)147    gray = cv2.GaussianBlur(gray, (3, 3), 0)148    cv2.imshow("gray", gray)149    thresh = cv2.threshold(150        gray, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)[1]151    cv2.imshow("thresh {}".format(read_filename), thresh)152    # use Tesseract to OCR the image153    options = "--psm 9"154    ocr_text = pytesseract.image_to_string(frame, config=options)155    # Data cleanup and validation156    speed = data_validation(data_cleanup(ocr_text), lastSpeed)157    # saving for next cycle158    lastSpeed = int(speed)159    cv2.waitKey(0)160    # Preparing data extracted from each frame and then saving it to the dictionary161    frame_speed = {162        "S.No": file_count,163        "Speed": int(speed),164        "Time Stamp": datetime.datetime.now()165    }166    speed_data.append(frame_speed)167    file_count += 1168    # end_time = datetime.datetime.now()169    # elapsed_time = end_time - start_time170    # print(elapsed_time.total_seconds())171   #  print(speed_data)172    print(frame_speed)173# field_names = ['S.No', "Speed", "Time Stamp"]174# Writing data from dictionary to csv file all in one go175# with open('speed.csv', 'w') as csvfile:176#     writer = csv.DictWriter(csvfile, fieldnames=field_names)177#     writer.writeheader()178#     writer.writerows(speed_data)179#------------------------------------------------------------------------------------------------------------------------------------180#---------------------- run() function to execute the whole in loop -----------------------------------------------------------------181def run():182    while(1):183        try:184            client = connect_mqtt()185            client.loop_start()186            while file_count < int(args["numOfFiles"]):187                global prev_time188                #----if disconnected for more than 10sec then restart the connection-----------189                if(((time.time()-prev_time)>10)&(not Connect_Status)):190                    prev_time = time.time()191                    client.loop_stop()192                    client = connect_mqtt()193                    client.loop_start()194                main_func()195                publish(client, client_id, frame_speed["S.No"], frame_speed["Time Stamp"], frame_speed["Speed"])196                if(Connect_Status):197                    print("published file_count: %s"%file_count)198                else:199                    print("failed to publish file_count : %s"%file_count)200                Call_On_Disconnect(Connect_Status)201                time.sleep(1)202                print("-------------------------")203        except Exception as e:204            print(e)205            if(file_count < int(args["numOfFiles"])):206                main_func()207            Call_On_Disconnect(False)208        time.sleep(1)209#-----------------------------------------------------------------------------------------------------------------------------------210if __name__ == '__main__':...streaming.py
Source:streaming.py  
...91                    startTime = datetime.now()92                    self.call_on_frame(frame)93            print("Stream client disconnected", conn.getpeername())94            cv2.destroyAllWindows()95            self.call_on_disconnect()96    def call_on_frame(self, frame):97        self.frameNum += 198        self.lastFrame = frame99        for f in self.on_frame_array:100            f(frame)101    def call_on_disconnect(self):102        for f in self.on_disconnect_array:103            f()104    def on_frame(self):105        def decorator(f):106            self.on_frame_array.append(f)107            return f108        return decorator109    def on_disconnect(self):110        def decorator(f):111            self.on_disconnect_array.append(f)112            return f113        return decorator114class AsyncClient:115    def __init__(self, host, port, usePiCam=False):...publisher.py
Source:publisher.py  
1import random2import time3from paho.mqtt import client as mqtt_client4import csv5import string6from datetime import datetime7#__________________________GLOBAL VARIABLES_____________________________________________________________________________________________8broker = '103.10.133.104'   #------------------------ IP Address of server9port = 8081            #------------------------ port number to access server10topic = "live_data"          #------------------------ MQTT topic to be published11client_id = 'python-mqtt_data' #----------------- client-ID of code12username = 'surya'    #------------------------ MQTT username13password = 'Mqtt@itc'  #------------------------ MQTT password14Connect_Status = False #------------------------ Flag which sets "True" when connected15prev_time = 0          #------------------------ valriable to store disconnected time 16#__________________________________________________________________________________________________17#-----------------------FUNCTION TO BEE CALLED WHEN DISCONNECTED------------------------------------------------------------------------18   #--(funtion argumeents : bool (True/False))19   #-- whenever disconnected , store the live data into csv file -----20def Call_On_Disconnect(state):21    if(not state):22        with open('/home/surya/Downloads/IOT-standalone/client_side/cache.csv', 'a') as f_object:23            writer_object = csv.writer(f_object)24            writer_object.writerow([client_id, i, datetime.now(), val])25            f_object.close()26#----------------------------------------------------------------------------------------------------------------------------------------27#-----------------------FUNCTION TO CONNECT TO MQTT SERVER--------------------------------------------------------------------------------28def connect_mqtt():29    #------on_connect() function to be called when connected------------------30    def on_connect(client, userdata, flags, rc):31        global Connect_Status32        if rc == 0:    #---------------------- --RETURN CODE: 0 means connected, Turn Connect_Status to "True"33            print("Connected to Surya's MQTT!")34            Connect_Status = True35        else:36            print("Failed to connect, return code %d\n", rc)37            Connect_Status = False38            global prev_time        #------------when not connected store the time in prev_time39            prev_time = time.time()40    #--------------------------------------------------------------------------41    #------on_disconnect() function to be called when disconnected ------------42    def on_disconnect(client, userdata, rc):43        print("client disconnected")44        global Connect_Status45        Connect_Status = False46        global prev_time47        prev_time = time.time()48        #print(prev_time)49        if rc == 0:50            print("Unexpected MQTT disconnection. Will auto-reconnect")51        else:52            print("Failed to connect, return code %d, auto-reconnecting"%rc)                53    #---------------------------------------------------------------------------54    client = mqtt_client.Client(client_id)55    client.username_pw_set(username, password)56    # declare the functions to be called on connection and disconnection57    client.on_connect = on_connect58    client.on_disconnect = on_disconnect59    client.connect(broker, port)60    return client61#--------------------------------------------------------------------------------------------------------------------------------------62#------------------------ FUNCTIO TO PUBLISH DATA TO SERVER ----------------------------------------------------------------------------63def publish(client, client_ID, Serial_no, Time_stamp, Rand_str):64    #print(msg)65    msg = "{'client_id': '%s', 'serial_no': '%s', 'time': '%s', 'value': '%s'}"%(client_ID, Serial_no, Time_stamp, Rand_str)66    result = client.publish(topic, msg)67    # result: [0, 1]68    status = result[0]69    for count in range(0,10):   #------if failed to publish, retry upto 10 times70        if status == 0:71            print(f"Send `{msg}` to topic `{topic}`")72            break73        else:74            print(f"Failed to send message to topic {topic}")75            result = client.publish(topic, str(msg))76            status = result[0]77#-------------------------------------------------------------------------------------------------------------------------------------78#--------------------------- MAIN FUNCTION WHERE IMAGE PROCESSING RUNS ---------------------------------------------------------------79i1 = 080i = i181def main_func():82    '''83    .84    .85    MAIN FUNCTION WHERE IMAGE PROCESSING CODE RESITES86    .87    .88    .89    '''90    global i91    global i192    global val93    val = ''.join(random.choices(string.ascii_uppercase + string.digits, k = 8)) #---generate random string94    i1 = i95    i+=196#------------------------------------------------------------------------------------------------------------------------------------97#---------------------- run() function to execute the whole in loop -----------------------------------------------------------------98def run():99    while(1):100        try:101            client = connect_mqtt()102            client.loop_start()103            while(1):104                global prev_time105                #----if disconnected for more than 10sec then restart the connection-----------106                if(((time.time()-prev_time)>10)&(not Connect_Status)):107                    prev_time = time.time()108                    client.loop_stop()109                    client = connect_mqtt()110                    client.loop_start()111                main_func()112                publish(client, client_id, i, datetime.now(), val)113                if(Connect_Status):114                    print("published : %s"%i)115                else:116                    print("failed to publish : %s"%i)117                Call_On_Disconnect(Connect_Status)118                time.sleep(1)119                print("-------------------------")120        except Exception as e:121            print(Connect_Status)122            print(e)123            print(i)124            main_func()125            Call_On_Disconnect(False)126        time.sleep(1)127#-----------------------------------------------------------------------------------------------------------------------------------128if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
