How to use is_request method in localstack

Best Python code snippet using localstack_python

_RosUnitySrvMessage.py

Source:_RosUnitySrvMessage.py Github

copy

Full Screen

# This Python file uses the following encoding: utf-8
"""autogenerated by genpy from ros_tcp_endpoint/RosUnitySrvMessage.msg. Do not edit."""
import codecs
import sys
python3 = True if sys.hexversion > 0x03000000 else False
import genpy
import struct


class RosUnitySrvMessage(genpy.Message):
    """Message carrying the fields ``srv_id``, ``is_request``, ``topic`` and ``payload``.

    Wire layout produced by :meth:`serialize` and read by :meth:`deserialize`:

    * ``srv_id`` and ``is_request`` packed together with the ``"<iB"`` format
      (5 bytes),
    * ``topic`` as a ``"<I"`` byte count followed by the UTF-8 encoded text,
    * ``payload`` as a ``"<I"`` byte count followed by the raw bytes.
    """
    _md5sum = "5e4da90c1cd45db0881a77473482b38e"
    _type = "ros_tcp_endpoint/RosUnitySrvMessage"
    _has_header = False  # flag to mark the presence of a Header object
    _full_text = """int32 srv_id
bool is_request
string topic
uint8[] payload"""
    __slots__ = ['srv_id','is_request','topic','payload']
    _slot_types = ['int32','bool','string','uint8[]']

    def __init__(self, *args, **kwds):
        """
        Constructor. Any message fields that are implicitly/explicitly
        set to None will be assigned a default value. The recommended
        use is keyword arguments as this is more robust to future message
        changes. You cannot mix in-order arguments and keyword arguments.

        The available fields are:
            srv_id,is_request,topic,payload

        :param args: complete set of field values, in .msg order
        :param kwds: use keyword arguments corresponding to message field names
            to set specific fields.
        """
        if args or kwds:
            super(RosUnitySrvMessage, self).__init__(*args, **kwds)
            # message fields cannot be None, assign default values for those that are
            if self.srv_id is None:
                self.srv_id = 0
            if self.is_request is None:
                self.is_request = False
            if self.topic is None:
                self.topic = ''
            if self.payload is None:
                self.payload = b''
        else:
            self.srv_id = 0
            self.is_request = False
            self.topic = ''
            self.payload = b''

    def _get_types(self):
        """
        internal API method
        """
        return self._slot_types

    def serialize(self, buff):
        """
        serialize message into buffer

        :param buff: buffer, ``StringIO``
        """
        try:
            _x = self
            buff.write(_get_struct_iB().pack(_x.srv_id, _x.is_request))
            _x = self.topic
            length = len(_x)
            # ``unicode`` only exists on Python 2; the ``python3 or`` short-circuit
            # keeps the name from being evaluated on Python 3.
            if python3 or type(_x) == unicode:
                _x = _x.encode('utf-8')
                # the length prefix counts encoded bytes, not characters
                length = len(_x)
            buff.write(struct.Struct('<I%ss'%length).pack(length, _x))
            _x = self.payload
            length = len(_x)
            # - if encoded as a list instead, serialize as bytes instead of string
            if isinstance(_x, (list, tuple)):
                buff.write(struct.Struct('<I%sB'%length).pack(length, *_x))
            else:
                buff.write(struct.Struct('<I%ss'%length).pack(length, _x))
        except struct.error as se:
            # ``_x`` is whichever field was being written when packing failed.
            self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self)))))
        except TypeError as te:
            self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))

    def deserialize(self, str):
        """
        unpack serialized message in str into this message instance

        :param str: byte array of serialized message, ``str``
            (the parameter name shadows the builtin; it is kept because it is
            part of the method's signature)
        :returns: this instance, with all four fields populated
        :raises genpy.DeserializationError: when ``struct`` cannot unpack a
            fixed-size section
        """
        if python3:
            codecs.lookup_error("rosmsg").msg_type = self._type
        try:
            end = 0
            _x = self
            start = end
            end += 5  # size of the "<iB" header: int32 srv_id + uint8 is_request
            (_x.srv_id, _x.is_request,) = _get_struct_iB().unpack(str[start:end])
            # the flag travels as a uint8; turn it back into a bool
            self.is_request = bool(self.is_request)
            start = end
            end += 4
            (length,) = _struct_I.unpack(str[start:end])
            start = end
            end += length
            if python3:
                self.topic = str[start:end].decode('utf-8', 'rosmsg')
            else:
                self.topic = str[start:end]
            start = end
            end += 4
            (length,) = _struct_I.unpack(str[start:end])
            start = end
            end += length
            self.payload = str[start:end]
            return self
        except struct.error as e:
            raise genpy.DeserializationError(e)  # most likely buffer underfill

    def serialize_numpy(self, buff, numpy):
        """
        serialize message with numpy array types into buffer

        The original body was a statement-for-statement copy of
        :meth:`serialize` and never touched ``numpy``, so this delegates.

        :param buff: buffer, ``StringIO``
        :param numpy: numpy python module (unused)
        """
        return self.serialize(buff)

    def deserialize_numpy(self, str, numpy):
        """
        unpack serialized message in str into this message instance using numpy for array types

        The original body was a statement-for-statement copy of
        :meth:`deserialize` and never touched ``numpy``, so this delegates.

        :param str: byte array of serialized message, ``str``
        :param numpy: numpy python module (unused)
        """
        return self.deserialize(str)


_struct_I = genpy.struct_I
def _get_struct_I():
    global _struct_I
    return _struct_I


_struct_iB = None
def _get_struct_iB():
    """Return the ``"<iB"`` struct, creating it on first use."""
    global _struct_iB
    if _struct_iB is None:
        _struct_iB = struct.Struct("<iB")
    # NOTE(review): the listing this was restored from is cut off ("...") right
    # after the assignment above. The return is reconstructed from the parallel
    # _get_struct_I() accessor and from the _get_struct_iB().pack()/.unpack()
    # call sites -- confirm against the generated file.
    return _struct_iB

Full Screen

Full Screen

inspector.py

Source:inspector.py Github

copy

Full Screen

# A simple inspect script designed to work with packets captured by HttpCanary (https://play.google.com/store/apps/details?id=com.guoshi.httpcanary).
# Tested with version 3.3.1. Previous versions might not work (we don't support hcy format).
#
# To use this script:
# 1. In HttpCanary, set target app to bilibili and then start capturing.
# 2. Do whatever IM-related operations in the bilibili app, then stop capturing.
# 3. Back to HttpCanary, tap menu (3-dots icon on the top-right), "Filter", under "Protocols" tap "TCP".
# 4. Tap any item with the port number 6537.
# 5. Tap menu, "Save", "Save Both", and enter a desired name.
# 6. `cd` into saved folder (by default it would be located at "/sdcard/HttpCanary/download/<NAME_YOU_ENTERED_IN_STEP_5>/"; you might want to `adb pull` to your PC).
# 7. Execute `python3 /path/to/inspector.py`. The script will try to find the packet with skey first, then it will begin parsing & decrypting.
# 8. If you have multiple sessions just start from step 4 again.
import os
import sys
import traceback
import warnings
from google.protobuf import text_format
from blink.__proto__ import *
from blink.IMEncrypt import IMEncrypt

# CmdId -> (request message type, response message type).
# ``None`` marks a direction that is not supported for that command.
_MESSAGE_TYPES = {
    CmdId.EN_CMD_ID_SESSION_SVR_MY_GROUP_UNREAD: (None, RspMyGroupUnread),
    CmdId.EN_CMD_ID_SESSION_SVR_ACK_ASSIS_MSG: (ReqAckAssisMsg, DummyRsp),
    CmdId.EN_CMD_ID_SESSION_SVR_BATCH_RM_SESSIONS: (ReqBatRmSess, DummyRsp),
    CmdId.EN_CMD_ID_SESSION_SVR_GET_SESSIONS: (ReqGetSessions, RspSessions),
    CmdId.EN_CMD_ID_SESSION_SVR_GROUP_ASSIS_MSG: (ReqGroupAssisMsg, RspSessionMsg),
    CmdId.EN_CMD_ID_SHAKE_HAND: (ReqHands, RspHands),
    CmdId.EN_CMD_ID_HEARTBEAT: (ReqHeartbeat, RspHeartbeat),
    CmdId.EN_CMD_ID_LOGIN: (ReqLogin, RspLogin),
    # NOTE(review): logout responses are parsed as RspLogin, exactly as in the
    # original listing -- confirm this is intended and not a copy/paste slip.
    CmdId.EN_CMD_ID_LOGOUT: (ReqLogout, RspLogin),
    CmdId.EN_CMD_ID_SESSION_SVR_NEW_SESSIONS: (ReqNewSessions, RspSessions),
    CmdId.EN_CMD_ID_SYNC_RELATION: (ReqRelationSync, RspRelationSync),
    CmdId.EN_CMD_ID_SESSION_SVR_REMOVE_SESSION: (ReqRemoveSession, DummyRsp),
    CmdId.EN_CMD_ID_SEND_MSG: (ReqSendMsg, RspSendMsg),
    CmdId.EN_CMD_ID_SESSION_SVR_SESSION_DETAIL: (ReqSessionDetail, SessionInfo),
    CmdId.EN_CMD_ID_SESSION_SVR_BATCH_SESS_DETAIL: (ReqSessionDetails, RspSessions),
    CmdId.EN_CMD_ID_SYNC_FETCH_SESSION_MSGS: (ReqSessionMsg, RspSessionMsg),
    CmdId.EN_CMD_ID_SESSION_SVR_SET_TOP: (ReqSetTop, DummyRsp),
    CmdId.EN_CMD_ID_SESSION_SVR_SINGLE_UNREAD: (ReqSingleUnread, RspSingleUnread),
    CmdId.EN_CMD_ID_SESSION_SVR_UPDATE_ACK: (ReqUpdateAck, DummyRsp),
    CmdId.EN_CMD_ID_SESSION_SVR_UPDATE_UNFLW_READ: (None, DummyRsp),
    CmdId.EN_CMD_ID_SYNC_MSG: (ReqMsgSync, RspMsgSync),
}


def get_message_by_cmdid(cmdid, is_request=False):
    """Return a new, empty message object matching a command id.

    :param cmdid: a ``CmdId`` value identifying the command
    :param is_request: ``True`` to get the request message type, ``False``
        (default) to get the response message type
    :returns: a freshly constructed message instance
    :raises NotImplementedError: when ``cmdid`` is unknown, or when the
        requested direction is not supported for that command
    """
    request_type, response_type = _MESSAGE_TYPES.get(cmdid, (None, None))
    message_type = request_type if is_request else response_type
    if message_type is None:
        raise NotImplementedError("Unsupported CmdId", cmdid)
    return message_type()


def _packet_ids():
    """Return the stems of the numeric ``<N>.bin`` files in the current directory, in ascending numeric order."""
    suffix = ".bin"
    stems = (name[:-len(suffix)] for name in os.listdir() if name.endswith(suffix))
    # Only numeric stems are packets; tcp.hcy, tcp.json and any other stray
    # file are skipped whether or not they are present.
    return sorted((stem for stem in stems if stem.isdecimal()), key=int)


def _read_body(packet_id):
    """Parse ``<packet_id>.bin`` into a ``MsgBody``."""
    with open(packet_id + ".bin", "rb") as f:
        body = MsgBody()
        body.ParseFromString(f.read())
    return body


def _find_skey(packet_ids):
    """Return the skey from the first successful handshake packet, or ``None`` if there is none."""
    for packet_id in packet_ids:
        body = _read_body(packet_id)
        if body.cmd == CmdId.EN_CMD_ID_SHAKE_HAND and body.err_msg == "ok":
            payload = RspHands()
            payload.ParseFromString(body.payload)
            return payload.skey
    return None


def main():
    """Locate the session key, then print every packet and its decrypted payload."""
    warnings.filterwarnings("error")
    packet_ids = _packet_ids()
    skey = _find_skey(packet_ids)
    if not skey:
        print("No skey was found")
        sys.exit(-1)
    for packet_id in packet_ids:
        try:
            print("========== Packet #%s ==========" % (packet_id))
            body = _read_body(packet_id)
            print(text_format.MessageToString(body, as_utf8=True))
            # Handshake payloads are parsed as-is; everything else is decoded
            # with the session key first.
            if body.cmd != CmdId.EN_CMD_ID_SHAKE_HAND:
                body.payload = IMEncrypt.decode(skey, body.payload)
            # A packet is treated as a request when its err_msg is not "ok".
            # NOTE(review): a response carrying an error message would also
            # take the request branch -- confirm this heuristic.
            payload = get_message_by_cmdid(body.cmd, body.err_msg != "ok")
            payload.ParseFromString(body.payload)
            print("Decrypted payload:")
            print(text_format.MessageToString(payload, as_utf8=True))
        except Exception:
            # Best effort per packet: report the failure and continue, while
            # still letting KeyboardInterrupt / SystemExit propagate.
            traceback.print_exc()


if __name__ == "__main__":
    main()

# NOTE: the listing this was restored from ends with "..." here, so the
# original script may continue past this point.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful