How to use recv_from method in autotest

Best Python code snippet using autotest_python

tcp.py

Source:tcp.py Github

copy

Full Screen

1# from socket import *2# tcp_socket=socket(AF_INET,SOCK_STREAM)3# addr=('192.168.1.6',8080)4# tcp_socket.connect(addr)5# data=input("请输入你要发送的内容:")6# tcp_socket.send(data.encode('gbk'))7# recv_from=tcp_socket.recv(1024)8# print(recv_from.decode('gbk'))9# tcp_socket.close()10# from socket import *11# #12# # 创建socket13# tcp_client_socket = socket(AF_INET, SOCK_STREAM)14#15# # 目的信息16# server_addr = ('192.168.1.6', 8080)17# # 链接服务器18# tcp_client_socket.connect(server_addr)19#20# # 提示用户输入数据21# send_data = input("请输入要发送的数据:")22#23# tcp_client_socket.send(send_data.encode("gbk"))24#25# # 接收对方发送过来的数据,最大接收1024个字节26# recvData = tcp_client_socket.recv(1024)27# print('接收到的数据为:', recvData.decode('gbk'))28#29# # 关闭套接字30# tcp_client_socket.close()31#32# from socket import *33#34# # 创建socket35# tcp_server_socket = socket(AF_INET, SOCK_STREAM)36# # 本地信息37# address = ('', 7788)38# # 绑定39# tcp_server_socket.bind(address)40# # 使用socket创建的套接字默认的属性是主动的,使用listen将其变为被动的,这样就可以接收别人的链接了41# tcp_server_socket.listen(128)42#43# # 如果有新的客户端来链接服务器,那么就产生一个新的套接字专门为这个客户端服务44# # client_socket用来为这个客户端服务45# # tcp_server_socket就可以省下来专门等待其他新客户端的链接46# client_socket, clientAddr = tcp_server_socket.accept()47# # 接收对方发送过来的数据48# recv_data = client_socket.recv(1024) # 接收1024个字节49# print('接收到的数据为:', recv_data.decode('gbk'))50# # 发送一些数据到客户端51# client_socket.send("thank you !".encode('gbk'))52# # 关闭为这个客户端服务的套接字,只要关闭了,就意味着为不能再为这个客户端服务了,如果还需要服务,只能再次重新连接53# client_socket.close()54# from socket import *55# tcp_socket=socket(AF_INET,SOCK_STREAM)56# addr=('192.168.1.6',8080)57# tcp_socket.connect(addr)58# data=input('请输入你要发送的内容:')59# tcp_socket.send(data.encode('gbk'))60# recv_from=tcp_socket.recv(1024)61# print(recv_from.decode('gbk'))62# tcp_socket.close()63# while True:64# from socket import *65# tcp = socket(AF_INET, SOCK_STREAM)66# addr = ('', 8080)67# tcp.bind(addr)68# tcp.listen(100)69# tcp2, tcp_ip = tcp.accept()70# data = input('请输入你要向服务端发送的信息:')71# tcp2.send(data.encode('gbk'))72# recv_from = 
tcp2.recv(2014)73# print('服务器为你的消息为:%s' % recv_from.decode('gbk'))74# tcp2.close()75# tcp.close()76# while True:77# from socket import *78# tcp = socket(AF_INET, SOCK_STREAM)79# addr = ('192.168.1.6', 8080)80# tcp.connect(addr)81# data = input('请输入你要发送的内容:')82# tcp.send(data.encode('gbk'))83# recv_from = tcp.recv(1024)84# print('收到的内容为:%s' % recv_from.decode('gbk'))85# tcp.close()86# from socket import *87# tcp_1=socket(AF_INET,SOCK_STREAM)88# addr=('',8080)89# tcp_1.bind(addr)90# tcp_1.listen(100)91# q,w=tcp_1.accept()92# recv=q.recv(1024)93# print(recv.decode('gbk'))94# q.send('thankyou'.encode('gbk'))95# print(w)96# q.close()97# from socket import *`98# tcp_socket=socket(AF_INET,SOCK_STREAM)99# addr=('192.168.1.1',8080)100# tcp_socket.connect(addr)101# data=input('请输入你要发送的内容:')102# tcp_socket.send(data.encode('gbk'))103# recv_from=tcp_socket.recv(1024)104# print(recv_from.decode('gbk'))105# tcp_socket.close()106# a=[1,2,3,4,5]107#108#109# def cha(a,i):110# first=0111# last=len(a)-1112# while first<=last:113# x=(last+first)//2114# if a[x]==i:115# return True116# elif i>a[x]:117# first=x+1118# else:119# last=x-1120# else:121# return False122# def cha(a,i):123# if len(a)==0:124# return False125# else:126# x=len(a)//2127# if a[x]==i:128# return True129# elif i>a[x]:130# return cha(a[x+1:],i)131# else:132# return cha(a[:x],i)133# print(cha(a,5))134# from socket import *135# tcp=socket(AF_INET,SOCK_STREAM)136# addr=('',8080)137# tcp.bind(addr)138# tcp.listen(100)139# kehu,kehu_ip=tcp.accept()140# recv_data=kehu.recv(1024)141# print('收到客户端的信息为:%s'%recv_data.decode('gbk'))142# kehu.send('thankyou'.encode('gbk'))143# recv_data2=kehu.recv(1024)144# print(recv_data2.decode('gbk'))145# print(kehu_ip)146# kehu.close()147# tcp.close()148# from socket import *149# tcp_server_socket=socket(AF_INET,SOCK_STREAM)150# addr=('',7788)151# tcp_server_socket.bind(addr)152# tcp_server_socket.listen(100)153# tcp_client_socket,client_addr=tcp_server_socket.accept()154# 
data=input("请输入你要想服务端发送的信息:")155# tcp_client_socket.send(data.encode('gbk'))156# # data2=input('请输入你要回复客户端的信息:')157# rec=tcp_client_socket.recv(2014)158# print(rec.decode('gbk'))159# tcp_client_socket.close()160# tcp_server_socket.close()161# from socket import *162# tcp_server_socket=socket(AF_INET,SOCK_STREAM)163# addr=('',8080)164# tcp_server_socket.bind(addr)165# tcp_server_socket.listen(100)166# tcp_client_socket,tcp_client_ip=tcp_server_socket.accept()167# recv_from=tcp_client_socket.recv(1024)168# print(recv_from.decode('gbk'))169# print(tcp_client_ip)170# tcp_client_socket.close()171# tcp_server_socket.close()172# from socket import *173# tcp_socket=socket(AF_INET,SOCK_STREAM)174# addr=('192.168.1.6',8080)175# tcp_socket.connect(addr)176# tcp_socket.send('123'.encode('gbk'))177# rev=tcp_socket.recv(1024)178# print(rev.decode('gbk'))179# tcp_socket.close()180import urllib.request as a181response=a.urlopen('https://www.baidu.com')182headers=response.getheaders()183for x,y in headers:184 print(x,y)185html=response.read()...

Full Screen

Full Screen

cankao2.py

Source:cankao2.py Github

copy

Full Screen

import heapq
import select
import socket
import sys
import time


def encode(node_name, node_graph):
    """Serialise one node's neighbour table into a UTF-8 datagram payload.

    The wire format is the node name on the first line followed by one
    ``"<neighbour> <cost>"`` line per entry, each line terminated by ``\\n``.

    Args:
        node_name: name of the node the table belongs to.
        node_graph: mapping of neighbour name -> link cost.

    Returns:
        The encoded message as ``bytes``.
    """
    lines = [node_name]
    lines.extend(f"{neighbour} {cost}" for neighbour, cost in node_graph.items())
    return ("\n".join(lines) + "\n").encode("UTF-8")


def decode(message):
    """Parse a payload produced by :func:`encode`.

    Lines that do not consist of exactly two space-separated fields, or whose
    cost field is not a valid float, are ignored so that a malformed datagram
    cannot crash the receiver.

    Args:
        message: raw ``bytes`` received from the socket.

    Returns:
        ``(node_name, graph)`` where ``graph`` maps neighbour name -> float cost.
    """
    msg_list = message.decode("UTF-8").split("\n")
    node_name = msg_list[0]
    graph = {}
    for line in msg_list[1:]:
        data = line.split(" ")
        if len(data) != 2:
            continue
        try:
            graph[data[0]] = float(data[1])
        except ValueError:
            # Malformed cost field: skip this entry, keep the rest.
            continue
    return node_name, graph


# ``recv_from`` is a dictionary:
#   key   -- name of the node whose neighbour table was received
#   value -- list of UDP source ports that have sent us that table
# e.g. {"A": [port_of_B, port_of_C, port_of_D]}
def update_graph():
    """Poll the socket once (non-blocking) and merge any received table.

    Reads the module-level ``sock`` and updates the module-level ``graph``,
    ``keep_alive`` (last time each source port was heard from) and
    ``recv_from`` (which ports already sent us which node's table).
    """
    readable, _, _ = select.select([sock], [], [], 0)
    for ready_sock in readable:
        message, addr = ready_sock.recvfrom(1024)
        node, recv_graph = decode(message)
        sender_port = addr[1]
        keep_alive[sender_port] = time.time()
        graph[node] = recv_graph
        senders = recv_from.setdefault(node, [])
        if sender_port not in senders:
            senders.append(sender_port)


def broadcast(node):
    """Send ``node``'s neighbour table to every neighbour that needs it.

    A neighbour port is skipped when ``recv_from`` records that this port has
    itself sent us ``node``'s table.

    Args:
        node: name of the node whose table (``graph[node]``) is sent.
    """
    payload = encode(node, graph[node])
    already_has_it = recv_from.get(node, ())
    for port in node_port.values():
        if port not in already_has_it:
            sock.sendto(payload, ("localhost", port))


def Dijkstra_Algrm(graph):
    """Compute least-cost distances from the module-level ``node_name``.

    Only nodes that have an entry in ``graph`` (i.e. whose own neighbour table
    is known) are considered as destinations. A node is treated as failed when
    every link in its table costs ``inf`` (or its table is empty).

    Args:
        graph: mapping node -> {neighbour: cost}; must contain ``node_name``.

    Returns:
        ``(result_set, from_to_list, failure_node)``:
        ``result_set`` maps destination -> cost (``inf`` if unreachable),
        ``from_to_list`` maps destination -> predecessor on its best path,
        ``failure_node`` is the list of nodes considered failed.
    """
    infinity = float("inf")
    self_graph = graph[node_name]

    # Find the routers which are disconnected (all of their links are inf).
    failure_node = [
        node
        for node, links in graph.items()
        if all(cost == infinity for cost in links.values())
    ]

    result_set = {}
    from_to_list = {}
    for node in graph:
        if node == node_name or node in failure_node:
            continue
        if node in self_graph:
            from_to_list[node] = node_name
            result_set[node] = self_graph[node]
        else:
            result_set[node] = infinity

    visited = set()
    while len(visited) < len(result_set):
        # Closest unvisited node; ties go to the earliest-inserted node.
        current = min(
            (node for node in result_set if node not in visited),
            key=result_set.__getitem__,
        )
        visited.add(current)
        base_cost = result_set[current]
        for neighbour, link_cost in graph[current].items():
            # result_set holds exactly the routable destinations, so this
            # skips ourselves, failed nodes, and nodes that were advertised
            # by a neighbour but whose own table has not arrived yet (the
            # latter used to raise KeyError).
            if neighbour not in result_set:
                continue
            candidate = link_cost + base_cost
            if candidate < result_set[neighbour]:
                from_to_list[neighbour] = current
                result_set[neighbour] = candidate
    return result_set, from_to_list, failure_node


def find_shortest_path(result_set, from_to_list, failure_node):
    """Rebuild and print the least-cost path to every reachable node.

    Example inputs (for source node A):
        result_set   {'F': 4, 'C': 3, 'D': 1, 'E': 2, 'B': 2}
        from_to_list {'F': 'E', 'C': 'E', 'D': 'A', 'E': 'D', 'B': 'A'}

    Nodes listed in ``failure_node`` and nodes without a predecessor in
    ``from_to_list`` (unreachable) are skipped.

    Args:
        result_set: destination -> cost, as returned by ``Dijkstra_Algrm``.
        from_to_list: destination -> predecessor on its best path.
        failure_node: nodes considered failed.

    Returns:
        Mapping destination -> list of node names from the module-level
        ``node_name`` to the destination, inclusive.
    """
    least_cost = {}
    for node in result_set:
        if node in failure_node or node not in from_to_list:
            continue
        hops = [node]
        next_node = from_to_list[node]
        while next_node != node_name:
            hops.append(next_node)
            next_node = from_to_list[next_node]
        hops.append(next_node)
        hops.reverse()
        least_cost[node] = hops
    for node in sorted(least_cost):
        path = "".join(str(hop) for hop in least_cost[node])
        print("least-cost path to node %s: %s and the cost is %.1f"%(node, path, result_set[node]))
    return least_cost


if __name__ == "__main__":
    # Usage: <script> NODE_NAME PORT CONFIG_FILE
    argv = sys.argv[1:]
    node_name = argv[0]
    port_number = int(argv[1])

    address = ('localhost', port_number)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(address)
    graph = {node_name: {}}
    node_port = {}
    keep_alive = {}

    # Config lines with exactly three space-separated fields are read as:
    #   neighbour_name distance port_number
    with open(argv[2]) as config_file:
        for lines in config_file:
            data = lines.split(" ")
            if len(data) == 3:
                graph[node_name][data[0]] = float(data[1])
                node_port[data[0]] = int(data[2])
                # Give every neighbour a 3 second head start before it can
                # be declared dead.
                keep_alive[int(data[2])] = time.time() + 3
    recv_from = {node_name: [port_number]}
    start_time = time.time()
    now_time = time.time() - 1  # make sure it will start immediately at beginning
    delete_list = []
    # NOTE(review): nesting below is reconstructed from a flattened listing;
    # confirm against the original file.
    while 1:
        while now_time <= start_time + 28:
            if node_name == 'E' and time.time() > start_time + 15:
                sys.exit("terminal router E")
            if node_name == 'B' and time.time() > start_time + 27:
                sys.exit("terminal router B")
            if time.time() >= now_time + 1:
                for node in graph:
                    broadcast(node)
                now_time = time.time()
            update_graph()
            # dealing with node failures
            for port in node_port:
                try:
                    if now_time - keep_alive[node_port[port]] > 3:
                        graph[node_name][port] = float('inf')
                        for i in graph[port]:
                            graph[port][i] = float('inf')
                        del keep_alive[node_port[port]]
                except KeyError:
                    # Either the neighbour was already removed from
                    # keep_alive, or its table was never received.
                    pass
        result_set, from_to_list, failure_node = Dijkstra_Algrm(graph)
        least_cost_path = find_shortest_path(result_set, from_to_list, failure_node)
        now_time = time.time()
        # NOTE(review): the published listing is cut off here ("...");
        # the remainder of the loop body is not visible.

Full Screen

Full Screen

BluetoothMgmt.py

Source:BluetoothMgmt.py Github

copy

Full Screen

import multiprocessing
import socket
import threading
import time

from env import *


class BluetoothMgmt(multiprocessing.Process):
    """Process that serves one Bluetooth RFCOMM client at a time.

    Incoming packets are decoded and pushed onto ``job_q`` prefixed with
    ``header``; outgoing packets are queued with :meth:`handle` and written
    to the connected client by a background thread.

    The process starts itself at the end of ``__init__``.
    """

    # Queue of (packet, recv_from) tuples waiting to be sent to the client.
    # It is a class attribute, created when the class body is executed.
    handle_q = multiprocessing.Manager().Queue()

    def __init__(self, job_q, header):
        """Store the job queue and header, then start the process.

        Args:
            job_q: queue that receives ``"<header>:<packet>\\n"`` strings.
            header: prefix put in front of every received packet.
        """
        multiprocessing.Process.__init__(self)

        self.header = header
        self.client = None
        self.job_q = job_q
        self.daemon = True

        self.start()

    def recv(self):
        """Read packets from the client until it disconnects, then close it."""
        while True:
            try:
                data = self.client.recv(ANDROID_BUFFER_SIZE)
                if not data:
                    # An empty read on a stream socket means the peer closed
                    # the connection; looping again would spin forever.
                    break
                packet = data.decode('utf-8').rstrip()

                print("[LOG][AND]", f"Received from Android: {packet}")
                self.job_q.put(f'{self.header}:{packet}\n')

            except Exception:
                print("[ERR][AND]:",
                      "Failed to receive data. Bluetooth Disconnected")
                break

            time.sleep(0.00001)

        self.close_connection()

    def send(self, message):
        """Send ``message`` to the connected client.

        Args:
            message: ``str`` (encoded as UTF-8 before sending) or bytes-like.
        """
        # Take a local reference: another thread may reset self.client.
        client = self.client
        if not client:
            print("[ERR][AND]:",
                  f"Trying to send packet {message} but no clients connected!")
            return

        # Callers pass either text or already-encoded bytes.
        payload = message.encode('utf-8') if isinstance(message, str) else message
        client.sendall(payload)
        time.sleep(0.6)

        print("[LOG][AND]:",
              f'Sending "{message}" to Android successful')

    def close_connection(self):
        """Close the client socket, if any. Safe to call more than once."""
        client, self.client = self.client, None
        if client is None:
            return
        client.close()
        print("[LOG][AND]", "Android Bluetooth Connection closed")

    def getPacketHeader(self):
        """Return the header this instance prefixes to received packets."""
        return self.header

    def handleProcessor(self):
        """Forever take queued packets from ``handle_q`` and send them."""
        while True:
            # Blocking get: waits for the next item instead of polling.
            packet, recv_from = self.handle_q.get()
            packet = packet.rstrip()
            self.send(f'{recv_from}:{packet}')
            self.handle_q.task_done()
            print("[LOG][AND]",
                  f'Done Handling Packet from "{recv_from}" ,content: "{packet}"')

    def handle(self, packet, recv_from):
        """Queue ``packet`` (originating from ``recv_from``) for sending."""
        self.handle_q.put((packet, recv_from))

    def run(self):
        """Process entry point: start the sender thread and accept clients."""
        q_thread = threading.Thread(target=self.handleProcessor, args=())
        q_thread.start()

        server_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM,
                                    socket.BTPROTO_RFCOMM)
        try:
            server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_sock.bind((ANDROID_HOST_MAC, ANDROID_BLUETOOTH_PORT))
            server_sock.listen(ANDROID_BLUETOOTH_BACKLOG)

            while True:
                print("[LOG][AND]", "Listening for connection")
                self.client, address = server_sock.accept()
                print("[LOG][AND]", "Connection from: "+str(address))

                # Serve this client until it disconnects before accepting
                # the next one.
                receive_thread = threading.Thread(target=self.recv, args=())
                receive_thread.start()
                receive_thread.join()
        finally:
            # Reached only when the accept loop is left by an exception.
            self.close_connection()
            server_sock.close()

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful