Best Python code snippet using localstack_python
abb_client.py
Source:abb_client.py  
1# -*- coding: utf-8 -*-2"""Python client interface for ABB RAPID server.3Adapted from Open ABB python client (https://github.com/robotics/open_abb)4"""5import time6import socket7from struct import pack, unpack_from, calcsize8import numpy as np9class ABBClient:10    """Python client interface for ABB RAPID server.11    """12    class CommandFailed(RuntimeError):13        pass14    class InvalidZone(ValueError):15        pass16    17    SERVER_ERROR = 018    SERVER_OK = 119    20    def __init__(self, ip='192.168.125.1', port=5000):21        self._delay   = .0822        self.set_units('millimeters', 'degrees')23        self.connect((ip, port))24    def __repr__(self):      25        return "{} ({})".format(self.__class__.__name__, self.get_info())26        27    def __str__(self):28        return self.__repr__()29        30    def __enter__(self):31        return self32        33    def __exit__(self, exc_type, exc_value, traceback):34        self.close()35    def set_units(self, linear, angular):36        """Sets linear and angular units.37        """38        units_l = {'millimeters' : 1.0,39                   'meters' : 1000.0,40                   'inches' : 25.4,41                   }42        units_a = {'degrees' : 1.0,43                   'radians' : 57.2957795,44                   }45        self._scale_linear = units_l[linear]46        self._scale_angle  = units_a[angular]47        48    def connect(self, remote):49        """Connects to server.50        """51        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)52        self.sock.settimeout(2.5)53        self.sock.connect(remote)54        self.sock.settimeout(None)        55        print("Client connected ...")56    def get_info(self):57        """Returns a unique robot identifier string.58        """59        command = 060        send_msg = pack('>H', command)61        self.sock.send(send_msg)62        time.sleep(self._delay)63        receive_msg = self.sock.recv(4096)64        retvals = unpack_from('>H', receive_msg)65        ack = retvals[0]66        if ack != ABBClient.SERVER_OK:67            raise ABBClient.CommandFailed            68        info = receive_msg[calcsize('>H'):].decode()69        return info 70    def move_joints(self, joint_angles):71        """Executes an immediate move to the specified joint angles.72        73        joint_angles = (j0, j1, j2, j3, j4, j5)74        j0, j1, j2, j3, j4, j5 are numbered from base to end effector and are75        measured in degrees (default)76        """77        joint_angles = np.array(joint_angles, dtype=np.float32).ravel()78        joint_angles *= self._scale_angle79        command = 180        send_msg = pack('>Hffffff', command, *joint_angles)81        self.sock.send(send_msg)82        time.sleep(self._delay)83        receive_msg = self.sock.recv(4096)84        retvals = unpack_from('>H', receive_msg)85        ack = retvals[0]86        if ack != ABBClient.SERVER_OK:87            raise ABBClient.CommandFailed  88    def move_linear(self, pose):89        """Executes a linear/cartesian move from the current base frame pose to90        the specified pose.91        92        pose = (x, y, z, qw, qx, qy, qz)93        x, y, z specify a Cartesian position (default mm)94        qw, qx, qy, qz specify a quaternion rotation95        """96        pose = np.array(pose, dtype=np.float32).ravel()97        pose[:3] *= self._scale_linear98        99        command = 2100        send_msg = pack('>Hfffffff', command, *pose)101        self.sock.send(send_msg)102        time.sleep(self._delay)103        receive_msg = self.sock.recv(4096)104        retvals = unpack_from('>H', receive_msg)105        ack = retvals[0]106        if ack != ABBClient.SERVER_OK:107            raise ABBClient.CommandFailed       108    def move_circular(self, via_pose, end_pose):109        """Executes a movement in a circular path from the current base frame110        pose, through via_pose, to end_pose.111        112        via_pose, end_pose = (x, y, z, qw, qx, qy, qz)113        x, y, z specify a Cartesian position (default mm)114        qw, qx, qy, qz specify a quaternion rotation115        """        116        via_pose = np.array(via_pose, dtype=np.float32).ravel()117        via_pose[:3] *= self._scale_linear118        end_pose = np.array(end_pose, dtype=np.float32).ravel()119        end_pose[:3] *= self._scale_linear        120        121        command = 3122        send_msg = pack('>Hffffffffffffff', command, *via_pose, *end_pose)123        self.sock.send(send_msg)124        time.sleep(self._delay)125        receive_msg = self.sock.recv(4096)126        retvals = unpack_from('>H', receive_msg)127        ack = retvals[0]128        if ack != ABBClient.SERVER_OK:129            raise ABBClient.CommandFailed  130    def set_tcp(self, tcp):131        """Sets the tool center point (TCP) of the robot.132        133        The TCP is specified in the output flange frame, which is located at134        the intersection of the tool flange center axis and the flange face,135        with the z-axis aligned with the tool flange center axis.136        137        tcp = (x, y, z, qw, qx, qy, qz)138        x, y, z specify a Cartesian position (default mm)139        qw, qx, qy, qz specify a quaternion rotation140        """141        tcp = np.array(tcp, dtype=np.float32).ravel()142        tcp[:3] *= self._scale_linear143        144        command = 4145        send_msg = pack('>Hfffffff', command, *tcp)146        self.sock.send(send_msg)147        time.sleep(self._delay)148        receive_msg = self.sock.recv(4096)149        retvals = unpack_from('>H', receive_msg)150        ack = retvals[0]151        if ack != ABBClient.SERVER_OK:152            raise ABBClient.CommandFailed  153    def set_work_object(self, work_object):154        """Sets the work object on the robot.155        156        The work object is a local coordinate frame on the robot, where157        subsequent linear moves will be in this coordinate frame. 158        159        work_object = (x, y, z, qw, qx, qy, qz)160        x, y, z specify a Cartesian position (default mm)161        qw, qx, qy, qz specify a quaternion rotation162        """163        work_object = np.array(work_object, dtype=np.float32).ravel()164        work_object[:3] *= self._scale_linear165        166        command = 5167        send_msg = pack('>Hfffffff', command, *work_object)168        self.sock.send(send_msg)169        time.sleep(self._delay)170        receive_msg = self.sock.recv(4096)171        retvals = unpack_from('>H', receive_msg)172        ack = retvals[0]173        if ack != ABBClient.SERVER_OK:174            raise ABBClient.CommandFailed       175    def set_speed(self, linear_speed, angular_speed):176        """Sets the linear speed (default mm/s) and angular speed177        (default deg/s) of the robot TCP.178        """179        linear_speed *= self._scale_linear180        angular_speed *= self._scale_angle181        command = 6182        send_msg = pack('>Hff', command, linear_speed, angular_speed)183        self.sock.send(send_msg)184        time.sleep(self._delay)185        receive_msg = self.sock.recv(4096)186        retvals = unpack_from('>H', receive_msg)187        ack = retvals[0]188        if ack != ABBClient.SERVER_OK:189            raise ABBClient.CommandFailed     190    def set_zone(self, 191                 zone_key = 'z0', 192                 point_motion = False, 193                 manual_zone = None):194        zone_dict = {'z0':  (0.3, 0.3, 0.03), 195                     'z1':  (1, 1, 0.1), 196                     'z5':  (5, 8, 0.8), 197                     'z10': (10, 15, 1.5), 198                     'z15': (15, 23, 2.3), 199                     'z20': (20, 30, 3), 200                     'z30': (30, 45, 4.5), 201                     'z50': (50, 75, 7.5), 202                     'z100': (100, 150, 15), 203                     'z200': (200, 300, 30),204                    }205        """Sets the motion zone of the robot. This can also be thought of as206        the flyby zone, AKA if the robot is going from point A -> B -> C,207        how close do we have to pass by B to get to C208        209        zone_key: uses values from RAPID handbook (stored here in zone_dict)210        with keys 'z*', you should probably use these211        point_motion: go to point exactly, and stop briefly before moving on212        manual_zone = [pzone_tcp, pzone_ori, zone_ori]213        pzone_tcp: default mm, radius from goal where robot tool centerpoint 214                   is not rigidly constrained215        pzone_ori: default mm, radius from goal where robot tool orientation 216                   is not rigidly constrained217        zone_ori: default degrees, zone size for the tool reorientation218        """219        if point_motion: 220            zone = np.array((0, 0, 0))221        elif manual_zone is not None and len(manual_zone) == 3:222            zone = np.array(manual_zone, dtype=np.float32).ravel()223        elif zone_key in zone_dict.keys(): 224            zone = np.array(zone_dict[zone_key])225        else:226            raise ABBClient.InvalidZone227 228        zone[0] *= self._scale_linear229        zone[1] *= self._scale_linear230        zone[2] *= self._scale_angle231        command = 7232        send_msg = pack('>HHfff', command, int(point_motion), *zone)233        self.sock.send(send_msg)234        time.sleep(self._delay)235        receive_msg = self.sock.recv(4096)236        retvals = unpack_from('>H', receive_msg)237        ack = retvals[0]238        if ack != ABBClient.SERVER_OK:239            raise ABBClient.CommandFailed  240    def get_joint_angles(self):241        """retvalsurns the robot joint angles.242        243        joint_angles = (j0, j1, j2, j3, j4, j5)244        j0, j1, j2, j3, j4, j5 are numbered from base to end effector and are245        measured in degrees (default)246        """       247        command = 8248        send_msg = pack('>H', command)249        self.sock.send(send_msg)250        time.sleep(self._delay)251        receive_msg = self.sock.recv(4096)252        retvals = unpack_from('>Hffffff', receive_msg)253        ack = retvals[0]254        if ack != ABBClient.SERVER_OK:255            raise ABBClient.CommandFailed  256        joint_angles = np.array(retvals[1:], dtype=np.float64)257        joint_angles /= self._scale_angle258        return joint_angles259    260    def get_pose(self):261        """retvalsurns the TCP pose in the reference coordinate frame.262        263        pose = (x, y, z, qw, qx, qy, qz)264        x, y, z specify a Cartesian position (default mm)265        qw, qx, qy, qz specify a quaternion rotation266        """267        command = 9268        send_msg = pack('>H', command)269        self.sock.send(send_msg)270        time.sleep(self._delay)271        receive_msg = self.sock.recv(4096)272        retvals = unpack_from('>Hfffffff', receive_msg)273        ack = retvals[0]274        if ack != ABBClient.SERVER_OK:275            raise ABBClient.CommandFailed  276        pose = np.array(retvals[1:], dtype=np.float64)277        pose[:3] /= self._scale_linear      278        return pose279        280    def close(self):281        """Releases any resources held by the controller (e.g., sockets).282        """283        command = 99284        send_msg = pack('>H', command)285        self.sock.send(send_msg)286        time.sleep(self._delay)        287        self.sock.shutdown(socket.SHUT_RDWR)288        self.sock.close()...calculatorServer.py
Source:calculatorServer.py  
1import multiprocessing as mp2import socket3import json4import sys5import os6import math7import cmath89SERVER = "192.168.56.101"10PORT = 505011ADDR = (SERVER, PORT)12FORMAT = 'utf-8'13DISCONNECT_MESSAGE = "!DISCONNECT"1415server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)16server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)17server.bind(ADDR)181920def formatNumber(num):21  if num % 1 == 0:22    return int(num)23  else:24    return num25    26def sendResult(conn,addr,result):27    result = formatNumber(result)28    result = str(result)29    conn.send(result.encode(FORMAT))30    print(f"[RESULT] THE RESULT HAS BEEN SENT TO THE CLIENT {addr}")3132def sendQuadratic(conn,addr,result1,result2):33    result1 = str(result1)34    result2 = str(result2)35    quadraticList = [result1, result2]36    quadraticList = json.dumps(quadraticList)37    conn.sendall(bytes(quadraticList,encoding="utf-8"))38    print(f"[RESULT] THE RESULT HAS BEEN SENT TO THE CLIENT {addr}")394041def addition(conn,addr,receive_msg):42    result = float(receive_msg[1]) + float(receive_msg[2])43    sendResult(conn,addr,result)4445def substraction(conn,addr,receive_msg):46    result = float(receive_msg[1]) - float(receive_msg[2])47    sendResult(conn,addr,result)4849def log(conn,addr,receive_msg):50    result = math.log(float(receive_msg[1]), float(receive_msg[2]))51    sendResult(conn,addr,result)5253def squareRoot(conn,addr,receive_msg):54    result = math.sqrt(float(receive_msg[1]))55    sendResult(conn,addr,result)5657def exponent(conn,addr,receive_msg):58    result = pow(float(receive_msg[1]), float(receive_msg[2]))59    sendResult(conn,addr,result)6061def quadratic(conn,addr,receive_msg):62    d = (float(receive_msg[2])**2) - (4*float(receive_msg[1])*float(receive_msg[3]))63    result1 = (-float(receive_msg[2])-cmath.sqrt(d))/(2*float(receive_msg[1]))64    result2 = (-float(receive_msg[2])+cmath.sqrt(d))/(2*float(receive_msg[1]))65    sendQuadratic(conn,addr,result1,result2)6667def cuboidVol(conn,addr,receive_msg):68    result = float(receive_msg[1])*float(receive_msg[2])*float(receive_msg[3])69    sendResult(conn,addr,result)7071def sphereVol(conn,addr,receive_msg):72    pi=22/773    result = (4/3) * (pi * float(receive_msg[1]) ** 3)74    sendResult(conn,addr,result)7576def cylinderVol(conn,addr,receive_msg):77    pi=22/778    result =  pi * float(receive_msg[1]) * float(receive_msg[1]) * float(receive_msg[2])79    sendResult(conn,addr,result)8081def coneVol(conn,addr,receive_msg):82    pi=22/783    result =  (1.0/3) * pi * float(receive_msg[1]) * float(receive_msg[1]) * float(receive_msg[2])84    sendResult(conn,addr,result)858687def handle_client(conn,addr):88    try:89        try:90            print(f"[NEW CONNECTION] {addr} CONNECTED")91            connected = True92            while connected:93                receive_msg = conn.recv(2048).decode(FORMAT)94                if receive_msg:95                    receive_msg = json.loads(receive_msg)96                    if receive_msg == DISCONNECT_MESSAGE:97                        connected = False98                        print(f"[DISCONNECT] CLIENT {addr} HAS BEEN DISCONNECTED")99                        100                    if receive_msg[0] == "1":101                        addition(conn,addr,receive_msg)102                    elif receive_msg[0] == "2":103                        substraction(conn,addr,receive_msg)104                    elif receive_msg[0] == "3":105                        log(conn,addr,receive_msg)106                    elif receive_msg[0] == "4":107                        squareRoot(conn,addr,receive_msg)108                    elif receive_msg[0] == "5":109                        exponent(conn,addr,receive_msg)110                    elif receive_msg[0] == "6":111                        quadratic(conn,addr,receive_msg)112                    elif receive_msg[0] == "7":113                        cuboidVol(conn,addr,receive_msg)114                    elif receive_msg[0] == "8":115                        sphereVol(conn,addr,receive_msg)116                    elif receive_msg[0] == "9":117                        cylinderVol(conn,addr,receive_msg)118                    elif receive_msg[0] == "10":119                        coneVol(conn,addr,receive_msg)120            conn.close()121        except OverflowError:122            print("Overflow Error")123            try:124                sys.exit(0)125            except SystemExit:126                os._exit(0)    127    except KeyboardInterrupt:128        print('Server Has Been Interrupted')129        try:130            sys.exit(0)131        except SystemExit:132            os._exit(0)    133134135def start():136    try:137        try:138            server.listen()139            try:140                while True:141                    try:142                        conn, addr = server.accept()143                        process = mp.Process(target=handle_client, args=(conn,addr))144                        process.start()145                    except socket.error:146                        print("Socket Error")147            except Exception as e:148                print("An Exception Has Occured:")149                print(e)150                sys.exit(1)151        except OverflowError:152            print("Overflow Error")153            try:154                sys.exit(0)155            except SystemExit:156                os._exit(0)    157    except KeyboardInterrupt:158        print('Server Has Been Interrupted')159        try:160            sys.exit(0)161        except SystemExit:162            os._exit(0)163164165if __name__ == "__main__":166    print("[STARTING] SERVER IS STARTING")
...message.py
Source:message.py  
1# coding=utf-82from django.http import HttpResponse3from wechatpy.utils import check_signature4from wechatpy.exceptions import InvalidSignatureException5from weixin import settings, tasks6import wechatpy7from common.tools import auto_answer8from common.models import OpenUser9from common import settings as common_setting10from weixin.utils import tools11def notify(request):12    if request.method == 'GET':13        signature = request.GET.get('signature')14        timestamp = request.GET.get('timestamp')15        nonce = request.GET.get('nonce')16        echostr = request.GET.get('echostr')17        try:18            check_signature(settings.TOKEN, signature, timestamp, nonce)19        except InvalidSignatureException:20            return HttpResponse('')21        return HttpResponse(echostr)22    else:23        receive_msg = wechatpy.parse_message(request.body)24        # æ¶å°ææ¬,èªå¨åå¤25        if isinstance(receive_msg, wechatpy.messages.TextMessage):26            content = auto_answer.answer_robot(receive_msg.content)27            reply = wechatpy.replies.TextReply(content=content, message=receive_msg)28            return HttpResponse(reply.render())29        # å
³æ³¨äºä»¶30        elif isinstance(receive_msg, wechatpy.events.SubscribeEvent):31            receive_msg.msgtype = 'text'32            reply = wechatpy.replies.TextReply(content=settings.ANSWER_MSG_LIST['SUBSCRIBE'], message=receive_msg)33            tasks.create_weixin_user.delay(receive_msg.source)34            return HttpResponse(reply.render())35        # åæ¶å
³æ³¨36        elif isinstance(receive_msg, wechatpy.events.UnsubscribeEvent):37            OpenUser.objects.filter(supplier=common_setting.SupplierEnum.WECHAT, openid=receive_msg.source).update(38                state=common_setting.StateEnum.DELETED)39            return HttpResponse('')40        # æ«æå¸¦åæ°äºç»´ç (å·²å
³æ³¨,æªå
³æ³¨,æªå
³æ³¨å¾®ä¿¡ä¼æç¤ºå
³æ³¨) å¯ç¨äºæ«ç ç»å½41        elif isinstance(receive_msg, (wechatpy.events.ScanEvent, wechatpy.events.SubscribeScanEvent)):42            return HttpResponse('')43        elif isinstance(receive_msg, (wechatpy.events.LocationEvent, wechatpy.events.LocationSelectEvent)):44            print receive_msg45            receive_msg.msgtype = 'text'46            reply = wechatpy.replies.TextReply(content=u'ä½ç½®ä¸æ¥æå', message=receive_msg)47            return HttpResponse(reply.render())48        elif isinstance(receive_msg, wechatpy.events.ClickEvent):49            event_handler = tools.EventHandler(receive_msg.key)50            visit_count = event_handler.get_event_handler()51            receive_msg.msgtype = 'text'52            reply = wechatpy.replies.TextReply(content=settings.ANSWER_MSG_LIST['VISIT_COUNT'] % str(visit_count),53                                               message=receive_msg)54            return HttpResponse(reply.render())55        else:56            receive_msg.msgtype = 'text'57            reply = wechatpy.replies.TextReply(content=settings.ANSWER_MSG_LIST['PARSE_FAIL'], message=receive_msg)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
