Best Python code snippet using autotest_python
tcp.py
Source:tcp.py  
# Scratch file of TCP socket exercises.
#
# Everything except the final urllib snippet is commented-out practice code
# (clients, servers and two binary-search variants). It is kept for
# reference with the comments and prompts translated to English.

# --- TCP client: send one message, print the reply -------------------------
# from socket import *
# tcp_socket=socket(AF_INET,SOCK_STREAM)
# addr=('192.168.1.6',8080)
# tcp_socket.connect(addr)
# data=input("Enter the content you want to send: ")
# tcp_socket.send(data.encode('gbk'))
# recv_from=tcp_socket.recv(1024)
# print(recv_from.decode('gbk'))
# tcp_socket.close()

# --- TCP client (annotated version) -----------------------------------------
# from socket import *
# #
# # Create the socket
# tcp_client_socket = socket(AF_INET, SOCK_STREAM)
#
# # Destination information
# server_addr = ('192.168.1.6', 8080)
# # Connect to the server
# tcp_client_socket.connect(server_addr)
#
# # Prompt the user for data
# send_data = input("Enter the data to send: ")
#
# tcp_client_socket.send(send_data.encode("gbk"))
#
# # Receive the data sent back by the peer, at most 1024 bytes
# recvData = tcp_client_socket.recv(1024)
# print('Received data:', recvData.decode('gbk'))
#
# # Close the socket
# tcp_client_socket.close()
#

# --- TCP server (annotated version) -----------------------------------------
# from socket import *
#
# # Create the socket
# tcp_server_socket = socket(AF_INET, SOCK_STREAM)
# # Local information
# address = ('', 7788)
# # Bind
# tcp_server_socket.bind(address)
# # A socket created with socket() is active by default; listen() turns it
# # into a passive one so that it can accept connections from others
# tcp_server_socket.listen(128)
#
# # When a new client connects to the server, a new socket is created that is
# # dedicated to serving that client
# # client_socket serves this client
# # tcp_server_socket is then free to wait for connections from other new clients
# client_socket, clientAddr = tcp_server_socket.accept()
# # Receive the data sent by the peer
# recv_data = client_socket.recv(1024)  # receive 1024 bytes
# print('Received data:', recv_data.decode('gbk'))
# # Send some data to the client
# client_socket.send("thank you !".encode('gbk'))
# # Close the socket serving this client; once closed, this client can no
# # longer be served, and it has to reconnect if it needs service again
# client_socket.close()

# --- TCP client --------------------------------------------------------------
# from socket import *
# tcp_socket=socket(AF_INET,SOCK_STREAM)
# addr=('192.168.1.6',8080)
# tcp_socket.connect(addr)
# data=input('Enter the content you want to send: ')
# tcp_socket.send(data.encode('gbk'))
# recv_from=tcp_socket.recv(1024)
# print(recv_from.decode('gbk'))
# tcp_socket.close()

# --- TCP server in a loop: send first, then receive ------------------------
# while True:
#     from socket import *
#     tcp = socket(AF_INET, SOCK_STREAM)
#     addr = ('', 8080)
#     tcp.bind(addr)
#     tcp.listen(100)
#     tcp2, tcp_ip = tcp.accept()
#     data = input('Enter the message you want to send to the server: ')
#     tcp2.send(data.encode('gbk'))
#     recv_from = tcp2.recv(2014)
#     print('Message from the server for you: %s' % recv_from.decode('gbk'))
#     tcp2.close()
#     tcp.close()

# --- TCP client in a loop ----------------------------------------------------
# while True:
#     from socket import *
#     tcp = socket(AF_INET, SOCK_STREAM)
#     addr = ('192.168.1.6', 8080)
#     tcp.connect(addr)
#     data = input('Enter the content you want to send: ')
#     tcp.send(data.encode('gbk'))
#     recv_from = tcp.recv(1024)
#     print('Received content: %s' % recv_from.decode('gbk'))
#     tcp.close()

# --- TCP server: receive, reply "thankyou" ----------------------------------
# from socket import *
# tcp_1=socket(AF_INET,SOCK_STREAM)
# addr=('',8080)
# tcp_1.bind(addr)
# tcp_1.listen(100)
# q,w=tcp_1.accept()
# recv=q.recv(1024)
# print(recv.decode('gbk'))
# q.send('thankyou'.encode('gbk'))
# print(w)
# q.close()

# --- TCP client --------------------------------------------------------------
# from socket import *
# tcp_socket=socket(AF_INET,SOCK_STREAM)
# addr=('192.168.1.1',8080)
# tcp_socket.connect(addr)
# data=input('Enter the content you want to send: ')
# tcp_socket.send(data.encode('gbk'))
# recv_from=tcp_socket.recv(1024)
# print(recv_from.decode('gbk'))
# tcp_socket.close()

# --- Binary search: iterative, then recursive -------------------------------
# a=[1,2,3,4,5]
#
#
# def cha(a,i):
#     first=0
#     last=len(a)-1
#     while first<=last:
#         x=(last+first)//2
#         if a[x]==i:
#             return True
#         elif i>a[x]:
#             first=x+1
#         else:
#             last=x-1
#     else:
#         return False
# def cha(a,i):
#     if len(a)==0:
#         return False
#     else:
#         x=len(a)//2
#         if a[x]==i:
#             return True
#         elif i>a[x]:
#             return cha(a[x+1:],i)
#         else:
#             return cha(a[:x],i)
# print(cha(a,5))

# --- TCP server: receive, reply, receive again ------------------------------
# from socket import *
# tcp=socket(AF_INET,SOCK_STREAM)
# addr=('',8080)
# tcp.bind(addr)
# tcp.listen(100)
# kehu,kehu_ip=tcp.accept()
# recv_data=kehu.recv(1024)
# print('Message received from the client: %s'%recv_data.decode('gbk'))
# kehu.send('thankyou'.encode('gbk'))
# recv_data2=kehu.recv(1024)
# print(recv_data2.decode('gbk'))
# print(kehu_ip)
# kehu.close()
# tcp.close()

# --- TCP server: send first, then receive -----------------------------------
# from socket import *
# tcp_server_socket=socket(AF_INET,SOCK_STREAM)
# addr=('',7788)
# tcp_server_socket.bind(addr)
# tcp_server_socket.listen(100)
# tcp_client_socket,client_addr=tcp_server_socket.accept()
# data=input("Enter the message you want to send to the server: ")
# tcp_client_socket.send(data.encode('gbk'))
# # data2=input('Enter the message you want to reply to the client: ')
# rec=tcp_client_socket.recv(2014)
# print(rec.decode('gbk'))
# tcp_client_socket.close()
# tcp_server_socket.close()

# --- TCP server: receive only ------------------------------------------------
# from socket import *
# tcp_server_socket=socket(AF_INET,SOCK_STREAM)
# addr=('',8080)
# tcp_server_socket.bind(addr)
# tcp_server_socket.listen(100)
# tcp_client_socket,tcp_client_ip=tcp_server_socket.accept()
# recv_from=tcp_client_socket.recv(1024)
# print(recv_from.decode('gbk'))
# print(tcp_client_ip)
# tcp_client_socket.close()
# tcp_server_socket.close()

# --- TCP client: send a fixed message ----------------------------------------
# from socket import *
# tcp_socket=socket(AF_INET,SOCK_STREAM)
# addr=('192.168.1.6',8080)
# tcp_socket.connect(addr)
# tcp_socket.send('123'.encode('gbk'))
# rev=tcp_socket.recv(1024)
# print(rev.decode('gbk'))
# tcp_socket.close()

# --- Live code: fetch a page and print its response headers -----------------
import urllib.request as a

# The response is used as a context manager so the underlying HTTP connection
# is closed even if printing the headers or reading the body raises.
with a.urlopen('https://www.baidu.com') as response:
    headers = response.getheaders()
    for x, y in headers:
        print(x, y)
    # The body is read into `html` but not used further in this file.
    html = response.read()
Source:cankao2.py  
import socket
import sys
import select
import time
import heapq

INF = float("inf")


def encode(node_name, node_graph):
    """Serialise one node's neighbour table into a UTF-8 datagram payload.

    The first line is the node name; each following line is
    ``"<neighbour> <cost>"``. Every line ends with a newline.
    """
    lines = [node_name + "\n"]
    lines.extend(
        neighbour + " " + str(cost) + "\n"
        for neighbour, cost in node_graph.items()
    )
    return "".join(lines).encode("UTF-8")


def decode(message):
    """Parse a payload produced by :func:`encode`.

    Returns ``(node_name, graph)`` where ``graph`` maps neighbour name to a
    float cost. Lines that do not split into exactly two space-separated
    fields (including the trailing empty line) are ignored.
    """
    msg_list = message.decode("UTF-8").split("\n")
    node_name = msg_list[0]
    graph = {}
    for line in msg_list[1:]:
        fields = line.split(" ")
        if len(fields) == 2:
            graph[fields[0]] = float(fields[1])
    return node_name, graph


# recv_from is a dictionary:
#   key   - name of the node whose neighbour graph was received as a message
#   value - list of ports of the nodes that sent us that message
#   { neighbour graph of A : received from [B, C, D] }
def update_graph():
    """Read the pending datagram (if any) from ``sock`` and merge it.

    Uses the module-level ``sock``, ``graph``, ``keep_alive`` and
    ``recv_from``. The ``select`` call uses a zero timeout, so this never
    blocks. The sender's port is stamped in ``keep_alive`` and recorded in
    ``recv_from`` so the same graph is not flooded back to it.
    """
    readable, _, _ = select.select([sock], [], [], 0)
    for ready in readable:
        # NOTE(review): payloads larger than 1024 bytes would be truncated.
        message, addr = ready.recvfrom(1024)
        try:
            node, recv_graph = decode(message)
        except (UnicodeDecodeError, ValueError):
            # A malformed datagram must not take the whole router down.
            continue
        sender_port = addr[1]
        keep_alive[sender_port] = time.time()
        graph[node] = recv_graph
        senders = recv_from.setdefault(node, [])
        if sender_port not in senders:
            senders.append(sender_port)


def broadcast(node):
    """Send ``node``'s neighbour table to every direct neighbour.

    A neighbour is skipped when its port is already listed in
    ``recv_from[node]``, i.e. that neighbour itself sent us this table.
    """
    already_has_it = recv_from.get(node, ())
    payload = encode(node, graph[node])
    for port in node_port.values():
        if port not in already_has_it:
            sock.sendto(payload, ("localhost", port))


def Dijkstra_Algrm(graph, source=None):
    """Compute least-cost distances from ``source`` over ``graph``.

    ``graph`` maps node name -> {neighbour name -> cost}. ``source`` defaults
    to the module-level ``node_name``.

    Returns ``(result_set, from_to_list, failure_node)``:
      * ``result_set``   - node -> best known cost from ``source``
      * ``from_to_list`` - node -> predecessor on that best path
      * ``failure_node`` - nodes in ``graph`` whose every link costs infinity
        (a node with no links at all also counts as failed)
    """
    if source is None:
        source = node_name

    # A router is considered disconnected when none of its links is finite.
    failure_node = [
        node for node, links in graph.items()
        if all(cost == INF for cost in links.values())
    ]
    failed = set(failure_node)

    own_links = graph[source]
    result_set = {}
    from_to_list = {}
    for node in graph:
        if node == source or node in failed:
            continue
        if node in own_links:
            from_to_list[node] = source
            result_set[node] = own_links[node]
        else:
            result_set[node] = INF

    visited = set()
    while len(visited) < len(result_set):
        # min() returns the first minimum in insertion order, which matches
        # picking the first unvisited entry of a stable sort by cost.
        current = min(
            (node for node in result_set if node not in visited),
            key=result_set.__getitem__,
        )
        visited.add(current)
        base_cost = result_set[current]
        # .get(): a node may be known only through a neighbour's table, with
        # its own table not received yet. Indexing directly raised KeyError.
        for neighbour, cost in graph.get(current, {}).items():
            if neighbour == source or neighbour in failed:
                continue
            candidate = base_cost + cost
            if candidate < result_set.get(neighbour, INF):
                from_to_list[neighbour] = current
                result_set[neighbour] = candidate
    return result_set, from_to_list, failure_node


def find_shortest_path(result_set, from_to_list, failure_node, source=None):
    """Build, print and return the least-cost path to every reachable node.

    Example inputs for source A:
      result_set   {'F': 4, 'C': 3, 'D': 1, 'E': 2, 'B': 2}
      from_to_list {'F': 'E', 'C': 'E', 'D': 'A', 'E': 'D', 'B': 'A'}

    ``source`` defaults to the module-level ``node_name``. Returns a dict
    mapping each destination to the list of nodes from ``source`` to it.
    Failed nodes and nodes without a recorded predecessor are skipped.
    """
    if source is None:
        source = node_name

    least_cost = {}
    for node in result_set:
        # Nodes without a predecessor were never reached; following the
        # chain for them used to raise KeyError.
        if node in failure_node or node not in from_to_list:
            continue
        hops = [node]
        previous = from_to_list[node]
        while previous != source:
            hops.append(previous)
            previous = from_to_list[previous]
        hops.append(source)
        hops.reverse()
        least_cost[node] = hops

    for node in sorted(least_cost):
        path = "".join(str(hop) for hop in least_cost[node])
        print("least-cost path to node %s: %s and the cost is %.1f"
              % (node, path, result_set[node]))
    return least_cost


if __name__ == "__main__":
    argv = sys.argv[1:]
    if len(argv) != 3:
        sys.exit("usage: cankao2.py NODE_NAME PORT CONFIG_FILE")
    node_name = argv[0]
    port_number = int(argv[1])

    # Always create our own entry, so an empty config still gives
    # Dijkstra_Algrm a graph[node_name] to read.
    graph = {node_name: {}}
    node_port = {}
    keep_alive = {}

    # Each data line of the config is: neighbour_name distance port_number.
    # Lines with any other field count are ignored.
    with open(argv[2]) as config_file:
        for line in config_file:
            data = line.split()
            if len(data) == 3:
                graph[node_name][data[0]] = float(data[1])
                node_port[data[0]] = int(data[2])
                # Give each neighbour a 3 s head start before it can be
                # declared dead.
                keep_alive[int(data[2])] = time.time() + 3

    address = ('localhost', port_number)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(address)

    recv_from = {node_name: [port_number]}
    start_time = time.time()
    round_start = start_time
    # One second in the past, so the first broadcast fires immediately.
    now_time = time.time() - 1
    while True:
        # Exchange link-state packets for 28 s, then recompute the routes.
        while now_time <= round_start + 28:
            # Hard-coded exits for routers E and B (presumably to simulate
            # node failures during a demo).
            if node_name == 'E' and time.time() > start_time + 15:
                sys.exit("terminal router E")
            if node_name == 'B' and time.time() > start_time + 27:
                sys.exit("terminal router B")
            if time.time() >= now_time + 1:
                for node in graph:
                    broadcast(node)
                now_time = time.time()
            update_graph()
            # Dealing with node failures: a neighbour silent for more than
            # 3 s has all of its links set to infinity.
            for neighbour, port in node_port.items():
                last_seen = keep_alive.get(port)
                if last_seen is None or now_time - last_seen <= 3:
                    continue
                graph[node_name][neighbour] = INF
                links = graph.get(neighbour)
                if links is None:
                    # Never heard from this neighbour; nothing else to mark.
                    continue
                for peer in links:
                    links[peer] = INF
                del keep_alive[port]
        result_set, from_to_list, failure_node = Dijkstra_Algrm(graph)
        least_cost_path = find_shortest_path(result_set, from_to_list, failure_node)
        now_time = time.time()
        # Start a new window. Without this reset the inner loop never ran
        # again, so the router stopped sending and receiving and just
        # recomputed routes in a tight loop.
        round_start = now_time
...BluetoothMgmt.py
Source:BluetoothMgmt.py  
import socket
import threading
import time
import multiprocessing

from env import *


class BluetoothMgmt(multiprocessing.Process):
    """Process that serves one Android client over a Bluetooth RFCOMM socket.

    Packets received from the client are pushed onto ``job_q`` as
    ``"<header>:<packet>\\n"``. Packets passed to :meth:`handle` are queued on
    ``handle_q`` and written to the client by :meth:`handleProcessor`.

    The ``ANDROID_*`` constants come from the project's ``env`` module.
    """

    # Created once when the class body is executed, so it is shared by every
    # instance and starts a Manager process as an import side effect.
    handle_q = multiprocessing.Manager().Queue()

    def __init__(self, job_q, header):
        """Store the queue and header, then start the process immediately."""
        multiprocessing.Process.__init__(self)

        self.header = header
        self.client = None
        self.job_q = job_q
        self.daemon = True

        self.start()

    def recv(self):
        """Forward packets from the client to ``job_q`` until it disconnects.

        The loop ends, and the connection is closed, when receiving or
        decoding raises or when the peer closes the connection.
        """
        while True:
            try:
                data = self.client.recv(ANDROID_BUFFER_SIZE)
                if not data:
                    # On a stream socket an empty read means the peer closed
                    # the connection. Looping on it would spin forever.
                    print("[ERR][AND]:",
                          "Failed to receive data. Bluetooth Disconnected")
                    break
                packet = data.decode('utf-8').rstrip()

                print("[LOG][AND]", f"Received from Android: {packet}")
                self.job_q.put(f'{self.header}:{packet}\n')

            except Exception:
                print("[ERR][AND]:",
                      "Failed to receive data. Bluetooth Disconnected")
                break

            time.sleep(0.00001)

        self.close_connection()

    def send(self, message):
        """Send ``message`` (``str`` or bytes) to the connected client.

        Strings are UTF-8 encoded first. When no client is connected an error
        is logged and nothing is sent. Socket errors propagate to the caller.
        """
        # Read once: recv() may set self.client to None from another thread.
        client = self.client
        if not client:
            print("[ERR][AND]:",
                  f"Trying to send packet {message} but no clients connected!")
            return

        # Callers pass either str or bytes (the Nexus app and Bluetooth
        # Terminal use different formats). Normalising up front replaces the
        # old bare `except:` retry, which also hid real socket errors.
        payload = message.encode('utf-8') if isinstance(message, str) else message
        # sendall() keeps writing until the whole payload has been sent;
        # send() may transmit only part of it.
        client.sendall(payload)
        time.sleep(0.6)

        print("[LOG][AND]:",
              f'Sending "{message}" to Android successful')

    def close_connection(self):
        """Close the client socket, if any, and forget it."""
        client, self.client = self.client, None
        if client is not None:
            client.close()
        print("[LOG][AND]", "Android Bluetooth Connection closed")

    def getPacketHeader(self):
        """Return the header prepended to packets put on ``job_q``."""
        return self.header

    def handleProcessor(self):
        """Drain ``handle_q`` forever, sending each packet to the client."""
        while True:
            # Blocking get() instead of polling qsize() in a near-busy loop.
            packet, recv_from = self.handle_q.get()
            packet = packet.rstrip()
            try:
                self.send(f'{recv_from}:{packet}')
            except OSError as err:
                # A failed send must not kill this thread, or nothing could
                # be sent after the client reconnects.
                print("[ERR][AND]:",
                      f'Failed to send packet from "{recv_from}": {err}')
            else:
                print("[LOG][AND]",
                      f'Done Handling Packet from "{recv_from}" ,content: "{packet}"')
            finally:
                self.handle_q.task_done()

    def handle(self, packet, recv_from):
        """Queue ``packet`` from ``recv_from`` for delivery to the client."""
        self.handle_q.put((packet, recv_from))

    def run(self):
        """Process entry point: accept clients one at a time and serve them."""
        q_thread = threading.Thread(target=self.handleProcessor, args=())
        q_thread.start()

        server_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM,
                                    socket.BTPROTO_RFCOMM)
        try:
            server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_sock.bind((ANDROID_HOST_MAC, ANDROID_BLUETOOTH_PORT))
            server_sock.listen(ANDROID_BLUETOOTH_BACKLOG)

            while True:
                print("[LOG][AND]", "Listening for connection")
                self.client, address = server_sock.accept()
                print("[LOG][AND]", "Connection from: "+str(address))

                # Serve this client to completion before accepting the next.
                receive_thread = threading.Thread(target=self.recv, args=())
                receive_thread.start()
                receive_thread.join()
        finally:
            # The cleanup used to sit after the infinite loop and could never
            # run. In a finally block it runs when the loop exits by exception.
            self.close_connection()
            server_sock.close()
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
