How to use receive_msg method in localstack

Best Python code snippet using localstack_python

abb_client.py

Source:abb_client.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2"""Python client interface for ABB RAPID server.3Adapted from Open ABB python client (https://github.com/robotics/open_abb)4"""5import time6import socket7from struct import pack, unpack_from, calcsize8import numpy as np9class ABBClient:10 """Python client interface for ABB RAPID server.11 """12 class CommandFailed(RuntimeError):13 pass14 class InvalidZone(ValueError):15 pass16 17 SERVER_ERROR = 018 SERVER_OK = 119 20 def __init__(self, ip='192.168.125.1', port=5000):21 self._delay = .0822 self.set_units('millimeters', 'degrees')23 self.connect((ip, port))24 def __repr__(self): 25 return "{} ({})".format(self.__class__.__name__, self.get_info())26 27 def __str__(self):28 return self.__repr__()29 30 def __enter__(self):31 return self32 33 def __exit__(self, exc_type, exc_value, traceback):34 self.close()35 def set_units(self, linear, angular):36 """Sets linear and angular units.37 """38 units_l = {'millimeters' : 1.0,39 'meters' : 1000.0,40 'inches' : 25.4,41 }42 units_a = {'degrees' : 1.0,43 'radians' : 57.2957795,44 }45 self._scale_linear = units_l[linear]46 self._scale_angle = units_a[angular]47 48 def connect(self, remote):49 """Connects to server.50 """51 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)52 self.sock.settimeout(2.5)53 self.sock.connect(remote)54 self.sock.settimeout(None) 55 print("Client connected ...")56 def get_info(self):57 """Returns a unique robot identifier string.58 """59 command = 060 send_msg = pack('>H', command)61 self.sock.send(send_msg)62 time.sleep(self._delay)63 receive_msg = self.sock.recv(4096)64 retvals = unpack_from('>H', receive_msg)65 ack = retvals[0]66 if ack != ABBClient.SERVER_OK:67 raise ABBClient.CommandFailed 68 info = receive_msg[calcsize('>H'):].decode()69 return info 70 def move_joints(self, joint_angles):71 """Executes an immediate move to the specified joint angles.72 73 joint_angles = (j0, j1, j2, j3, j4, j5)74 j0, j1, j2, j3, j4, j5 are numbered from base to end effector and are75 measured in degrees (default)76 """77 joint_angles = np.array(joint_angles, dtype=np.float32).ravel()78 joint_angles *= self._scale_angle79 command = 180 send_msg = pack('>Hffffff', command, *joint_angles)81 self.sock.send(send_msg)82 time.sleep(self._delay)83 receive_msg = self.sock.recv(4096)84 retvals = unpack_from('>H', receive_msg)85 ack = retvals[0]86 if ack != ABBClient.SERVER_OK:87 raise ABBClient.CommandFailed 88 def move_linear(self, pose):89 """Executes a linear/cartesian move from the current base frame pose to90 the specified pose.91 92 pose = (x, y, z, qw, qx, qy, qz)93 x, y, z specify a Cartesian position (default mm)94 qw, qx, qy, qz specify a quaternion rotation95 """96 pose = np.array(pose, dtype=np.float32).ravel()97 pose[:3] *= self._scale_linear98 99 command = 2100 send_msg = pack('>Hfffffff', command, *pose)101 self.sock.send(send_msg)102 time.sleep(self._delay)103 receive_msg = self.sock.recv(4096)104 retvals = unpack_from('>H', receive_msg)105 ack = retvals[0]106 if ack != ABBClient.SERVER_OK:107 raise ABBClient.CommandFailed 108 def move_circular(self, via_pose, end_pose):109 """Executes a movement in a circular path from the current base frame110 pose, through via_pose, to end_pose.111 112 via_pose, end_pose = (x, y, z, qw, qx, qy, qz)113 x, y, z specify a Cartesian position (default mm)114 qw, qx, qy, qz specify a quaternion rotation115 """ 116 via_pose = np.array(via_pose, dtype=np.float32).ravel()117 via_pose[:3] *= self._scale_linear118 end_pose = np.array(end_pose, dtype=np.float32).ravel()119 end_pose[:3] *= self._scale_linear 120 121 command = 3122 send_msg = pack('>Hffffffffffffff', command, *via_pose, *end_pose)123 self.sock.send(send_msg)124 time.sleep(self._delay)125 receive_msg = self.sock.recv(4096)126 retvals = unpack_from('>H', receive_msg)127 ack = retvals[0]128 if ack != ABBClient.SERVER_OK:129 raise ABBClient.CommandFailed 130 def set_tcp(self, tcp):131 """Sets the tool center point (TCP) of the robot.132 133 The TCP is specified in the output flange frame, which is located at134 the intersection of the tool flange center axis and the flange face,135 with the z-axis aligned with the tool flange center axis.136 137 tcp = (x, y, z, qw, qx, qy, qz)138 x, y, z specify a Cartesian position (default mm)139 qw, qx, qy, qz specify a quaternion rotation140 """141 tcp = np.array(tcp, dtype=np.float32).ravel()142 tcp[:3] *= self._scale_linear143 144 command = 4145 send_msg = pack('>Hfffffff', command, *tcp)146 self.sock.send(send_msg)147 time.sleep(self._delay)148 receive_msg = self.sock.recv(4096)149 retvals = unpack_from('>H', receive_msg)150 ack = retvals[0]151 if ack != ABBClient.SERVER_OK:152 raise ABBClient.CommandFailed 153 def set_work_object(self, work_object):154 """Sets the work object on the robot.155 156 The work object is a local coordinate frame on the robot, where157 subsequent linear moves will be in this coordinate frame. 158 159 work_object = (x, y, z, qw, qx, qy, qz)160 x, y, z specify a Cartesian position (default mm)161 qw, qx, qy, qz specify a quaternion rotation162 """163 work_object = np.array(work_object, dtype=np.float32).ravel()164 work_object[:3] *= self._scale_linear165 166 command = 5167 send_msg = pack('>Hfffffff', command, *work_object)168 self.sock.send(send_msg)169 time.sleep(self._delay)170 receive_msg = self.sock.recv(4096)171 retvals = unpack_from('>H', receive_msg)172 ack = retvals[0]173 if ack != ABBClient.SERVER_OK:174 raise ABBClient.CommandFailed 175 def set_speed(self, linear_speed, angular_speed):176 """Sets the linear speed (default mm/s) and angular speed177 (default deg/s) of the robot TCP.178 """179 linear_speed *= self._scale_linear180 angular_speed *= self._scale_angle181 command = 6182 send_msg = pack('>Hff', command, linear_speed, angular_speed)183 self.sock.send(send_msg)184 time.sleep(self._delay)185 receive_msg = self.sock.recv(4096)186 retvals = unpack_from('>H', receive_msg)187 ack = retvals[0]188 if ack != ABBClient.SERVER_OK:189 raise ABBClient.CommandFailed 190 def set_zone(self, 191 zone_key = 'z0', 192 point_motion = False, 193 manual_zone = None):194 zone_dict = {'z0': (0.3, 0.3, 0.03), 195 'z1': (1, 1, 0.1), 196 'z5': (5, 8, 0.8), 197 'z10': (10, 15, 1.5), 198 'z15': (15, 23, 2.3), 199 'z20': (20, 30, 3), 200 'z30': (30, 45, 4.5), 201 'z50': (50, 75, 7.5), 202 'z100': (100, 150, 15), 203 'z200': (200, 300, 30),204 }205 """Sets the motion zone of the robot. This can also be thought of as206 the flyby zone, AKA if the robot is going from point A -> B -> C,207 how close do we have to pass by B to get to C208 209 zone_key: uses values from RAPID handbook (stored here in zone_dict)210 with keys 'z*', you should probably use these211 point_motion: go to point exactly, and stop briefly before moving on212 manual_zone = [pzone_tcp, pzone_ori, zone_ori]213 pzone_tcp: default mm, radius from goal where robot tool centerpoint 214 is not rigidly constrained215 pzone_ori: default mm, radius from goal where robot tool orientation 216 is not rigidly constrained217 zone_ori: default degrees, zone size for the tool reorientation218 """219 if point_motion: 220 zone = np.array((0, 0, 0))221 elif manual_zone is not None and len(manual_zone) == 3:222 zone = np.array(manual_zone, dtype=np.float32).ravel()223 elif zone_key in zone_dict.keys(): 224 zone = np.array(zone_dict[zone_key])225 else:226 raise ABBClient.InvalidZone227 228 zone[0] *= self._scale_linear229 zone[1] *= self._scale_linear230 zone[2] *= self._scale_angle231 command = 7232 send_msg = pack('>HHfff', command, int(point_motion), *zone)233 self.sock.send(send_msg)234 time.sleep(self._delay)235 receive_msg = self.sock.recv(4096)236 retvals = unpack_from('>H', receive_msg)237 ack = retvals[0]238 if ack != ABBClient.SERVER_OK:239 raise ABBClient.CommandFailed 240 def get_joint_angles(self):241 """retvalsurns the robot joint angles.242 243 joint_angles = (j0, j1, j2, j3, j4, j5)244 j0, j1, j2, j3, j4, j5 are numbered from base to end effector and are245 measured in degrees (default)246 """ 247 command = 8248 send_msg = pack('>H', command)249 self.sock.send(send_msg)250 time.sleep(self._delay)251 receive_msg = self.sock.recv(4096)252 retvals = unpack_from('>Hffffff', receive_msg)253 ack = retvals[0]254 if ack != ABBClient.SERVER_OK:255 raise ABBClient.CommandFailed 256 joint_angles = np.array(retvals[1:], dtype=np.float64)257 joint_angles /= self._scale_angle258 return joint_angles259 260 def get_pose(self):261 """retvalsurns the TCP pose in the reference coordinate frame.262 263 pose = (x, y, z, qw, qx, qy, qz)264 x, y, z specify a Cartesian position (default mm)265 qw, qx, qy, qz specify a quaternion rotation266 """267 command = 9268 send_msg = pack('>H', command)269 self.sock.send(send_msg)270 time.sleep(self._delay)271 receive_msg = self.sock.recv(4096)272 retvals = unpack_from('>Hfffffff', receive_msg)273 ack = retvals[0]274 if ack != ABBClient.SERVER_OK:275 raise ABBClient.CommandFailed 276 pose = np.array(retvals[1:], dtype=np.float64)277 pose[:3] /= self._scale_linear 278 return pose279 280 def close(self):281 """Releases any resources held by the controller (e.g., sockets).282 """283 command = 99284 send_msg = pack('>H', command)285 self.sock.send(send_msg)286 time.sleep(self._delay) 287 self.sock.shutdown(socket.SHUT_RDWR)288 self.sock.close()...

Full Screen

Full Screen

calculatorServer.py

Source:calculatorServer.py Github

copy

Full Screen

1import multiprocessing as mp2import socket3import json4import sys5import os6import math7import cmath89SERVER = "192.168.56.101"10PORT = 505011ADDR = (SERVER, PORT)12FORMAT = 'utf-8'13DISCONNECT_MESSAGE = "!DISCONNECT"1415server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)16server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)17server.bind(ADDR)181920def formatNumber(num):21 if num % 1 == 0:22 return int(num)23 else:24 return num25 26def sendResult(conn,addr,result):27 result = formatNumber(result)28 result = str(result)29 conn.send(result.encode(FORMAT))30 print(f"[RESULT] THE RESULT HAS BEEN SENT TO THE CLIENT {addr}")3132def sendQuadratic(conn,addr,result1,result2):33 result1 = str(result1)34 result2 = str(result2)35 quadraticList = [result1, result2]36 quadraticList = json.dumps(quadraticList)37 conn.sendall(bytes(quadraticList,encoding="utf-8"))38 print(f"[RESULT] THE RESULT HAS BEEN SENT TO THE CLIENT {addr}")394041def addition(conn,addr,receive_msg):42 result = float(receive_msg[1]) + float(receive_msg[2])43 sendResult(conn,addr,result)4445def substraction(conn,addr,receive_msg):46 result = float(receive_msg[1]) - float(receive_msg[2])47 sendResult(conn,addr,result)4849def log(conn,addr,receive_msg):50 result = math.log(float(receive_msg[1]), float(receive_msg[2]))51 sendResult(conn,addr,result)5253def squareRoot(conn,addr,receive_msg):54 result = math.sqrt(float(receive_msg[1]))55 sendResult(conn,addr,result)5657def exponent(conn,addr,receive_msg):58 result = pow(float(receive_msg[1]), float(receive_msg[2]))59 sendResult(conn,addr,result)6061def quadratic(conn,addr,receive_msg):62 d = (float(receive_msg[2])**2) - (4*float(receive_msg[1])*float(receive_msg[3]))63 result1 = (-float(receive_msg[2])-cmath.sqrt(d))/(2*float(receive_msg[1]))64 result2 = (-float(receive_msg[2])+cmath.sqrt(d))/(2*float(receive_msg[1]))65 sendQuadratic(conn,addr,result1,result2)6667def cuboidVol(conn,addr,receive_msg):68 result = float(receive_msg[1])*float(receive_msg[2])*float(receive_msg[3])69 sendResult(conn,addr,result)7071def sphereVol(conn,addr,receive_msg):72 pi=22/773 result = (4/3) * (pi * float(receive_msg[1]) ** 3)74 sendResult(conn,addr,result)7576def cylinderVol(conn,addr,receive_msg):77 pi=22/778 result = pi * float(receive_msg[1]) * float(receive_msg[1]) * float(receive_msg[2])79 sendResult(conn,addr,result)8081def coneVol(conn,addr,receive_msg):82 pi=22/783 result = (1.0/3) * pi * float(receive_msg[1]) * float(receive_msg[1]) * float(receive_msg[2])84 sendResult(conn,addr,result)858687def handle_client(conn,addr):88 try:89 try:90 print(f"[NEW CONNECTION] {addr} CONNECTED")91 connected = True92 while connected:93 receive_msg = conn.recv(2048).decode(FORMAT)94 if receive_msg:95 receive_msg = json.loads(receive_msg)96 if receive_msg == DISCONNECT_MESSAGE:97 connected = False98 print(f"[DISCONNECT] CLIENT {addr} HAS BEEN DISCONNECTED")99 100 if receive_msg[0] == "1":101 addition(conn,addr,receive_msg)102 elif receive_msg[0] == "2":103 substraction(conn,addr,receive_msg)104 elif receive_msg[0] == "3":105 log(conn,addr,receive_msg)106 elif receive_msg[0] == "4":107 squareRoot(conn,addr,receive_msg)108 elif receive_msg[0] == "5":109 exponent(conn,addr,receive_msg)110 elif receive_msg[0] == "6":111 quadratic(conn,addr,receive_msg)112 elif receive_msg[0] == "7":113 cuboidVol(conn,addr,receive_msg)114 elif receive_msg[0] == "8":115 sphereVol(conn,addr,receive_msg)116 elif receive_msg[0] == "9":117 cylinderVol(conn,addr,receive_msg)118 elif receive_msg[0] == "10":119 coneVol(conn,addr,receive_msg)120 conn.close()121 except OverflowError:122 print("Overflow Error")123 try:124 sys.exit(0)125 except SystemExit:126 os._exit(0) 127 except KeyboardInterrupt:128 print('Server Has Been Interrupted')129 try:130 sys.exit(0)131 except SystemExit:132 os._exit(0) 133134135def start():136 try:137 try:138 server.listen()139 try:140 while True:141 try:142 conn, addr = server.accept()143 process = mp.Process(target=handle_client, args=(conn,addr))144 process.start()145 except socket.error:146 print("Socket Error")147 except Exception as e:148 print("An Exception Has Occured:")149 print(e)150 sys.exit(1)151 except OverflowError:152 print("Overflow Error")153 try:154 sys.exit(0)155 except SystemExit:156 os._exit(0) 157 except KeyboardInterrupt:158 print('Server Has Been Interrupted')159 try:160 sys.exit(0)161 except SystemExit:162 os._exit(0)163164165if __name__ == "__main__":166 print("[STARTING] SERVER IS STARTING") ...

Full Screen

Full Screen

message.py

Source:message.py Github

copy

Full Screen

1# coding=utf-82from django.http import HttpResponse3from wechatpy.utils import check_signature4from wechatpy.exceptions import InvalidSignatureException5from weixin import settings, tasks6import wechatpy7from common.tools import auto_answer8from common.models import OpenUser9from common import settings as common_setting10from weixin.utils import tools11def notify(request):12 if request.method == 'GET':13 signature = request.GET.get('signature')14 timestamp = request.GET.get('timestamp')15 nonce = request.GET.get('nonce')16 echostr = request.GET.get('echostr')17 try:18 check_signature(settings.TOKEN, signature, timestamp, nonce)19 except InvalidSignatureException:20 return HttpResponse('')21 return HttpResponse(echostr)22 else:23 receive_msg = wechatpy.parse_message(request.body)24 # 收到文本,自动回复25 if isinstance(receive_msg, wechatpy.messages.TextMessage):26 content = auto_answer.answer_robot(receive_msg.content)27 reply = wechatpy.replies.TextReply(content=content, message=receive_msg)28 return HttpResponse(reply.render())29 # 关注事件30 elif isinstance(receive_msg, wechatpy.events.SubscribeEvent):31 receive_msg.msgtype = 'text'32 reply = wechatpy.replies.TextReply(content=settings.ANSWER_MSG_LIST['SUBSCRIBE'], message=receive_msg)33 tasks.create_weixin_user.delay(receive_msg.source)34 return HttpResponse(reply.render())35 # 取消关注36 elif isinstance(receive_msg, wechatpy.events.UnsubscribeEvent):37 OpenUser.objects.filter(supplier=common_setting.SupplierEnum.WECHAT, openid=receive_msg.source).update(38 state=common_setting.StateEnum.DELETED)39 return HttpResponse('')40 # 扫描带参数二维码(已关注,未关注,未关注微信会提示关注) 可用于扫码登录41 elif isinstance(receive_msg, (wechatpy.events.ScanEvent, wechatpy.events.SubscribeScanEvent)):42 return HttpResponse('')43 elif isinstance(receive_msg, (wechatpy.events.LocationEvent, wechatpy.events.LocationSelectEvent)):44 print receive_msg45 receive_msg.msgtype = 'text'46 reply = wechatpy.replies.TextReply(content=u'位置上报成功', message=receive_msg)47 return HttpResponse(reply.render())48 elif isinstance(receive_msg, wechatpy.events.ClickEvent):49 event_handler = tools.EventHandler(receive_msg.key)50 visit_count = event_handler.get_event_handler()51 receive_msg.msgtype = 'text'52 reply = wechatpy.replies.TextReply(content=settings.ANSWER_MSG_LIST['VISIT_COUNT'] % str(visit_count),53 message=receive_msg)54 return HttpResponse(reply.render())55 else:56 receive_msg.msgtype = 'text'57 reply = wechatpy.replies.TextReply(content=settings.ANSWER_MSG_LIST['PARSE_FAIL'], message=receive_msg)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful