Best Python code snippet using localstack_python
_RosUnitySrvMessage.py
Source:_RosUnitySrvMessage.py  
1# This Python file uses the following encoding: utf-82"""autogenerated by genpy from ros_tcp_endpoint/RosUnitySrvMessage.msg. Do not edit."""3import codecs4import sys5python3 = True if sys.hexversion > 0x03000000 else False6import genpy7import struct8class RosUnitySrvMessage(genpy.Message):9  _md5sum = "5e4da90c1cd45db0881a77473482b38e"10  _type = "ros_tcp_endpoint/RosUnitySrvMessage"11  _has_header = False  # flag to mark the presence of a Header object12  _full_text = """int32 srv_id13bool is_request14string topic15uint8[] payload"""16  __slots__ = ['srv_id','is_request','topic','payload']17  _slot_types = ['int32','bool','string','uint8[]']18  def __init__(self, *args, **kwds):19    """20    Constructor. Any message fields that are implicitly/explicitly21    set to None will be assigned a default value. The recommend22    use is keyword arguments as this is more robust to future message23    changes.  You cannot mix in-order arguments and keyword arguments.24    The available fields are:25       srv_id,is_request,topic,payload26    :param args: complete set of field values, in .msg order27    :param kwds: use keyword arguments corresponding to message field names28    to set specific fields.29    """30    if args or kwds:31      super(RosUnitySrvMessage, self).__init__(*args, **kwds)32      # message fields cannot be None, assign default values for those that are33      if self.srv_id is None:34        self.srv_id = 035      if self.is_request is None:36        self.is_request = False37      if self.topic is None:38        self.topic = ''39      if self.payload is None:40        self.payload = b''41    else:42      self.srv_id = 043      self.is_request = False44      self.topic = ''45      self.payload = b''46  def _get_types(self):47    """48    internal API method49    """50    return self._slot_types51  def serialize(self, buff):52    """53    serialize message into buffer54    :param buff: buffer, ``StringIO``55    """56    try:57      _x = self58      buff.write(_get_struct_iB().pack(_x.srv_id, _x.is_request))59      _x = self.topic60      length = len(_x)61      if python3 or type(_x) == unicode:62        _x = _x.encode('utf-8')63        length = len(_x)64      buff.write(struct.Struct('<I%ss'%length).pack(length, _x))65      _x = self.payload66      length = len(_x)67      # - if encoded as a list instead, serialize as bytes instead of string68      if type(_x) in [list, tuple]:69        buff.write(struct.Struct('<I%sB'%length).pack(length, *_x))70      else:71        buff.write(struct.Struct('<I%ss'%length).pack(length, _x))72    except struct.error as se: self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self)))))73    except TypeError as te: self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))74  def deserialize(self, str):75    """76    unpack serialized message in str into this message instance77    :param str: byte array of serialized message, ``str``78    """79    if python3:80      codecs.lookup_error("rosmsg").msg_type = self._type81    try:82      end = 083      _x = self84      start = end85      end += 586      (_x.srv_id, _x.is_request,) = _get_struct_iB().unpack(str[start:end])87      self.is_request = bool(self.is_request)88      start = end89      end += 490      (length,) = _struct_I.unpack(str[start:end])91      start = end92      end += length93      if python3:94        self.topic = str[start:end].decode('utf-8', 'rosmsg')95      else:96        self.topic = str[start:end]97      start = end98      end += 499      (length,) = _struct_I.unpack(str[start:end])100      start = end101      end += length102      self.payload = str[start:end]103      return self104    except struct.error as e:105      raise genpy.DeserializationError(e)  # most likely buffer underfill106  def serialize_numpy(self, buff, numpy):107    """108    serialize message with numpy array types into buffer109    :param buff: buffer, ``StringIO``110    :param numpy: numpy python module111    """112    try:113      _x = self114      buff.write(_get_struct_iB().pack(_x.srv_id, _x.is_request))115      _x = self.topic116      length = len(_x)117      if python3 or type(_x) == unicode:118        _x = _x.encode('utf-8')119        length = len(_x)120      buff.write(struct.Struct('<I%ss'%length).pack(length, _x))121      _x = self.payload122      length = len(_x)123      # - if encoded as a list instead, serialize as bytes instead of string124      if type(_x) in [list, tuple]:125        buff.write(struct.Struct('<I%sB'%length).pack(length, *_x))126      else:127        buff.write(struct.Struct('<I%ss'%length).pack(length, _x))128    except struct.error as se: self._check_types(struct.error("%s: '%s' when writing '%s'" % (type(se), str(se), str(locals().get('_x', self)))))129    except TypeError as te: self._check_types(ValueError("%s: '%s' when writing '%s'" % (type(te), str(te), str(locals().get('_x', self)))))130  def deserialize_numpy(self, str, numpy):131    """132    unpack serialized message in str into this message instance using numpy for array types133    :param str: byte array of serialized message, ``str``134    :param numpy: numpy python module135    """136    if python3:137      codecs.lookup_error("rosmsg").msg_type = self._type138    try:139      end = 0140      _x = self141      start = end142      end += 5143      (_x.srv_id, _x.is_request,) = _get_struct_iB().unpack(str[start:end])144      self.is_request = bool(self.is_request)145      start = end146      end += 4147      (length,) = _struct_I.unpack(str[start:end])148      start = end149      end += length150      if python3:151        self.topic = str[start:end].decode('utf-8', 'rosmsg')152      else:153        self.topic = str[start:end]154      start = end155      end += 4156      (length,) = _struct_I.unpack(str[start:end])157      start = end158      end += length159      self.payload = str[start:end]160      return self161    except struct.error as e:162      raise genpy.DeserializationError(e)  # most likely buffer underfill163_struct_I = genpy.struct_I164def _get_struct_I():165    global _struct_I166    return _struct_I167_struct_iB = None168def _get_struct_iB():169    global _struct_iB170    if _struct_iB is None:171        _struct_iB = struct.Struct("<iB")...inspector.py
Source:inspector.py  
1# A simple inspect script designed to work with packets captured by HttpCanary (https://play.google.com/store/apps/details?id=com.guoshi.httpcanary).2# Tested with version 3.3.1. Previous versions might not work (we don't support hcy format).3#4# To use this script:5# 1. In HttpCanary, set target app to bilibili and then start capturing.6# 2. Do whatever IM-related operations in the bilibili app, then stop capturing.7# 3. Back to HttpCanary, tap menu (3-dots icon on the top-right), "Filter", under "Protocols" tap "TCP".8# 4. Tap any item with the port number 6537.9# 5. Tap menu, "Save", "Save Both", and enter a desired name.10# 6. `cd` into saved folder (by default it would be located at "/sdcard/HttpCanary/download/<NAME_YOU_ENTERED_IN_STEP_5>/"; you might want to `adb pull` to your PC).11# 7. Execute `python3 /path/to/inspector.py`. The script will try to find the packet with skey first, then it would began parsing & decrypting.12# 8. If you have multiple sessions just start from step 4 again.13import os14import traceback15import warnings16from google.protobuf import text_format17from blink.__proto__ import *18from blink.IMEncrypt import IMEncrypt19def get_message_by_cmdid(cmdid, is_request=False):20    if cmdid == CmdId.EN_CMD_ID_SESSION_SVR_MY_GROUP_UNREAD:21        if is_request:22            raise NotImplementedError("Unsupported CmdId", cmdid)23        else:24            return RspMyGroupUnread()25    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_ACK_ASSIS_MSG:26        if is_request:27            return ReqAckAssisMsg()28        else:29            return DummyRsp()30    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_BATCH_RM_SESSIONS:31        if is_request:32            return ReqBatRmSess()33        else:34            return DummyRsp()35    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_GET_SESSIONS:36        if is_request:37            return ReqGetSessions()38        else:39            return RspSessions()40    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_GROUP_ASSIS_MSG:41        if is_request:42            return ReqGroupAssisMsg()43        else:44            return RspSessionMsg()45    elif cmdid == CmdId.EN_CMD_ID_SHAKE_HAND:46        if is_request:47            return ReqHands()48        else:49            return RspHands()50    elif cmdid == CmdId.EN_CMD_ID_HEARTBEAT:51        if is_request:52            return ReqHeartbeat()53        else:54            return RspHeartbeat()55    elif cmdid == CmdId.EN_CMD_ID_LOGIN:56        if is_request:57            return ReqLogin()58        else:59            return RspLogin()60    elif cmdid == CmdId.EN_CMD_ID_LOGOUT:61        if is_request:62            return ReqLogout()63        else:64            return RspLogin()65    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_NEW_SESSIONS:66        if is_request:67            return ReqNewSessions()68        else:69            return RspSessions()70    elif cmdid == CmdId.EN_CMD_ID_SYNC_RELATION:71        if is_request:72            return ReqRelationSync()73        return RspRelationSync()74    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_REMOVE_SESSION:75        if is_request:76            return ReqRemoveSession()77        else:78            return DummyRsp()79    elif cmdid == CmdId.EN_CMD_ID_SEND_MSG:80        if is_request:81            return ReqSendMsg()82        else:83            return RspSendMsg()84    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_SESSION_DETAIL:85        if is_request:86            return ReqSessionDetail()87        else:88            return SessionInfo()89    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_BATCH_SESS_DETAIL:90        if is_request:91            return ReqSessionDetails()92        else:93            return RspSessions()94    elif cmdid == CmdId.EN_CMD_ID_SYNC_FETCH_SESSION_MSGS:95        if is_request:96            return ReqSessionMsg()97        else:98            return RspSessionMsg()99    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_SET_TOP:100        if is_request:101            return ReqSetTop()102        else:103            return DummyRsp()104    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_SINGLE_UNREAD:105        if is_request:106            return ReqSingleUnread()107        else:108            return RspSingleUnread()109    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_UPDATE_ACK:110        if is_request:111            return ReqUpdateAck()112        else:113            return DummyRsp()114    elif cmdid == CmdId.EN_CMD_ID_SESSION_SVR_UPDATE_UNFLW_READ:115        if is_request:116            raise NotImplementedError("Unsupported CmdId", cmdid)117        else:118            return DummyRsp()119    elif cmdid == CmdId.EN_CMD_ID_SYNC_MSG:120        if is_request:121            return ReqMsgSync()122        else:123            return RspMsgSync()124    else:125        raise NotImplementedError("Unsupported CmdId", cmdid)126if __name__ == "__main__":127    warnings.filterwarnings("error")128    files = os.listdir()129    files.remove("tcp.hcy")130    files.remove("tcp.json")131    files = [i.replace(".bin", "") for i in files]132    files.sort(key=int)133    skey = None134    for i in files:135        with open(i + ".bin", "rb") as f:136            body = MsgBody()137            body.ParseFromString(f.read())138        if body.cmd == CmdId.EN_CMD_ID_SHAKE_HAND and body.err_msg == "ok":139            payload = RspHands()140            payload.ParseFromString(body.payload)141            skey = payload.skey142            break143    if not skey:144        print("No skey was found")145        exit(-1)146    for i in files:147        try:148            print("========== Packet #%s ==========" % (i))149            with open(i + ".bin", "rb") as f:150                body = MsgBody()151                body.ParseFromString(f.read())152            print(text_format.MessageToString(body, as_utf8=True))153            if body.cmd != CmdId.EN_CMD_ID_SHAKE_HAND:154                body.payload = IMEncrypt.decode(skey, body.payload)155                payload = get_message_by_cmdid(body.cmd, body.err_msg != "ok")156                payload.ParseFromString(body.payload)157                print("Decrypted payload:")158                print(text_format.MessageToString(payload, as_utf8=True))159        except:160            traceback.print_exc()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
