Best Python code snippet using playwright-python
test_rospy_msg.py
Source:test_rospy_msg.py  
...55        try:56            args_kwds_to_message(Val, 'hi', val='hello world-3')57            self.fail("should not allow args and kwds")58        except TypeError: pass59    def test_serialize_message(self):60        import rospy.msg61        import rospy.rostime62        # have to fake-init rostime so that Header can be stamped63        rospy.rostime.set_rostime_initialized(True)64        65        buff = StringIO()66        seq = random.randint(1, 1000)67        68        #serialize_message(seq, msg)69        from test_rospy.msg import Val70        #serialize a simple 'Val' with a string in it71        teststr = 'foostr-%s'%time.time()72        val = Val(teststr)73        fmt = "<II%ss"%len(teststr)74        size = struct.calcsize(fmt) - 475        valid = struct.pack(fmt, size, len(teststr), teststr)76        77        rospy.msg.serialize_message(buff, seq, val)78        self.assertEquals(valid, buff.getvalue())79        80        #test repeated serialization81        rospy.msg.serialize_message(buff, seq, val)82        rospy.msg.serialize_message(buff, seq, val)        83        self.assertEquals(valid*3, buff.getvalue())84        # - once more just to make sure that the buffer position is85        # being preserved properly86        buff.seek(0)87        rospy.msg.serialize_message(buff, seq, val)88        self.assertEquals(valid*3, buff.getvalue())        89        rospy.msg.serialize_message(buff, seq, val)90        self.assertEquals(valid*3, buff.getvalue()) 91        rospy.msg.serialize_message(buff, seq, val)        92        self.assertEquals(valid*3, buff.getvalue())        93        #test sequence parameter94        buff.truncate(0)95        from test_rospy.msg import HeaderVal96        t = rospy.Time.now()97        t.secs = t.secs - 1 # move it back in time98        h = rospy.Header(None, rospy.Time.now(), teststr)99        h.stamp = t100        val = HeaderVal(h, teststr)101        seq += 1102        103        rospy.msg.serialize_message(buff, 
seq, val)104        self.assertEquals(val.header, h)105        self.assertEquals(seq, h.seq)106        #should not have been changed107        self.assertEquals(t, h.stamp) 108        self.assertEquals(teststr, h.frame_id) 109        110        #test frame_id setting111        h.frame_id = None112        rospy.msg.serialize_message(buff, seq, val)113        self.assertEquals(val.header, h)114        self.assertEquals('0', h.frame_id) 115        116    def test_deserialize_messages(self):117        import rospy.msg118        from test_rospy.msg import Val119        num_tests = 10120        teststrs = ['foostr-%s'%random.randint(0, 10000) for i in xrange(0, num_tests)]121        valids = []122        for t in teststrs:123            fmt = "<II%ss"%len(t)124            size = struct.calcsize(fmt) - 4125            valids.append(struct.pack(fmt, size, len(t), t))126        data_class = Val...test_messaging.py
Source:test_messaging.py  
# (excerpt starts mid-function; the truncated fragment below is preserved verbatim)
#     return ''.join([hex(b) for b in packed])


def _assert_hex_equal(output, expected):
    """Assert that the bytes buffered in ``output`` match the hex dump ``expected``.

    ``expected`` is a whitespace-separated dump of un-padded hex bytes; spaces
    and newlines are removed and case is ignored. The actual bytes are
    rendered with ``get_hexbytes`` and stripped of their ``0x`` prefixes
    before the comparison.
    """
    wanted = expected.replace(' ', '').replace('\n', '').lower()
    actual = get_hexbytes(output.getvalue()).replace('0x', '').lower()
    assert actual == wanted


def _round_trip(buffer_pair, *args, **kwargs):
    """Serialize a message, feed it to the dummy reader, and deserialize it.

    ``buffer_pair`` is the ``(dummy, buf)`` pair provided by the
    ``dummy_read_buffer_pair`` fixture; the remaining arguments are passed
    straight to ``serialize_message``.
    """
    dummy, buf = buffer_pair
    dummy.feed_data(serialize_message(*args, **kwargs).getvalue())
    return deserialize_message(buf)


# These tests are based on Error handling with Reset
# https://boltprotocol.org/v1/#examples-reset
def test_pack_init_message():
    """INIT with a client name and an auth token packs to the reference bytes."""
    auth_token = collections.OrderedDict(
        [("scheme", "basic"), ("principal", "neo4j"), ("credentials", "secret")])
    output = serialize_message(Message.INIT, params=("MyClient/1.0", auth_token))
    _assert_hex_equal(output, """
        0 40 B2 1  8C 4D 79 43  6C 69 65 6E  74 2F 31 2E
        30 A3 86 73  63 68 65 6D  65 85 62 61  73 69 63 89
        70 72 69 6E  63 69 70 61  6C 85 6E 65  6F 34 6A 8B
        63 72 65 64  65 6E 74 69  61 6C 73 86  73 65 63 72
        65 74 0 0
    """)


def test_pack_success_message():
    """SUCCESS with a one-entry metadata map packs to the reference bytes."""
    output = serialize_message(
        Message.SUCCESS,
        params=({"server": "Neo4j/3.1.0"},))
    _assert_hex_equal(output, """
        0 16 B1 70  A1 86 73 65  72 76 65 72  8B 4E 65 6F
        34 6A 2F 33  2E 31 2E 30  0 0
    """)


def test_pack_run_message():
    """RUN with a 30-character statement and empty parameters packs correctly."""
    output = serialize_message(
        Message.RUN,
        params=("This will cause a syntax error", {}))
    _assert_hex_equal(output, """
        0 23 b2 10  d0 1e 54 68  69 73 20 77  69 6c 6c 20
        63 61 75 73  65 20 61 20  73 79 6e 74  61 78 20 65
        72 72 6f 72  a0 0 0
    """)


def test_pack_good_run_message():
    """RUN with a short statement and empty parameters packs correctly."""
    output = serialize_message(
        Message.RUN,
        params=("RETURN 1 AS num", {}))
    _assert_hex_equal(output, """
        0 13 b2 10  8f 52 45 54  55 52 4e 20  31 20 41 53
        20 6e 75 6d  a0 0 0
    """)


# NOTE(review): the input below was corrected to agree with the expected bytes
# (see the comments in the test body). The skip marker is kept until the test
# has been re-run against the real serializer.
@pytest.mark.skip(reason="Suspicious failure. Under investigation")
def test_pack_failure_message():
    """FAILURE with a code/message metadata map packs to the reference bytes."""
    metadata = collections.OrderedDict([
        ("code", "Neo.ClientError.Statement.SyntaxError"),
        # The expected dump encodes a 101-byte message (D0 65) whose second
        # line starts right after the newline (... 29 29 A 22 54 ...) and that
        # ends with '"\n ^' (22 A 20 5E), so the text is spelled out explicitly
        # instead of relying on an indented triple-quoted literal.
        ("message",
         "Invalid input 'T': expected <init> (line 1, column 1 (offset: 0))\n"
         "\"This will cause a syntax error\"\n"
         " ^"),
    ])
    # params must be a 1-tuple holding the map: the expected dump starts with
    # B1 (structure with one field) followed by A2 (map with two entries).
    output = serialize_message(Message.FAILURE, params=(metadata,))
    _assert_hex_equal(output, """
        0 9E B1 7F  A2 84 63 6F  64 65 D0 25  4E 65 6F 2E
        43 6C 69 65  6E 74 45 72  72 6F 72 2E  53 74 61 74
        65 6D 65 6E  74 2E 53 79  6E 74 61 78  45 72 72 6F
        72 87 6D 65  73 73 61 67  65 D0 65 49  6E 76 61 6C
        69 64 20 69  6E 70 75 74  20 27 54 27  3A 20 65 78
        70 65 63 74  65 64 20 3C  69 6E 69 74  3E 20 28 6C
        69 6E 65 20  31 2C 20 63  6F 6C 75 6D  6E 20 31 20
        28 6F 66 66  73 65 74 3A  20 30 29 29  A 22 54 68
        69 73 20 77  69 6C 6C 20  63 61 75 73  65 20 61 20
        73 79 6E 74  61 78 20 65  72 72 6F 72  22 A 20 5E
        0 0
    """)


def test_pack_pull_all():
    """PULL_ALL carries no fields."""
    output = serialize_message(Message.PULL_ALL)
    _assert_hex_equal(output, """0 2 b0 3f 0 0""")


def test_pack_ignored():
    """IGNORED carries no fields."""
    output = serialize_message(Message.IGNORED)
    _assert_hex_equal(output, """0 2 b0 7e 0 0""")


def test_pack_reset():
    """RESET carries no fields."""
    output = serialize_message(Message.RESET)
    _assert_hex_equal(output, """0 2 b0 f 0 0""")


def test_pack_ack_failure():
    """ACK_FAILURE carries no fields."""
    output = serialize_message(Message.ACK_FAILURE)
    _assert_hex_equal(output, """0 2 B0 E  0 0""")


def test_pack_record():
    """RECORD with a single-element field list packs to the reference bytes."""
    output = serialize_message(Message.RECORD, params=([1],))
    _assert_hex_equal(output, """0 4 b1 71  91 1 0 0""")


# Test chunker, discard all
def test_unpack_record_int(dummy_read_buffer_pair):
    """A RECORD of small positive ints survives a serialize/deserialize round trip."""
    record = _round_trip(dummy_read_buffer_pair, Message.RECORD, params=([1, 2, 3],))
    assert record.fields == [1, 2, 3]


def test_unpack_record_int_sizes(dummy_read_buffer_pair):
    """Ints of several magnitudes and signs round-trip unchanged."""
    record = _round_trip(
        dummy_read_buffer_pair,
        Message.RECORD, params=([1, 1234, -1234, 40000, -40000],))
    assert record.fields == [1, 1234, -1234, 40000, -40000]


def test_unpack_record_negint(dummy_read_buffer_pair):
    """Small negative ints round-trip unchanged."""
    record = _round_trip(dummy_read_buffer_pair, Message.RECORD, params=([-1, -2, -3],))
    assert record.fields == [-1, -2, -3]


def test_unpack_record_float(dummy_read_buffer_pair):
    """Floats round-trip unchanged."""
    record = _round_trip(dummy_read_buffer_pair, Message.RECORD, params=([1.1, 2.2, 3.3],))
    assert record.fields == [1.1, 2.2, 3.3]


def test_unpack_record_string(dummy_read_buffer_pair):
    """Strings round-trip unchanged."""
    record = _round_trip(dummy_read_buffer_pair, Message.RECORD, params=(['a', 'b', 'c'],))
    assert record.fields == ['a', 'b', 'c']


def test_unpack_record_dict(dummy_read_buffer_pair):
    """A dict field round-trips unchanged."""
    record = _round_trip(dummy_read_buffer_pair, Message.RECORD, params=([{'a': 'b'}],))
    assert record.fields == [{'a': 'b'}]


def test_unpack_record_mixed_types(dummy_read_buffer_pair):
    """A field list mixing int, str and dict round-trips unchanged."""
    record = _round_trip(
        dummy_read_buffer_pair,
        Message.RECORD, params=([1, 'hello', {'hello': 'world'}],))
    assert record.fields == [1, 'hello', {'hello': 'world'}]


def test_unpack_record_mixed_types_containers(dummy_read_buffer_pair):
    """Nested containers holding None and booleans round-trip unchanged."""
    record = _round_trip(
        dummy_read_buffer_pair,
        Message.RECORD,
        params=([1, 'hello', {'hello': ['world', 'hello', None, True, False]}, [1, 2, 3]],))
    assert record.fields == [1, 'hello', {'hello': ['world', 'hello', None, True, False]}, [1, 2, 3]]


def test_unpack_init_message(dummy_read_buffer_pair):
    """INIT round-trips into ``client_name`` and ``auth_token``."""
    init = _round_trip(
        dummy_read_buffer_pair,
        Message.INIT,
        params=("MyClient/1.0",
                collections.OrderedDict(
                    [("scheme", "basic"), ("principal", "neo4j"), ("credentials", "secret")])))
    assert init.client_name == "MyClient/1.0"
    assert init.auth_token["scheme"] == "basic"
    assert init.auth_token["principal"] == "neo4j"
    assert init.auth_token["credentials"] == "secret"


def test_unpack_success_message(dummy_read_buffer_pair):
    """SUCCESS round-trips into ``metadata``."""
    success = _round_trip(
        dummy_read_buffer_pair,
        Message.SUCCESS,
        params=({"server": "Neo4j/3.1.0"},))
    assert success.metadata == {"server": "Neo4j/3.1.0"}


def test_unpack_good_run_message(dummy_read_buffer_pair):
    """RUN with empty parameters round-trips into ``statement`` / ``parameters``."""
    run = _round_trip(
        dummy_read_buffer_pair,
        Message.RUN,
        params=("RETURN 1 AS num", {}))
    assert run.statement == "RETURN 1 AS num"
    assert run.parameters == {}


def test_unpack_good_run_message_with_args(dummy_read_buffer_pair):
    """RUN with one parameter round-trips into ``statement`` / ``parameters``."""
    run = _round_trip(
        dummy_read_buffer_pair,
        Message.RUN,
        params=("RETURN 1 AS num", {'a': 1}))
    assert run.statement == "RETURN 1 AS num"
    assert run.parameters == {'a': 1}


def test_unpack_pull_all(dummy_read_buffer_pair):
    # NOTE(review): the assertions of this test are not visible in the excerpt.
    pull_all = _round_trip(dummy_read_buffer_pair, Message.PULL_ALL)
    # ... (excerpt truncated here)

# ...protocol.py
Source:protocol.py  
# (excerpt starts mid-function; the truncated fragment below is preserved verbatim)
#     elif type == ClientMsgType.SUIT_CHANGE:
#         roomid, suit = data.split(',')
#         suit = Suits.from_string(suit)
#         return Message(type, {"suit": suit, "roomid":roomid})


def serialize_message(type: str, data: str) -> str:
    """Frame one message as ``<type>.<data>$``.

    ``$`` terminates the frame; ``get_raw`` splits a stream on it.
    """
    return f'{type}.{data}$'


def serialize_client_msg(type: "ClientMsgType", **kwargs) -> str:
    """Serialize a client message of the given type.

    Keyword arguments read per type:

    * EST_CONN: none (empty payload)
    * ACK_CONN, DISCONNECT: ``userid``
    * NEW_ROOM: ``rounds``
    * JOIN_ROOM, START_GAME, CLOSE_ROOM: ``userid``, ``roomid``
    * GAME_MOVE: ``roomid``, ``card`` (must provide ``serialize()``)
    * GET_CARD_STACK: ``roomid``
    * SUIT_CHANGE: ``roomid``, ``suit`` (its ``value`` is sent)

    Raises ``KeyError`` when a required keyword is missing. Returns ``None``
    for a type that matches none of the branches.
    """
    if type == ClientMsgType.EST_CONN:
        return serialize_message(type.value, '')
    if type in (ClientMsgType.ACK_CONN, ClientMsgType.DISCONNECT):
        return serialize_message(type.value, kwargs['userid'])
    if type == ClientMsgType.NEW_ROOM:
        return serialize_message(type.value, kwargs['rounds'])
    if type in (ClientMsgType.JOIN_ROOM, ClientMsgType.START_GAME, ClientMsgType.CLOSE_ROOM):
        return serialize_message(type.value, f'{kwargs["userid"]},{kwargs["roomid"]}')
    if type == ClientMsgType.GAME_MOVE:
        card = kwargs["card"].serialize()
        return serialize_message(type.value, f'{kwargs["roomid"]},{card}')
    if type == ClientMsgType.GET_CARD_STACK:
        return serialize_message(type.value, kwargs["roomid"])
    if type == ClientMsgType.SUIT_CHANGE:
        return serialize_message(type.value, f'{kwargs["roomid"]},{kwargs["suit"].value}')
    return None


def serialize_server_msg(type: "ServerMsgType", **kwargs) -> str:
    """Serialize a server message of the given type.

    Keyword arguments read per type:

    * ACK_CONN, SUIT_NEEDS_CHANGE: none (empty payload)
    * USER_CREATED: ``userid``
    * ROOM_CREATED, JOINED_ROOM: ``roomid``
    * GAME_STARTED: ``roomid``, ``deck``, ``current_card`` (both serialized)
    * YOUR_TURN, NEW_GAME_MOVE, STACK_CARD: ``card`` (serialized)
    * GAME_FINISHED, ROOM_WINNER: ``winner``
    * ERROR: ``code``, ``description``
    * SUIT_CHANGE: ``suit`` (its ``value`` is sent)

    Raises ``KeyError`` when a required keyword is missing. Returns ``None``
    for ROOM_CLOSED and for a type that matches none of the branches.
    """
    if type in (ServerMsgType.ACK_CONN, ServerMsgType.SUIT_NEEDS_CHANGE):
        return serialize_message(type.value, '')
    if type == ServerMsgType.USER_CREATED:
        return serialize_message(type.value, kwargs['userid'])
    if type in (ServerMsgType.ROOM_CREATED, ServerMsgType.JOINED_ROOM):
        return serialize_message(type.value, kwargs['roomid'])
    if type == ServerMsgType.GAME_STARTED:
        deck = kwargs['deck'].serialize()
        card = kwargs['current_card'].serialize()
        return serialize_message(type.value, f'{kwargs["roomid"]},{deck},{card}')
    if type in (ServerMsgType.YOUR_TURN, ServerMsgType.NEW_GAME_MOVE, ServerMsgType.STACK_CARD):
        return serialize_message(type.value, kwargs['card'].serialize())
    if type in (ServerMsgType.GAME_FINISHED, ServerMsgType.ROOM_WINNER):
        return serialize_message(type.value, kwargs['winner'])
    if type == ServerMsgType.ROOM_CLOSED:
        # NOTE(review): the original branch was a bare `pass`, so no frame is
        # produced for ROOM_CLOSED. Looks unfinished - confirm the intended
        # payload before changing it.
        return None
    if type == ServerMsgType.ERROR:
        return serialize_message(type.value, f"{kwargs['code']},{kwargs['description']}")
    if type == ServerMsgType.SUIT_CHANGE:
        return serialize_message(type.value, kwargs['suit'].value)
    return None


def get_raw(data: str) -> List[str]:
    """Split a stream of ``$``-terminated frames into the individual messages.

    Empty segments are dropped, so a trailing ``$``, doubled ``$$`` and an
    empty input all yield no spurious entries. Input that does not end with
    ``$`` no longer raises ``ValueError``; its last segment is returned as-is.
    """
    return [segment for segment in data.split('$') if segment]


def parse_individual(message: str):
    """Parse one unframed message into a server or client message object.

    The type is resolved by ``get_type_and_data``; the payload is handed to
    ``parse_server_msg`` or ``parse_client_msg`` accordingly. Returns ``None``
    when the resolved type is neither a ``ServerMsgType`` nor a
    ``ClientMsgType``.
    """
    msg_type, msg_data = get_type_and_data(message)
    if isinstance(msg_type, ServerMsgType):
        return parse_server_msg(msg_type, msg_data)
    if isinstance(msg_type, ClientMsgType):
        return parse_client_msg(msg_type, msg_data)
    return None


def parse_message(data: str):
    """Parse every frame contained in ``data`` and return the results as a list."""
    return [parse_individual(message) for message in get_raw(data)]


# (excerpt truncated mid-function; the fragment below is preserved verbatim)
# def serialize(message: Message):
#     if isinstance(message.type, ServerMsgType):
# ...py_walk.py
Source:py_walk.py  
# (excerpt starts at the class definition; the preceding lines are not visible)
class PyWalk:
    """Python front end for ``PyWalkWrapper``.

    Messages passed in are converted with ``serialize_message`` before being
    handed to the wrapper, and the wrapper's results are converted back with
    ``deserialize_message``.
    """

    # Numeric ids passed to the wrapper's special_reset() for each state name.
    _STATE_IDS = {"PAUSED": 0, "WALKING": 1, "IDLE": 2, "START_MOVEMENT": 3, "STOP_MOVEMENT": 4,
                  "START_STEP": 5, "STOP_STEP": 6, "KICK": 7}

    def __init__(self, namespace="", parameters: "list[Parameter] | None" = None):
        """Create the wrapper, forwarding ``namespace`` and the serialized parameters.

        ``parameters`` defaults to no parameters. A warning is printed for
        every parameter whose ``value.type`` equals 2 (reported as integer).
        """
        serialized_parameters = []
        # `None` replaces the former mutable `[]` default; both mean "no parameters".
        for parameter in parameters or ():
            serialized_parameters.append(serialize_message(parameter))
            if parameter.value.type == 2:
                print(f"Gave parameter {parameter.name} of integer type. If the code crashes it is maybe because this "
                      f"should be a float. You may need to add an .0 in some yaml file.")
        self.py_walk_wrapper = PyWalkWrapper(namespace, serialized_parameters)

    @staticmethod
    def _nonzero_dt(dt: float) -> float:
        """Return ``dt``, substituting 0.001 when it is exactly zero."""
        # preventing weird spline interpolation errors on edge case
        return 0.001 if dt == 0.0 else dt

    def spin_ros(self):
        """Forward to the wrapper's ``spin_some()``."""
        self.py_walk_wrapper.spin_some()

    def reset(self):
        """Forward to the wrapper's ``reset()``."""
        self.py_walk_wrapper.reset()

    def special_reset(self, state: str, phase: float, cmd_vel_msg: Twist, reset_odometry: bool):
        """Reset the wrapper into a named state.

        ``state`` must be one of the keys of ``_STATE_IDS``; any other value
        raises ``KeyError``.
        """
        self.py_walk_wrapper.special_reset(
            self._STATE_IDS[state], phase, serialize_message(cmd_vel_msg), reset_odometry)

    def step(self, dt: float, cmdvel_msg: Twist, imu_msg, jointstate_msg, pressure_left, pressure_right):
        """Run the wrapper's ``step`` and return the result as a ``JointCommand``."""
        raw_command = self.py_walk_wrapper.step(
            self._nonzero_dt(dt),
            serialize_message(cmdvel_msg),
            serialize_message(imu_msg),
            serialize_message(jointstate_msg),
            serialize_message(pressure_left),
            serialize_message(pressure_right))
        return deserialize_message(raw_command, JointCommand)

    def step_relative(self, dt: float, step_msg: Twist, imu_msg, jointstate_msg, pressure_left, pressure_right):
        """Run the wrapper's ``step_relative`` and return the result as a ``JointCommand``."""
        raw_command = self.py_walk_wrapper.step_relative(
            self._nonzero_dt(dt),
            serialize_message(step_msg),
            serialize_message(imu_msg),
            serialize_message(jointstate_msg),
            serialize_message(pressure_left),
            serialize_message(pressure_right))
        return deserialize_message(raw_command, JointCommand)

    def step_open_loop(self, dt: float, cmdvel_msg: Twist):
        """Run the wrapper's ``step_open_loop`` and return the result as a ``PoseArray``."""
        raw_poses = self.py_walk_wrapper.step_open_loop(
            self._nonzero_dt(dt), serialize_message(cmdvel_msg))
        return deserialize_message(raw_poses, PoseArray)

    def get_left_foot_pose(self):
        """Return the wrapper's left foot pose as a ``Pose``."""
        return deserialize_message(self.py_walk_wrapper.get_left_foot_pose(), Pose)

    def get_right_foot_pose(self):
        """Return the wrapper's right foot pose as a ``Pose``."""
        return deserialize_message(self.py_walk_wrapper.get_right_foot_pose(), Pose)

    def set_parameters(self, param_dict):
        """Convert ``param_dict`` with ``parse_parameter_dict`` and set each parameter."""
        parameters = parse_parameter_dict(namespace="", parameter_dict=param_dict)
        for parameter in parameters:
            self.py_walk_wrapper.set_parameter(serialize_message(parameter))

    def get_phase(self):
        """Return the wrapper's ``get_phase()`` result."""
        return self.py_walk_wrapper.get_phase()

    def get_freq(self):
        """Return the wrapper's ``get_freq()`` result."""
        return self.py_walk_wrapper.get_freq()

    def get_odom(self):
        """Return the wrapper's odometry as an ``Odometry`` message."""
        return deserialize_message(self.py_walk_wrapper.get_odom(), Odometry)

    def publish_debug(self):
        """Forward to the wrapper's ``publish_debug()``."""
        self.py_walk_wrapper.publish_debug()

    def test_memory_leak_from(self, twist_msg):
        """Pass a serialized ``twist_msg`` to the wrapper's ``test_memory_leak_from``."""
        self.py_walk_wrapper.test_memory_leak_from(serialize_message(twist_msg))

    def test_memory_leak_to(self):
        """Forward to the wrapper's ``test_memory_leak_to()``."""
        self.py_walk_wrapper.test_memory_leak_to()

    # (excerpt truncated mid-class; the fragment below is preserved verbatim)
    # def test_memory_leak_methods(self, msg):
# ...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique 
features, and use cases with examples to deepen your understanding of Playwright testing. This tutorial gives you A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 automation test minutes FREE!!
