Best Python code snippet using playwright-python
test_queue_lifecycle.py
Source:test_queue_lifecycle.py  
...41            resp = json.loads(resp.decode())42            self.assertEqual(400, resp['headers']['status'])43        with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:44            msg_mock.side_effect = validator45            self.protocol.onMessage(req, False)46    @ddt.data('480924', 'foo')47    def test_basics_thoroughly(self, project_id):48        # Stats are empty - queue not created yet49        action = consts.QUEUE_GET_STATS50        body = {"queue_name": "gummybears"}51        headers = {52            'Client-ID': uuidutils.generate_uuid(),53            'X-Project-ID': project_id54        }55        send_mock = mock.patch.object(self.protocol, 'sendMessage')56        self.addCleanup(send_mock.stop)57        sender = send_mock.start()58        req = test_utils.create_request(action, body, headers)59        def validator(resp, isBinary):60            resp = json.loads(resp.decode())61            self.assertEqual(404, resp['headers']['status'])62        sender.side_effect = validator63        self.protocol.onMessage(req, False)64        # Create65        action = consts.QUEUE_CREATE66        body = {"queue_name": "gummybears",67                "metadata": {68                    "key": {69                        "key2": "value",70                        "key3": [1, 2, 3, 4, 5]},71                    "messages": {"ttl": 600},72                    }73                }74        req = test_utils.create_request(action, body, headers)75        def validator(resp, isBinary):76            resp = json.loads(resp.decode())77            self.assertEqual(201, resp['headers']['status'])78        sender.side_effect = validator79        self.protocol.onMessage(req, False)80        # Fetch metadata81        action = consts.QUEUE_GET82        body = {"queue_name": "gummybears"}83        meta = {"messages": {"ttl": 600},84                "key": {85                    "key2": "value",86                    "key3": [1, 2, 3, 4, 5]}87                }88        
req = test_utils.create_request(action, body, headers)89        def validator(resp, isBinary):90            resp = json.loads(resp.decode())91            self.assertEqual(200, resp['headers']['status'])92            self.assertEqual(meta, resp['body'])93        sender.side_effect = validator94        self.protocol.onMessage(req, False)95        # Stats empty queue96        action = consts.QUEUE_GET_STATS97        body = {"queue_name": "gummybears"}98        req = test_utils.create_request(action, body, headers)99        def validator(resp, isBinary):100            resp = json.loads(resp.decode())101            self.assertEqual(200, resp['headers']['status'])102        sender.side_effect = validator103        self.protocol.onMessage(req, False)104        # Delete105        action = consts.QUEUE_DELETE106        body = {"queue_name": "gummybears"}107        req = test_utils.create_request(action, body, headers)108        def validator(resp, isBinary):109            resp = json.loads(resp.decode())110            self.assertEqual(204, resp['headers']['status'])111        sender.side_effect = validator112        self.protocol.onMessage(req, False)113        # Get non-existent stats114        action = consts.QUEUE_GET_STATS115        body = {"queue_name": "gummybears"}116        req = test_utils.create_request(action, body, headers)117        def validator(resp, isBinary):118            resp = json.loads(resp.decode())119            self.assertEqual(404, resp['headers']['status'])120        sender.side_effect = validator121        self.protocol.onMessage(req, False)122    def test_name_restrictions(self):123        headers = {124            'Client-ID': uuidutils.generate_uuid(),125            'X-Project-ID': 'test-project'126        }127        action = consts.QUEUE_CREATE128        body = {"queue_name": 'marsbar',129                "metadata": {130                    "key": {131                        "key2": "value",132                        "key3": [1, 2, 3, 4, 
5]},133                    "messages": {"ttl": 600},134                    }135                }136        send_mock = mock.patch.object(self.protocol, 'sendMessage')137        self.addCleanup(send_mock.stop)138        sender = send_mock.start()139        req = test_utils.create_request(action, body, headers)140        def validator(resp, isBinary):141            resp = json.loads(resp.decode())142            self.assertIn(resp['headers']['status'], [201, 204])143        sender.side_effect = validator144        self.protocol.onMessage(req, False)145        body["queue_name"] = "m@rsb@r"146        req = test_utils.create_request(action, body, headers)147        def validator(resp, isBinary):148            resp = json.loads(resp.decode())149            self.assertEqual(400, resp['headers']['status'])150        sender.side_effect = validator151        self.protocol.onMessage(req, False)152        body["queue_name"] = "marsbar" * 10153        req = test_utils.create_request(action, body, headers)154        self.protocol.onMessage(req, False)155    def test_project_id_restriction(self):156        headers = {157            'Client-ID': uuidutils.generate_uuid(),158            'X-Project-ID': 'test-project' * 30159        }160        action = consts.QUEUE_CREATE161        body = {"queue_name": 'poptart'}162        send_mock = mock.patch.object(self.protocol, 'sendMessage')163        self.addCleanup(send_mock.stop)164        sender = send_mock.start()165        req = test_utils.create_request(action, body, headers)166        def validator(resp, isBinary):167            resp = json.loads(resp.decode())168            self.assertEqual(400, resp['headers']['status'])169        sender.side_effect = validator170        self.protocol.onMessage(req, False)171        headers['X-Project-ID'] = 'test-project'172        req = test_utils.create_request(action, body, headers)173        def validator(resp, isBinary):174            resp = json.loads(resp.decode())175            
self.assertIn(resp['headers']['status'], [201, 204])176        sender.side_effect = validator177        self.protocol.onMessage(req, False)178    def test_non_ascii_name(self):179        test_params = ((u'/queues/non-ascii-n\u0153me', 'utf-8'),180                       (u'/queues/non-ascii-n\xc4me', 'iso8859-1'))181        headers = {182            'Client-ID': uuidutils.generate_uuid(),183            'X-Project-ID': 'test-project' * 30184        }185        action = consts.QUEUE_CREATE186        body = {"queue_name": test_params[0]}187        send_mock = mock.patch.object(self.protocol, 'sendMessage')188        self.addCleanup(send_mock.stop)189        sender = send_mock.start()190        req = test_utils.create_request(action, body, headers)191        def validator(resp, isBinary):192            resp = json.loads(resp.decode())193            self.assertEqual(400, resp['headers']['status'])194        sender.side_effect = validator195        self.protocol.onMessage(req, False)196        body = {"queue_name": test_params[1]}197        req = test_utils.create_request(action, body, headers)198        self.protocol.onMessage(req, False)199    def test_no_metadata(self):200        headers = {201            'Client-ID': uuidutils.generate_uuid(),202            'X-Project-ID': 'test-project'203        }204        action = consts.QUEUE_CREATE205        body = {"queue_name": "fizbat"}206        send_mock = mock.patch.object(self.protocol, 'sendMessage')207        self.addCleanup(send_mock.stop)208        sender = send_mock.start()209        req = test_utils.create_request(action, body, headers)210        def validator(resp, isBinary):211            resp = json.loads(resp.decode())212            self.assertIn(resp['headers']['status'], [201, 204])213        sender.side_effect = validator214        self.protocol.onMessage(req, False)215        def validator(resp, isBinary):216            resp = json.loads(resp.decode())217            self.assertEqual(204, 
resp['headers']['status'])218        sender.side_effect = validator219        self.protocol.onMessage(req, False)220    @ddt.data('{', '[]', '.', '  ')221    def test_bad_metadata(self, meta):222        headers = {223            'Client-ID': uuidutils.generate_uuid(),224            'X-Project-ID': 'test-project' * 30225        }226        action = consts.QUEUE_CREATE227        body = {"queue_name": "fizbat",228                "metadata": meta}229        send_mock = mock.patch.object(self.protocol, 'sendMessage')230        self.addCleanup(send_mock.stop)231        sender = send_mock.start()232        req = test_utils.create_request(action, body, headers)233        def validator(resp, isBinary):234            resp = json.loads(resp.decode())235            self.assertEqual(400, resp['headers']['status'])236        sender.side_effect = validator237        self.protocol.onMessage(req, False)238    def test_too_much_metadata(self):239        headers = {240            'Client-ID': uuidutils.generate_uuid(),241            'X-Project-ID': 'test-project'242        }243        action = consts.QUEUE_CREATE244        body = {"queue_name": "buttertoffee",245                "metadata": {"messages": {"ttl": 600},246                             "padding": "x"}247                }248        max_size = self.transport_cfg.max_queue_metadata249        body["metadata"]["padding"] = "x" * max_size250        send_mock = mock.patch.object(self.protocol, 'sendMessage')251        self.addCleanup(send_mock.stop)252        sender = send_mock.start()253        req = test_utils.create_request(action, body, headers)254        def validator(resp, isBinary):255            resp = json.loads(resp.decode())256            self.assertEqual(400, resp['headers']['status'])257        sender.side_effect = validator258        self.protocol.onMessage(req, False)259    def test_way_too_much_metadata(self):260        headers = {261            'Client-ID': uuidutils.generate_uuid(),262            'X-Project-ID': 
'test-project'263        }264        action = consts.QUEUE_CREATE265        body = {"queue_name": "peppermint",266                "metadata": {"messages": {"ttl": 600},267                             "padding": "x"}268                }269        max_size = self.transport_cfg.max_queue_metadata270        body["metadata"]["padding"] = "x" * max_size * 5271        send_mock = mock.patch.object(self.protocol, 'sendMessage')272        self.addCleanup(send_mock.stop)273        sender = send_mock.start()274        req = test_utils.create_request(action, body, headers)275        def validator(resp, isBinary):276            resp = json.loads(resp.decode())277            self.assertEqual(400, resp['headers']['status'])278        sender.side_effect = validator279        self.protocol.onMessage(req, False)280    def test_update_metadata(self):281        self.skip("Implement patch method")282        headers = {283            'Client-ID': uuidutils.generate_uuid(),284            'X-Project-ID': 'test-project'285        }286        action = consts.QUEUE_CREATE287        body = {"queue_name": "bonobon"}288        send_mock = mock.patch.object(self.protocol, 'sendMessage')289        self.addCleanup(send_mock.stop)290        sender = send_mock.start()291        # Create292        req = test_utils.create_request(action, body, headers)293        def validator(resp, isBinary):294            resp = json.loads(resp.decode())295            self.assertEqual(201, resp['headers']['status'])296        sender.side_effect = validator297        self.protocol.onMessage(req, False)298        # Set meta299        meta1 = {"messages": {"ttl": 600}, "padding": "x"}300        body["metadata"] = meta1301        req = test_utils.create_request(action, body, headers)302        def validator(resp, isBinary):303            resp = json.loads(resp.decode())304            self.assertEqual(204, resp['headers']['status'])305        sender.side_effect = validator306        self.protocol.onMessage(req, False)307  
      # Get308        action = consts.QUEUE_GET309        body = {"queue_name": "bonobon"}310        req = test_utils.create_request(action, body, headers)311        def validator(resp, isBinary):312            resp = json.loads(resp.decode())313            self.assertEqual(204, resp['headers']['status'])314            self.assertEqual(meta1, resp['body'])315        sender.side_effect = validator316        self.protocol.onMessage(req, False)317        # Update318        action = consts.QUEUE_CREATE319        meta2 = {"messages": {"ttl": 100}, "padding": "y"}320        body["metadata"] = meta2321        req = test_utils.create_request(action, body, headers)322        def validator(resp, isBinary):323            resp = json.loads(resp.decode())324            self.assertEqual(204, resp['headers']['status'])325        sender.side_effect = validator326        self.protocol.onMessage(req, False)327        # Get again328        action = consts.QUEUE_GET329        body = {"queue_name": "bonobon"}330        req = test_utils.create_request(action, body, headers)331        def validator(resp, isBinary):332            resp = json.loads(resp.decode())333            self.assertEqual(200, resp['headers']['status'])334            self.assertEqual(meta2, resp['body'])335        sender.side_effect = validator336        self.protocol.onMessage(req, False)337    def test_list(self):338        arbitrary_number = 644079696574693339        project_id = str(arbitrary_number)340        client_id = uuidutils.generate_uuid()341        headers = {342            'X-Project-ID': project_id,343            'Client-ID': client_id344        }345        send_mock = mock.patch.object(self.protocol, 'sendMessage')346        self.addCleanup(send_mock.stop)347        sender = send_mock.start()348        # NOTE(kgriffs): It's important that this one sort after the one349        # above. 
This is in order to prove that bug/1236605 is fixed, and350        # stays fixed!351        # NOTE(vkmc): In websockets as well!352        alt_project_id = str(arbitrary_number + 1)353        # List empty354        action = consts.QUEUE_LIST355        body = {}356        req = test_utils.create_request(action, body, headers)357        def validator(resp, isBinary):358            resp = json.loads(resp.decode())359            self.assertEqual(200, resp['headers']['status'])360            self.assertEqual([], resp['body']['queues'])361        sender.side_effect = validator362        self.protocol.onMessage(req, False)363        # Payload exceeded364        body = {'limit': 21}365        req = test_utils.create_request(action, body, headers)366        def validator(resp, isBinary):367            resp = json.loads(resp.decode())368            self.assertEqual(400, resp['headers']['status'])369        sender.side_effect = validator370        self.protocol.onMessage(req, False)371        # Create some372        def create_queue(project_id, queue_name, metadata):373            altheaders = {'Client-ID': client_id}374            if project_id is not None:375                altheaders['X-Project-ID'] = project_id376            action = consts.QUEUE_CREATE377            body['queue_name'] = queue_name378            body['metadata'] = metadata379            req = test_utils.create_request(action, body, altheaders)380            def validator(resp, isBinary):381                resp = json.loads(resp.decode())382                self.assertEqual(201, resp['headers']['status'])383            sender.side_effect = validator384            self.protocol.onMessage(req, False)385        create_queue(project_id, 'q1', {"node": 31})386        create_queue(project_id, 'q2', {"node": 32})387        create_queue(project_id, 'q3', {"node": 33})388        create_queue(alt_project_id, 'q3', {"alt": 1})389        # List (limit)390        body = {'limit': 2}391        req = 
test_utils.create_request(action, body, headers)392        def validator(resp, isBinary):393            resp = json.loads(resp.decode())394            self.assertEqual(2, len(resp['body']['queues']))395        sender.side_effect = validator396        self.protocol.onMessage(req, False)397        # List (no metadata, get all)398        body = {'limit': 5}399        req = test_utils.create_request(action, body, headers)400        def validator(resp, isBinary):401            resp = json.loads(resp.decode())402            self.assertEqual(200, resp['headers']['status'])403            # Ensure we didn't pick up the queue from the alt project.404            self.assertEqual(3, len(resp['body']['queues']))405        sender.side_effect = validator406        self.protocol.onMessage(req, False)407        # List with metadata408        body = {'detailed': True}409        req = test_utils.create_request(action, body, headers)410        def validator(resp, isBinary):411            resp = json.loads(resp.decode())412            self.assertEqual(200, resp['headers']['status'])413        sender.side_effect = validator414        self.protocol.onMessage(req, False)415        action = consts.QUEUE_GET416        body = {"queue_name": "q1"}417        req = test_utils.create_request(action, body, headers)418        def validator(resp, isBinary):419            resp = json.loads(resp.decode())420            self.assertEqual(200, resp['headers']['status'])421            self.assertEqual({"node": 31}, resp['body'])422        sender.side_effect = validator423        self.protocol.onMessage(req, False)424        # List tail425        action = consts.QUEUE_LIST426        body = {}427        req = test_utils.create_request(action, body, headers)428        def validator(resp, isBinary):429            resp = json.loads(resp.decode())430            self.assertEqual(200, resp['headers']['status'])431        sender.side_effect = validator432        self.protocol.onMessage(req, False)433        # 
List manually-constructed tail434        body = {'marker': "zzz"}435        req = test_utils.create_request(action, body, headers)436        self.protocol.onMessage(req, False)437    def test_list_returns_503_on_nopoolfound_exception(self):438        headers = {439            'Client-ID': uuidutils.generate_uuid(),440            'X-Project-ID': 'test-project'441        }442        action = consts.QUEUE_LIST443        body = {}444        send_mock = mock.patch.object(self.protocol, 'sendMessage')445        self.addCleanup(send_mock.stop)446        sender = send_mock.start()447        req = test_utils.create_request(action, body, headers)448        def validator(resp, isBinary):449            resp = json.loads(resp.decode())450            self.assertEqual(503, resp['headers']['status'])451        sender.side_effect = validator452        queue_controller = self.boot.storage.queue_controller453        with mock.patch.object(queue_controller, 'list') as mock_queue_list:454            def queue_generator():455                raise storage_errors.NoPoolFound()456            # This generator tries to be like queue controller list generator457            # in some ways.458            def fake_generator():459                yield queue_generator()460                yield {}461            mock_queue_list.return_value = fake_generator()462            self.protocol.onMessage(req, False)463    def _post_messages(self, queue_name, headers, repeat=1):464        messages = [{'body': 239, 'ttl': 300}] * repeat465        action = consts.MESSAGE_POST466        body = {"queue_name": queue_name,467                "messages": messages}468        send_mock = mock.Mock()469        self.protocol.sendMessage = send_mock470        req = test_utils.create_request(action, body, headers)471        self.protocol.onMessage(req, False)472        return json.loads(send_mock.call_args[0][0].decode())473    def test_purge(self):474        arbitrary_number = 644079696574693475        project_id = 
str(arbitrary_number)476        client_id = uuidutils.generate_uuid()477        headers = {478            'X-Project-ID': project_id,479            'Client-ID': client_id480        }481        queue_name = 'myqueue'482        resp = self._post_messages(queue_name, headers, repeat=5)483        msg_ids = resp['body']['message_ids']484        send_mock = mock.Mock()485        self.protocol.sendMessage = send_mock486        for msg_id in msg_ids:487            action = consts.MESSAGE_GET488            body = {"queue_name": queue_name, "message_id": msg_id}489            req = test_utils.create_request(action, body, headers)490            self.protocol.onMessage(req, False)491            resp = json.loads(send_mock.call_args[0][0].decode())492            self.assertEqual(200, resp['headers']['status'])493        action = consts.QUEUE_PURGE494        body = {"queue_name": queue_name, "resource_types": ["messages"]}495        req = test_utils.create_request(action, body, headers)496        self.protocol.onMessage(req, False)497        resp = json.loads(send_mock.call_args[0][0].decode())498        self.assertEqual(204, resp['headers']['status'])499        for msg_id in msg_ids:500            action = consts.MESSAGE_GET501            body = {"queue_name": queue_name, "message_id": msg_id}502            req = test_utils.create_request(action, body, headers)503            self.protocol.onMessage(req, False)504            resp = json.loads(send_mock.call_args[0][0].decode())505            self.assertEqual(404, resp['headers']['status'])506class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):507    config_file = 'websocket_mongodb.conf'508    @testing.requires_mongodb509    def setUp(self):510        super(TestQueueLifecycleMongoDB, self).setUp()511    def tearDown(self):512        storage = self.boot.storage._storage513        connection = storage.connection514        connection.drop_database(self.boot.control.queues_database)515        for db in 
storage.message_databases:516            connection.drop_database(db)517        super(TestQueueLifecycleMongoDB, self).tearDown()test_messages.py
Source:test_messages.py  
...39        body = {"queue_name": "kitkat"}40        req = test_utils.create_request(consts.QUEUE_CREATE,41                                        body, self.headers)42        with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:43            self.protocol.onMessage(req, False)44            resp = json.loads(msg_mock.call_args[0][0].decode())45            self.assertIn(resp['headers']['status'], [201, 204])46    def tearDown(self):47        super(MessagesBaseTest, self).tearDown()48        body = {"queue_name": "kitkat"}49        send_mock = mock.Mock()50        self.protocol.sendMessage = send_mock51        req = test_utils.create_request(consts.QUEUE_DELETE,52                                        body, self.headers)53        self.protocol.onMessage(req, False)54        resp = json.loads(send_mock.call_args[0][0].decode())55        self.assertEqual(204, resp['headers']['status'])56    def _test_post(self, sample_messages, in_binary=False):57        body = {"queue_name": "kitkat",58                "messages": sample_messages}59        send_mock = mock.Mock()60        self.protocol.sendMessage = send_mock61        dumps, loads, create_req = test_utils.get_pack_tools(binary=in_binary)62        req = create_req(consts.MESSAGE_POST, body, self.headers)63        self.protocol.onMessage(req, in_binary)64        arg = send_mock.call_args[0][0]65        if not in_binary:66            arg = arg.decode()67        resp = loads(arg)68        self.assertEqual(201, resp['headers']['status'])69        self.msg_ids = resp['body']['message_ids']70        self.assertEqual(len(sample_messages), len(self.msg_ids))71        lookup = dict([(m['ttl'], m['body']) for m in sample_messages])72        # Test GET on the message resource directly73        # NOTE(cpp-cabrera): force the passing of time to age a message74        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'75        now = timeutils.utcnow() + datetime.timedelta(seconds=10)76        with 
mock.patch(timeutils_utcnow) as mock_utcnow:77            mock_utcnow.return_value = now78            for msg_id in self.msg_ids:79                headers = self.headers.copy()80                headers['X-Project-ID'] = '777777'81                # Wrong project ID82                action = consts.MESSAGE_GET83                body = {"queue_name": "kitkat",84                        "message_id": msg_id}85                req = create_req(action, body, headers)86                self.protocol.onMessage(req, in_binary)87                arg = send_mock.call_args[0][0]88                if not in_binary:89                    arg = arg.decode()90                resp = loads(arg)91                self.assertEqual(404, resp['headers']['status'])92                # Correct project ID93                req = create_req(action, body, self.headers)94                self.protocol.onMessage(req, in_binary)95                arg = send_mock.call_args[0][0]96                if not in_binary:97                    arg = arg.decode()98                resp = loads(arg)99                self.assertEqual(200, resp['headers']['status'])100                # Check message properties101                message = resp['body']['messages']102                self.assertEqual(lookup[message['ttl']], message['body'])103                self.assertEqual(msg_id, message['id'])104                # no negative age105                # NOTE(cpp-cabrera): testtools lacks106                # GreaterThanEqual on py26107                self.assertThat(message['age'],108                                matchers.GreaterThan(-1))109        # Test bulk GET110        action = consts.MESSAGE_GET_MANY111        body = {"queue_name": "kitkat",112                "message_ids": self.msg_ids}113        req = create_req(action, body, self.headers)114        self.protocol.onMessage(req, in_binary)115        arg = send_mock.call_args[0][0]116        if not in_binary:117            arg = arg.decode()118        resp = 
loads(arg)119        self.assertEqual(200, resp['headers']['status'])120        expected_ttls = set(m['ttl'] for m in sample_messages)121        actual_ttls = set(m['ttl'] for m in resp['body']['messages'])122        self.assertFalse(expected_ttls - actual_ttls)123        actual_ids = set(m['id'] for m in resp['body']['messages'])124        self.assertFalse(set(self.msg_ids) - actual_ids)125    def test_exceeded_payloads(self):126        # Get a valid message id127        resp = self._post_messages("kitkat")128        msg_id = resp['body']['message_ids']129        # Bulk GET restriction130        get_msg_ids = msg_id * 21131        action = consts.MESSAGE_GET_MANY132        body = {"queue_name": "kitkat",133                "message_ids": get_msg_ids}134        send_mock = mock.Mock()135        self.protocol.sendMessage = send_mock136        req = test_utils.create_request(action, body, self.headers)137        self.protocol.onMessage(req, False)138        resp = json.loads(send_mock.call_args[0][0].decode())139        self.assertEqual(400, resp['headers']['status'])140        # Listing restriction141        body['limit'] = 21142        req = test_utils.create_request(action, body, self.headers)143        self.protocol.onMessage(req, False)144        resp = json.loads(send_mock.call_args[0][0].decode())145        self.assertEqual(400, resp['headers']['status'])146        # Bulk deletion restriction147        del_msg_ids = msg_id * 22148        action = consts.MESSAGE_GET_MANY149        body = {"queue_name": "kitkat",150                "message_ids": del_msg_ids}151        req = test_utils.create_request(action, body, self.headers)152        self.protocol.onMessage(req, False)153        resp = json.loads(send_mock.call_args[0][0].decode())154        self.assertEqual(400, resp['headers']['status'])155    @ddt.data(True, False)156    def test_post_single(self, in_binary):157        sample_messages = [158            {'body': {'key': 'value'}, 'ttl': 200},159        ]160  
      self._test_post(sample_messages, in_binary=in_binary)161    @ddt.data(True, False)162    def test_post_multiple(self, in_binary):163        sample_messages = [164            {'body': 239, 'ttl': 100},165            {'body': {'key': 'value'}, 'ttl': 200},166            {'body': [1, 3], 'ttl': 300},167        ]168        self._test_post(sample_messages, in_binary=in_binary)169    def test_post_optional_ttl(self):170        messages = [{'body': 239},171                    {'body': {'key': 'value'}, 'ttl': 200}]172        action = consts.MESSAGE_POST173        body = {"queue_name": "kitkat",174                "messages": messages}175        req = test_utils.create_request(action, body, self.headers)176        send_mock = mock.Mock()177        self.protocol.sendMessage = send_mock178        self.protocol.onMessage(req, False)179        resp = json.loads(send_mock.call_args[0][0].decode())180        self.assertEqual(201, resp['headers']['status'])181        msg_id = resp['body']['message_ids'][0]182        action = consts.MESSAGE_GET183        body = {"queue_name": "kitkat", "message_id": msg_id}184        req = test_utils.create_request(action, body, self.headers)185        self.protocol.onMessage(req, False)186        resp = json.loads(send_mock.call_args[0][0].decode())187        self.assertEqual(200, resp['headers']['status'])188        self.assertEqual(self.default_message_ttl,189                         resp['body']['messages']['ttl'])190    def test_post_to_non_ascii_queue(self):191        queue_name = u'non-ascii-n\u0153me'192        if six.PY2:193            queue_name = queue_name.encode('utf-8')194        resp = self._post_messages(queue_name)195        self.assertEqual(400, resp['headers']['status'])196    def test_post_with_long_queue_name(self):197        # NOTE(kgriffs): This test verifies that routes with198        # embedded queue name params go through the validation199        # hook, regardless of the target resource.200        queue_name = 'v' * 
validation.QUEUE_NAME_MAX_LEN201        resp = self._post_messages(queue_name)202        self.assertEqual(201, resp['headers']['status'])203        queue_name += 'v'204        resp = self._post_messages(queue_name)205        self.assertEqual(400, resp['headers']['status'])206    def test_post_to_missing_queue(self):207        queue_name = 'nonexistent'208        resp = self._post_messages(queue_name)209        self.assertEqual(201, resp['headers']['status'])210    def test_post_invalid_ttl(self):211        sample_messages = [212            {'body': {'key': 'value'}, 'ttl': '200'},213        ]214        action = consts.MESSAGE_POST215        body = {"queue_name": "kitkat",216                "messages": sample_messages}217        send_mock = mock.patch.object(self.protocol, 'sendMessage')218        self.addCleanup(send_mock.stop)219        send_mock = send_mock.start()220        req = test_utils.create_request(action, body, self.headers)221        self.protocol.onMessage(req, False)222        resp = json.loads(send_mock.call_args[0][0].decode())223        self.assertEqual(400, resp['headers']['status'])224        self.assertEqual(225            'Bad request. The value of the "ttl" field must be a int.',226            resp['body']['exception'])227    def test_post_no_body(self):228        sample_messages = [229            {'ttl': 200},230        ]231        action = consts.MESSAGE_POST232        body = {"queue_name": "kitkat",233                "messages": sample_messages}234        send_mock = mock.patch.object(self.protocol, 'sendMessage')235        self.addCleanup(send_mock.stop)236        send_mock = send_mock.start()237        req = test_utils.create_request(action, body, self.headers)238        self.protocol.onMessage(req, False)239        resp = json.loads(send_mock.call_args[0][0].decode())240        self.assertEqual(400, resp['headers']['status'])241        self.assertEqual(242            'Bad request. 
Missing "body" field.', resp['body']['exception'])243    def test_get_from_missing_queue(self):244        action = consts.MESSAGE_LIST245        body = {"queue_name": "anothernonexistent"}246        req = test_utils.create_request(action, body, self.headers)247        send_mock = mock.Mock()248        self.protocol.sendMessage = send_mock249        self.protocol.onMessage(req, False)250        resp = json.loads(send_mock.call_args[0][0].decode())251        self.assertEqual(200, resp['headers']['status'])252        self.assertEqual([], resp['body']['messages'])253    @ddt.data('', '0xdeadbeef', '550893e0-2b6e-11e3-835a-5cf9dd72369')254    def test_bad_client_id(self, text_id):255        action = consts.MESSAGE_POST256        body = {257            "queue_name": "kinder",258            "messages": [{"ttl": 60,259                          "body": ""}]260        }261        headers = {262            'Client-ID': text_id,263            'X-Project-ID': self.project_id264        }265        send_mock = mock.Mock()266        self.protocol.sendMessage = send_mock267        req = test_utils.create_request(action, body, headers)268        self.protocol.onMessage(req, False)269        resp = json.loads(send_mock.call_args[0][0].decode())270        self.assertEqual(400, resp['headers']['status'])271        action = consts.MESSAGE_GET272        body = {273            "queue_name": "kinder",274            "limit": 3,275            "echo": True276        }277        req = test_utils.create_request(action, body, headers)278        self.protocol.onMessage(req, False)279        resp = json.loads(send_mock.call_args[0][0].decode())280        self.assertEqual(400, resp['headers']['status'])281    @ddt.data(None, '[', '[]', '{}', '.')282    def test_post_bad_message(self, document):283        action = consts.MESSAGE_POST284        body = {285            "queue_name": "kinder",286            "messages": document287        }288        send_mock = mock.Mock()289        
self.protocol.sendMessage = send_mock290        req = test_utils.create_request(action, body, self.headers)291        self.protocol.onMessage(req, False)292        resp = json.loads(send_mock.call_args[0][0].decode())293        self.assertEqual(400, resp['headers']['status'])294    @ddt.data(-1, 59, 1209601)295    def test_unacceptable_ttl(self, ttl):296        action = consts.MESSAGE_POST297        body = {"queue_name": "kinder",298                "messages": [{"ttl": ttl, "body": ""}]}299        send_mock = mock.Mock()300        self.protocol.sendMessage = send_mock301        req = test_utils.create_request(action, body, self.headers)302        self.protocol.onMessage(req, False)303        resp = json.loads(send_mock.call_args[0][0].decode())304        self.assertEqual(400, resp['headers']['status'])305    def test_exceeded_message_posting(self):306        # Total (raw request) size307        document = [{'body': "some body", 'ttl': 100}] * 8000308        action = consts.MESSAGE_POST309        body = {310            "queue_name": "kinder",311            "messages": document312        }313        send_mock = mock.Mock()314        self.protocol.sendMessage = send_mock315        req = test_utils.create_request(action, body, self.headers)316        self.protocol.onMessage(req, False)317        resp = json.loads(send_mock.call_args[0][0].decode())318        self.assertEqual(400, resp['headers']['status'])319    @ddt.data('{"overflow": 9223372036854775808}',320              '{"underflow": -9223372036854775809}')321    def test_unsupported_json(self, document):322        action = consts.MESSAGE_POST323        body = {324            "queue_name": "fizz",325            "messages": document326        }327        send_mock = mock.Mock()328        self.protocol.sendMessage = send_mock329        req = test_utils.create_request(action, body, self.headers)330        self.protocol.onMessage(req, False)331        resp = json.loads(send_mock.call_args[0][0].decode())332        
self.assertEqual(400, resp['headers']['status'])333    def test_delete(self):334        resp = self._post_messages("tofi")335        msg_id = resp['body']['message_ids'][0]336        action = consts.MESSAGE_GET337        body = {"queue_name": "tofi",338                "message_id": msg_id}339        send_mock = mock.Mock()340        self.protocol.sendMessage = send_mock341        req = test_utils.create_request(action, body, self.headers)342        self.protocol.onMessage(req, False)343        resp = json.loads(send_mock.call_args[0][0].decode())344        self.assertEqual(200, resp['headers']['status'])345        # Delete queue346        action = consts.MESSAGE_DELETE347        req = test_utils.create_request(action, body, self.headers)348        self.protocol.onMessage(req, False)349        resp = json.loads(send_mock.call_args[0][0].decode())350        self.assertEqual(204, resp['headers']['status'])351        # Get non existent queue352        action = consts.MESSAGE_GET353        req = test_utils.create_request(action, body, self.headers)354        self.protocol.onMessage(req, False)355        resp = json.loads(send_mock.call_args[0][0].decode())356        self.assertEqual(404, resp['headers']['status'])357        # Safe to delete non-existing ones358        action = consts.MESSAGE_DELETE359        req = test_utils.create_request(action, body, self.headers)360        self.protocol.onMessage(req, False)361        resp = json.loads(send_mock.call_args[0][0].decode())362        self.assertEqual(204, resp['headers']['status'])363    def test_bulk_delete(self):364        resp = self._post_messages("nerds", repeat=5)365        msg_ids = resp['body']['message_ids']366        action = consts.MESSAGE_DELETE_MANY367        body = {"queue_name": "nerds",368                "message_ids": msg_ids}369        send_mock = mock.Mock()370        self.protocol.sendMessage = send_mock371        req = test_utils.create_request(action, body, self.headers)372        
self.protocol.onMessage(req, False)373        resp = json.loads(send_mock.call_args[0][0].decode())374        self.assertEqual(204, resp['headers']['status'])375        action = consts.MESSAGE_GET376        req = test_utils.create_request(action, body, self.headers)377        self.protocol.onMessage(req, False)378        resp = json.loads(send_mock.call_args[0][0].decode())379        self.assertEqual(400, resp['headers']['status'])380        # Safe to delete non-existing ones381        action = consts.MESSAGE_DELETE_MANY382        req = test_utils.create_request(action, body, self.headers)383        self.protocol.onMessage(req, False)384        resp = json.loads(send_mock.call_args[0][0].decode())385        self.assertEqual(204, resp['headers']['status'])386        # Even after the queue is gone387        action = consts.QUEUE_DELETE388        body = {"queue_name": "nerds"}389        req = test_utils.create_request(action, body, self.headers)390        self.protocol.onMessage(req, False)391        resp = json.loads(send_mock.call_args[0][0].decode())392        self.assertEqual(204, resp['headers']['status'])393        action = consts.MESSAGE_DELETE_MANY394        body = {"queue_name": "nerds",395                "message_ids": msg_ids}396        req = test_utils.create_request(action, body, self.headers)397        self.protocol.onMessage(req, False)398        resp = json.loads(send_mock.call_args[0][0].decode())399        self.assertEqual(204, resp['headers']['status'])400    def test_pop_delete(self):401        self._post_messages("kitkat", repeat=5)402        action = consts.MESSAGE_DELETE_MANY403        body = {"queue_name": "kitkat", "pop": 2}404        send_mock = mock.Mock()405        self.protocol.sendMessage = send_mock406        req = test_utils.create_request(action, body, self.headers)407        self.protocol.onMessage(req, False)408        resp = json.loads(send_mock.call_args[0][0].decode())409        self.assertEqual(200, resp['headers']['status'])410  
      self.assertEqual(2, len(resp['body']['messages']))411        self.assertEqual(239, resp['body']['messages'][0]['body'])412        self.assertEqual(239, resp['body']['messages'][1]['body'])413    def test_get_nonexistent_message_404s(self):414        action = consts.MESSAGE_GET415        body = {"queue_name": "notthere",416                "message_id": "a"}417        send_mock = mock.Mock()418        self.protocol.sendMessage = send_mock419        req = test_utils.create_request(action, body, self.headers)420        self.protocol.onMessage(req, False)421        resp = json.loads(send_mock.call_args[0][0].decode())422        self.assertEqual(404, resp['headers']['status'])423    def test_get_multiple_invalid_messages_404s(self):424        action = consts.MESSAGE_GET_MANY425        body = {"queue_name": "notnotthere",426                "message_ids": ["a", "b", "c"]}427        send_mock = mock.Mock()428        self.protocol.sendMessage = send_mock429        req = test_utils.create_request(action, body, self.headers)430        self.protocol.onMessage(req, False)431        resp = json.loads(send_mock.call_args[0][0].decode())432        self.assertEqual(200, resp['headers']['status'])433    def test_delete_multiple_invalid_messages_204s(self):434        action = consts.MESSAGE_DELETE435        body = {"queue_name": "yetanothernotthere",436                "message_ids": ["a", "b", "c"]}437        send_mock = mock.Mock()438        self.protocol.sendMessage = send_mock439        req = test_utils.create_request(action, body, self.headers)440        self.protocol.onMessage(req, False)441        resp = json.loads(send_mock.call_args[0][0].decode())442        self.assertEqual(400, resp['headers']['status'])443    def _post_messages(self, queue_name, repeat=1):444        messages = [{'body': 239, 'ttl': 300}] * repeat445        action = consts.MESSAGE_POST446        body = {"queue_name": queue_name,447                "messages": messages}448        send_mock = 
mock.Mock()449        self.protocol.sendMessage = send_mock450        req = test_utils.create_request(action, body, self.headers)451        self.protocol.onMessage(req, False)452        return json.loads(send_mock.call_args[0][0].decode())453    def test_invalid_request(self):454        send_mock = mock.Mock()455        self.protocol.sendMessage = send_mock456        self.protocol.onMessage('foo', False)457        self.assertEqual(1, send_mock.call_count)458        response = json.loads(send_mock.call_args[0][0].decode())459        self.assertIn('error', response['body'])460        self.assertEqual({'status': 400}, response['headers'])461        self.assertEqual(462            {'action': None, 'api': 'v2', 'body': {}, 'headers': {}},...bybit.py
Source:bybit.py  
from __future__ import annotations

import asyncio
import logging
from typing import Awaitable, Optional, Union

import aiohttp

from ..store import DataStore, DataStoreManager
from ..typedefs import Item
from ..ws import ClientWebSocketResponse

logger = logging.getLogger(__name__)


class BybitInverseDataStore(DataStoreManager):
    """
    DataStore manager for Bybit inverse contracts.
    """

    def _init(self) -> None:
        """Create one named DataStore per supported topic."""
        self.create("orderbook", datastore_class=OrderBookInverse)
        self.create("trade", datastore_class=TradeInverse)
        self.create("insurance", datastore_class=Insurance)
        self.create("instrument", datastore_class=InstrumentInverse)
        self.create("kline", datastore_class=KlineInverse)
        self.create("liquidation", datastore_class=LiquidationInverse)
        self.create("position", datastore_class=PositionInverse)
        self.create("execution", datastore_class=ExecutionInverse)
        self.create("order", datastore_class=OrderInverse)
        self.create("stoporder", datastore_class=StopOrderInverse)
        self.create("wallet", datastore_class=WalletInverse)
        # Last "timestamp_e6" value seen on a websocket message, if any.
        self.timestamp_e6: Optional[int] = None

    async def initialize(self, *aws: Awaitable[aiohttp.ClientResponse]) -> None:
        """
        Seed the stores from REST responses, handled in completion order.

        Supported endpoints

        - GET /v2/private/order (DataStore: order)
        - GET /futures/private/order (DataStore: order)
        - GET /v2/private/stop-order (DataStore: stoporder)
        - GET /futures/private/stop-order (DataStore: stoporder)
        - GET /v2/private/position/list (DataStore: position)
        - GET /futures/private/position/list (DataStore: position)
        - GET /v2/private/wallet/balance (DataStore: wallet)
        - GET /v2/public/kline/list (DataStore: kline)

        Responses from any other path are ignored.

        Raises:
            ValueError: if a response body has a non-zero ``ret_code``.
        """
        for f in asyncio.as_completed(aws):
            resp = await f
            data = await resp.json()
            if data["ret_code"] != 0:
                raise ValueError(
                    "Response error at DataStore initialization\n"
                    f"URL: {resp.url}\n"
                    f"Data: {data}"
                )
            if resp.url.path in (
                "/v2/private/order",
                "/futures/private/order",
            ):
                self.order._onresponse(data["result"])
            elif resp.url.path in (
                "/v2/private/stop-order",
                "/futures/private/stop-order",
            ):
                self.stoporder._onresponse(data["result"])
            elif resp.url.path in (
                "/v2/private/position/list",
                "/futures/private/position/list",
            ):
                self.position._onresponse(data["result"])
            elif resp.url.path == "/v2/public/kline/list":
                self.kline._onresponse(data["result"])
            elif resp.url.path == "/v2/private/wallet/balance":
                self.wallet._onresponse(data["result"])

    def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:
        """Route one websocket message to the store matching its topic."""
        if "success" in msg:
            if not msg["success"]:
                logger.warning(msg)
        if "topic" in msg:
            topic: str = msg["topic"]
            data = msg["data"]
            # Single dispatch chain: the topic prefixes/names below are
            # mutually exclusive, so at most one branch can ever match.
            if topic.startswith(("orderBookL2_25", "orderBook_200")):
                self.orderbook._onmessage(topic, msg["type"], data)
            elif topic.startswith("trade"):
                self.trade._onmessage(data)
            elif topic.startswith("insurance"):
                self.insurance._onmessage(data)
            elif topic.startswith("instrument_info"):
                self.instrument._onmessage(topic, msg["type"], data)
            elif topic.startswith("klineV2"):
                self.kline._onmessage(topic, data)
            elif topic.startswith("liquidation"):
                self.liquidation._onmessage(data)
            elif topic == "position":
                self.position._onmessage(data)
            elif topic == "execution":
                self.execution._onmessage(data)
            elif topic == "order":
                self.order._onmessage(data)
            elif topic == "stop_order":
                self.stoporder._onmessage(data)
            elif topic == "wallet":
                self.wallet._onmessage(data)
        if "timestamp_e6" in msg:
            self.timestamp_e6 = int(msg["timestamp_e6"])

    @property
    def orderbook(self) -> "OrderBookInverse":
        return self.get("orderbook", OrderBookInverse)

    @property
    def trade(self) -> "TradeInverse":
        return self.get("trade", TradeInverse)

    @property
    def insurance(self) -> "Insurance":
        return self.get("insurance", Insurance)

    @property
    def instrument(self) -> "InstrumentInverse":
        return self.get("instrument", InstrumentInverse)

    @property
    def kline(self) -> "KlineInverse":
        return self.get("kline", KlineInverse)

    @property
    def liquidation(self) -> "LiquidationInverse":
        return self.get("liquidation", LiquidationInverse)

    @property
    def position(self) -> "PositionInverse":
        """
        Positions for inverse contracts (perpetual / futures).
        """
        return self.get("position", PositionInverse)

    @property
    def execution(self) -> "ExecutionInverse":
        return self.get("execution", ExecutionInverse)

    @property
    def order(self) -> "OrderInverse":
        """
        Active orders only (filled or cancelled orders are removed).
        """
        return self.get("order", OrderInverse)

    @property
    def stoporder(self) -> "StopOrderInverse":
        """
        Active stop orders only (triggered orders are removed).
        """
        return self.get("stoporder", StopOrderInverse)

    @property
    def wallet(self) -> "WalletInverse":
        return self.get("wallet", WalletInverse)


class BybitUSDTDataStore(DataStoreManager):
    """
    DataStore manager for Bybit USDT contracts.
    """

    def _init(self) -> None:
        """Create one named DataStore per supported topic."""
        self.create("orderbook", datastore_class=OrderBookUSDT)
        self.create("trade", datastore_class=TradeUSDT)
        self.create("insurance", datastore_class=Insurance)
        self.create("instrument", datastore_class=InstrumentUSDT)
        self.create("kline", datastore_class=KlineUSDT)
        self.create("liquidation", datastore_class=LiquidationUSDT)
        self.create("position", datastore_class=PositionUSDT)
        self.create("execution", datastore_class=ExecutionUSDT)
        self.create("order", datastore_class=OrderUSDT)
        self.create("stoporder", datastore_class=StopOrderUSDT)
        self.create("wallet", datastore_class=WalletUSDT)
        # Last "timestamp_e6" value seen on a websocket message, if any.
        self.timestamp_e6: Optional[int] = None

    async def initialize(self, *aws: Awaitable[aiohttp.ClientResponse]) -> None:
        """
        Seed the stores from REST responses, handled in completion order.

        Supported endpoints

        - GET /private/linear/order/search (DataStore: order)
        - GET /private/linear/stop-order/search (DataStore: stoporder)
        - GET /private/linear/position/list (DataStore: position)
        - GET /public/linear/kline (DataStore: kline)
        - GET /v2/private/wallet/balance (DataStore: wallet)

        Responses from any other path are ignored.

        Raises:
            ValueError: if a response body has a non-zero ``ret_code``.
        """
        for f in asyncio.as_completed(aws):
            resp = await f
            data = await resp.json()
            if data["ret_code"] != 0:
                raise ValueError(
                    "Response error at DataStore initialization\n"
                    f"URL: {resp.url}\n"
                    f"Data: {data}"
                )
            if resp.url.path == "/private/linear/order/search":
                self.order._onresponse(data["result"])
            elif resp.url.path == "/private/linear/stop-order/search":
                self.stoporder._onresponse(data["result"])
            elif resp.url.path == "/private/linear/position/list":
                self.position._onresponse(data["result"])
            elif resp.url.path == "/public/linear/kline":
                self.kline._onresponse(data["result"])
            elif resp.url.path == "/v2/private/wallet/balance":
                self.wallet._onresponse(data["result"])

    def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:
        """Route one websocket message to the store matching its topic."""
        if "success" in msg:
            if not msg["success"]:
                logger.warning(msg)
        if "topic" in msg:
            topic: str = msg["topic"]
            data = msg["data"]
            # Single dispatch chain: the topic prefixes/names below are
            # mutually exclusive, so at most one branch can ever match.
            # Note there is no "insurance" branch here, although an
            # "insurance" store is created in _init.
            if topic.startswith(("orderBookL2_25", "orderBook_200")):
                self.orderbook._onmessage(topic, msg["type"], data)
            elif topic.startswith("trade"):
                self.trade._onmessage(data)
            elif topic.startswith("instrument_info"):
                self.instrument._onmessage(topic, msg["type"], data)
            elif topic.startswith("candle"):
                self.kline._onmessage(topic, data)
            elif topic.startswith("liquidation"):
                self.liquidation._onmessage(data)
            elif topic == "position":
                self.position._onmessage(data)
            elif topic == "execution":
                self.execution._onmessage(data)
            elif topic == "order":
                self.order._onmessage(data)
            elif topic == "stop_order":
                self.stoporder._onmessage(data)
            elif topic == "wallet":
                self.wallet._onmessage(data)
        if "timestamp_e6" in msg:
            self.timestamp_e6 = int(msg["timestamp_e6"])

    @property
    def orderbook(self) -> "OrderBookUSDT":
        return self.get("orderbook", OrderBookUSDT)

    @property
    def trade(self) -> "TradeUSDT":
        return self.get("trade", TradeUSDT)

    @property
    def instrument(self) -> "InstrumentUSDT":
        return self.get("instrument", InstrumentUSDT)

    @property
    def kline(self) -> "KlineUSDT":
        return self.get("kline", KlineUSDT)

    @property
    def liquidation(self) -> "LiquidationUSDT":
        return self.get("liquidation", LiquidationUSDT)

    @property
    def position(self) -> "PositionUSDT":
        """
        Positions for USDT contracts.
        """
        return self.get("position", PositionUSDT)

    @property
    def execution(self) -> "ExecutionUSDT":
        return self.get("execution", ExecutionUSDT)

    @property
    def order(self) -> "OrderUSDT":
        """
        Active orders only (filled or cancelled orders are removed).
        """
        return self.get("order", OrderUSDT)

    @property
    def stoporder(self) -> "StopOrderUSDT":
        """
        Active stop orders only (triggered orders are removed).
        """
        return self.get("stoporder", StopOrderUSDT)

    @property
    def wallet(self) -> "WalletUSDT":
        return self.get("wallet", WalletUSDT)


class OrderBookInverse(DataStore):
    _KEYS = ["symbol", "id", "side"]

    def sorted(self, query: Optional[Item] = None) -> dict[str, list[Item]]:
        """Return stored items matching ``query``, grouped by side.

        An item matches when it contains every key of ``query`` with an equal
        value. "Sell" is sorted by ascending ``id``, "Buy" by descending ``id``.
        """
        if query is None:
            query = {}
        result = {"Sell": [], "Buy": []}
        for item in self:
            if all(k in item and query[k] == item[k] for k in query):
                result[item["side"]].append(item)
        result["Sell"].sort(key=lambda x: x["id"])
        result["Buy"].sort(key=lambda x: x["id"], reverse=True)
        return result

    def _onmessage(self, topic: str, type_: str, data: Union[list[Item], Item]) -> None:
        """Apply an order book snapshot or delta."""
        if type_ == "snapshot":
            symbol = topic.split(".")[-1]
            # ex: "orderBookL2_25.BTCUSD", "orderBook_200.100ms.BTCUSD"
            # A snapshot replaces everything currently held for the symbol.
            result = self.find({"symbol": symbol})
            self._delete(result)
            self._insert(data)
        elif type_ == "delta":
            self._delete(data["delete"])
            self._update(data["update"])
            self._insert(data["insert"])


class OrderBookUSDT(OrderBookInverse):
    def _onmessage(self, topic: str, type_: str, data: Union[list[Item], Item]) -> None:
        """Apply an order book snapshot or delta.

        Unlike the parent class, snapshot rows are read from
        ``data["order_book"]``.
        """
        if type_ == "snapshot":
            symbol = topic.split(".")[-1]
            # ex: "orderBookL2_25.BTCUSDT", "orderBook_200.100ms.BTCUSDT"
            result = self.find({"symbol": symbol})
            self._delete(result)
            self._insert(data["order_book"])
        elif type_ == "delta":
            self._delete(data["delete"])
            self._update(data["update"])
            self._insert(data["insert"])


class TradeInverse(DataStore):
    _KEYS = ["trade_id"]
    _MAXLEN = 99999

    def _onmessage(self, data: list[Item]) -> None:
        self._insert(data)


class TradeUSDT(TradeInverse):
    ...


class Insurance(DataStore):
    _KEYS = ["currency"]

    def _onmessage(self, data: list[Item]) -> None:
        self._update(data)


class InstrumentInverse(DataStore):
    _KEYS = ["symbol"]

    def _onmessage(self, topic: str, type_: str, data: Item) -> None:
        """Apply an instrument info snapshot or delta."""
        if type_ == "snapshot":
            symbol = topic.split(".")[-1]  # ex: "instrument_info.100ms.BTCUSD"
            result = self.find({"symbol": symbol})
            self._delete(result)
            self._insert([data])
        elif type_ == "delta":
            self._update(data["update"])


class InstrumentUSDT(InstrumentInverse):
    ...


class KlineInverse(DataStore):
    _KEYS = ["start", "symbol", "interval"]

    def _onmessage(self, topic: str, data: list[Item]) -> None:
        """Tag each item with symbol and interval taken from the topic."""
        topic_split = topic.split(".")  # ex:"klineV2.1.BTCUSD"
        for item in data:
            item["symbol"] = topic_split[-1]
            item["interval"] = topic_split[-2]
        self._update(data)

    def _onresponse(self, data: list[Item]) -> None:
        """Store REST kline rows, renaming ``open_time`` to ``start``."""
        for item in data:
            # The store is keyed on "start" (see _KEYS); REST rows carry the
            # value under "open_time" instead.
            item["start"] = item.pop("open_time")
        self._update(data)


class KlineUSDT(KlineInverse):
    ...


class LiquidationInverse(DataStore):
    _MAXLEN = 99999

    def _onmessage(self, item: Item) -> None:
        self._insert([item])


class LiquidationUSDT(LiquidationInverse):
    ...


class PositionInverse(DataStore):
    _KEYS = ["symbol", "position_idx"]

    def one(self, symbol: str) -> Optional[Item]:
        """Return the item stored for ``symbol`` with ``position_idx`` 0."""
        return self.get({"symbol": symbol, "position_idx": 0})

    def both(self, symbol: str) -> dict[str, Optional[Item]]:
        """Return the ``position_idx`` 2 ("Sell") and 1 ("Buy") items."""
        return {
            "Sell": self.get({"symbol": symbol, "position_idx": 2}),
            "Buy": self.get({"symbol": symbol, "position_idx": 1}),
        }

    def _onresponse(self, data: Union[Item, list[Item]]) -> None:
        """Store a REST position result in any of the three handled shapes."""
        if isinstance(data, dict):
            self._update([data])  # ex: {"symbol": "BTCUSD", ...}
        elif isinstance(data, list):
            for item in data:
                if "is_valid" in item:
                    # Wrapped form; entries with a falsy "is_valid" are skipped.
                    if item["is_valid"]:
                        self._update([item["data"]])
                        # ex:
                        # [
                        #     {
                        #         "is_valid": True,
                        #         "data": {"symbol": "BTCUSDM21", ...}
                        #     },
                        #     ...
                        # ]
                else:
                    self._update([item])
                    # ex: [{"symbol": "BTCUSDT", ...}, ...]

    def _onmessage(self, data: list[Item]) -> None:
        self._update(data)


class PositionUSDT(PositionInverse):
    def _onmessage(self, data: list[Item]) -> None:
        for item in data:
            # Coerce to int so the value compares equal to the integer
            # position_idx values used by one() / both().
            item["position_idx"] = int(item["position_idx"])
            self._update([item])


class ExecutionInverse(DataStore):
    _KEYS = ["exec_id"]

    def _onmessage(self, data: list[Item]) -> None:
        self._update(data)


class ExecutionUSDT(ExecutionInverse):
    ...


class OrderInverse(DataStore):
    _KEYS = ["order_id"]

    def _onresponse(self, data: list[Item]) -> None:
        """Store a REST result given either as a list or as a single dict."""
        if isinstance(data, list):
            self._update(data)
        elif isinstance(data, dict):
            self._update([data])

    def _onmessage(self, data: list[Item]) -> None:
        """Keep orders whose status is still open; delete all others."""
        for item in data:
            if item["order_status"] in ("Created", "New", "PartiallyFilled"):
                self._update([item])
            else:
                self._delete([item])


class OrderUSDT(OrderInverse):
    ...


class StopOrderInverse(DataStore):
    _KEYS = ["order_id"]

    def _onresponse(self, data: list[Item]) -> None:
        """Store a REST result given either as a list or as a single dict."""
        if isinstance(data, list):
            self._update(data)
        elif isinstance(data, dict):
            self._update([data])

    def _onmessage(self, data: list[Item]) -> None:
        """Keep stop orders that are still pending; delete all others."""
        for item in data:
            if item["order_status"] in ("Active", "Untriggered"):
                self._update([item])
            else:
                self._delete([item])


class StopOrderUSDT(StopOrderInverse):
    _KEYS = ["stop_order_id"]


class WalletInverse(DataStore):
    _KEYS = ["coin"]

    def _onresponse(self, data: dict[str, Item]) -> None:
        """Store the balance of every coin in ``data`` except USDT.

        ``data`` maps a coin name to its balance item. The caller's dict is
        left unmodified.
        """
        for coin, balance in data.items():
            if coin == "USDT":
                # USDT is stored by WalletUSDT instead.
                continue
            self._update(
                [
                    {
                        "coin": coin,
                        "available_balance": balance["available_balance"],
                        "wallet_balance": balance["wallet_balance"],
                    }
                ]
            )

    def _onmessage(self, data: list[Item]) -> None:
        self._update(data)


class WalletUSDT(WalletInverse):
    def _onresponse(self, data: dict[str, Item]) -> None:
        """Store only the USDT balance, when ``data`` contains one."""
        if "USDT" in data:
            self._update(
                [
                    {
                        "coin": "USDT",
                        "wallet_balance": data["USDT"]["wallet_balance"],
                        "available_balance": data["USDT"]["available_balance"],
                    }
                ]
            )

    def _onmessage(self, data: list[Item]) -> None:
        for item in data:
            ...  # the loop body is elided in this listing

# test_claims.py  (heading of the next listing on the page)
Source:test_claims.py  
...34        action = consts.QUEUE_CREATE35        body = {"queue_name": "skittle"}36        req = test_utils.create_request(action, body, self.headers)37        with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:38            self.protocol.onMessage(req, False)39            resp = json.loads(msg_mock.call_args[0][0].decode())40            self.assertIn(resp['headers']['status'], [201, 204])41        action = consts.MESSAGE_POST42        body = {"queue_name": "skittle",43                "messages": [44                    {'body': 239, 'ttl': 300},45                    {'body': {'key_1': 'value_1'}, 'ttl': 300},46                    {'body': [1, 3], 'ttl': 300},47                    {'body': 439, 'ttl': 300},48                    {'body': {'key_2': 'value_2'}, 'ttl': 300},49                    {'body': ['a', 'b'], 'ttl': 300},50                    {'body': 639, 'ttl': 300},51                    {'body': {'key_3': 'value_3'}, 'ttl': 300},52                    {'body': ["aa", "bb"], 'ttl': 300}]53                }54        send_mock = mock.Mock()55        self.protocol.sendMessage = send_mock56        req = test_utils.create_request(action, body, self.headers)57        self.protocol.onMessage(req, False)58        resp = json.loads(send_mock.call_args[0][0].decode())59        self.assertEqual(201, resp['headers']['status'])60    def tearDown(self):61        super(ClaimsBaseTest, self).tearDown()62        action = consts.QUEUE_DELETE63        body = {'queue_name': 'skittle'}64        send_mock = mock.Mock()65        self.protocol.sendMessage = send_mock66        req = test_utils.create_request(action, body, self.headers)67        self.protocol.onMessage(req, False)68        resp = json.loads(send_mock.call_args[0][0].decode())69        self.assertEqual(204, resp['headers']['status'])70    @ddt.data('[', '[]', '.', '"fail"')71    def test_bad_claim(self, doc):72        action = consts.CLAIM_CREATE73        body = doc74        send_mock = mock.Mock()75       
 self.protocol.sendMessage = send_mock76        req = test_utils.create_request(action, body, self.headers)77        self.protocol.onMessage(req, False)78        resp = json.loads(send_mock.call_args[0][0].decode())79        self.assertEqual(400, resp['headers']['status'])80        action = consts.CLAIM_UPDATE81        body = doc82        req = test_utils.create_request(action, body, self.headers)83        self.protocol.onMessage(req, False)84        resp = json.loads(send_mock.call_args[0][0].decode())85        self.assertEqual(400, resp['headers']['status'])86    def test_exceeded_claim(self):87        action = consts.CLAIM_CREATE88        body = {"queue_name": "skittle",89                "ttl": 100,90                "grace": 60,91                "limit": 21}92        send_mock = mock.Mock()93        self.protocol.sendMessage = send_mock94        req = test_utils.create_request(action, body, self.headers)95        self.protocol.onMessage(req, False)96        resp = json.loads(send_mock.call_args[0][0].decode())97        self.assertEqual(400, resp['headers']['status'])98    @ddt.data((-1, -1), (59, 60), (60, 59), (60, 43201), (43201, 60))99    def test_unacceptable_ttl_or_grace(self, ttl_grace):100        ttl, grace = ttl_grace101        action = consts.CLAIM_CREATE102        body = {"queue_name": "skittle",103                "ttl": ttl,104                "grace": grace}105        send_mock = mock.Mock()106        self.protocol.sendMessage = send_mock107        req = test_utils.create_request(action, body, self.headers)108        self.protocol.onMessage(req, False)109        resp = json.loads(send_mock.call_args[0][0].decode())110        self.assertEqual(400, resp['headers']['status'])111    @ddt.data(-1, 59, 43201)112    def test_unacceptable_new_ttl(self, ttl):113        claim = self._get_a_claim()114        action = consts.CLAIM_UPDATE115        body = {"queue_name": "skittle",116                "claim_id": claim['body']['claim_id'],117                "ttl": 
ttl}118        send_mock = mock.Mock()119        self.protocol.sendMessage = send_mock120        req = test_utils.create_request(action, body, self.headers)121        self.protocol.onMessage(req, False)122        resp = json.loads(send_mock.call_args[0][0].decode())123        self.assertEqual(400, resp['headers']['status'])124    def test_default_ttl_and_grace(self):125        action = consts.CLAIM_CREATE126        body = {"queue_name": "skittle"}127        send_mock = mock.Mock()128        self.protocol.sendMessage = send_mock129        req = test_utils.create_request(action, body, self.headers)130        self.protocol.onMessage(req, False)131        resp = json.loads(send_mock.call_args[0][0].decode())132        self.assertEqual(201, resp['headers']['status'])133        action = consts.CLAIM_GET134        body = {"queue_name": "skittle",135                "claim_id": resp['body']['claim_id']}136        req = test_utils.create_request(action, body, self.headers)137        self.protocol.onMessage(req, False)138        resp = json.loads(send_mock.call_args[0][0].decode())139        self.assertEqual(200, resp['headers']['status'])140        self.assertEqual(self.defaults.claim_ttl, resp['body']['ttl'])141    def test_lifecycle(self):142        # First, claim some messages143        action = consts.CLAIM_CREATE144        body = {"queue_name": "skittle",145                "ttl": 100,146                "grace": 60}147        send_mock = mock.Mock()148        self.protocol.sendMessage = send_mock149        req = test_utils.create_request(action, body, self.headers)150        self.protocol.onMessage(req, False)151        resp = json.loads(send_mock.call_args[0][0].decode())152        self.assertEqual(201, resp['headers']['status'])153        claimed_messages = resp['body']['messages']154        claim_id = resp['body']['claim_id']155        # No more messages to claim156        body = {"queue_name": "skittle",157                "ttl": 100,158                "grace": 60}159 
       req = test_utils.create_request(action, body, self.headers)160        self.protocol.onMessage(req, False)161        resp = json.loads(send_mock.call_args[0][0].decode())162        self.assertEqual(204, resp['headers']['status'])163        # Listing messages, by default, won't include claimed, will echo164        action = consts.MESSAGE_LIST165        body = {"queue_name": "skittle",166                "echo": True}167        req = test_utils.create_request(action, body, self.headers)168        self.protocol.onMessage(req, False)169        resp = json.loads(send_mock.call_args[0][0].decode())170        self.assertEqual(200, resp['headers']['status'])171        self.assertEqual([], resp['body']['messages'])172        # Listing messages, by default, won't include claimed, won't echo173        body = {"queue_name": "skittle",174                "echo": False}175        req = test_utils.create_request(action, body, self.headers)176        self.protocol.onMessage(req, False)177        resp = json.loads(send_mock.call_args[0][0].decode())178        self.assertEqual(200, resp['headers']['status'])179        self.assertEqual([], resp['body']['messages'])180        # List messages, include_claimed, but don't echo181        body = {"queue_name": "skittle",182                "include_claimed": True,183                "echo": False}184        req = test_utils.create_request(action, body, self.headers)185        self.protocol.onMessage(req, False)186        resp = json.loads(send_mock.call_args[0][0].decode())187        self.assertEqual(200, resp['headers']['status'])188        self.assertEqual(resp['body']['messages'], [])189        # List messages with a different client-id and echo=false.190        # Should return some messages191        body = {"queue_name": "skittle",192                "echo": False}193        headers = {194            'Client-ID': uuidutils.generate_uuid(),195            'X-Project-ID': self.project_id196        }197        req = 
test_utils.create_request(action, body, headers)198        self.protocol.onMessage(req, False)199        resp = json.loads(send_mock.call_args[0][0].decode())200        self.assertEqual(200, resp['headers']['status'])201        # Include claimed messages this time, and echo202        body = {"queue_name": "skittle",203                "include_claimed": True,204                "echo": True}205        req = test_utils.create_request(action, body, self.headers)206        self.protocol.onMessage(req, False)207        resp = json.loads(send_mock.call_args[0][0].decode())208        self.assertEqual(200, resp['headers']['status'])209        self.assertEqual(len(claimed_messages), len(resp['body']['messages']))210        message_id_1 = resp['body']['messages'][0]['id']211        message_id_2 = resp['body']['messages'][1]['id']212        # Try to delete the message without submitting a claim_id213        action = consts.MESSAGE_DELETE214        body = {"queue_name": "skittle",215                "message_id": message_id_1}216        req = test_utils.create_request(action, body, self.headers)217        self.protocol.onMessage(req, False)218        resp = json.loads(send_mock.call_args[0][0].decode())219        self.assertEqual(403,  resp['headers']['status'])220        # Delete the message and its associated claim221        body = {"queue_name": "skittle",222                "message_id": message_id_1,223                "claim_id": claim_id}224        req = test_utils.create_request(action, body, self.headers)225        self.protocol.onMessage(req, False)226        resp = json.loads(send_mock.call_args[0][0].decode())227        self.assertEqual(204, resp['headers']['status'])228        # Try to get it from the wrong project229        headers = {230            'Client-ID': uuidutils.generate_uuid(),231            'X-Project-ID': 'someproject'232        }233        action = consts.MESSAGE_GET234        body = {"queue_name": "skittle",235                "message_id": 
message_id_2}236        req = test_utils.create_request(action, body, headers)237        self.protocol.onMessage(req, False)238        resp = json.loads(send_mock.call_args[0][0].decode())239        self.assertEqual(404,  resp['headers']['status'])240        # Get the message241        action = consts.MESSAGE_GET242        body = {"queue_name": "skittle",243                "message_id": message_id_2}244        req = test_utils.create_request(action, body, self.headers)245        self.protocol.onMessage(req, False)246        resp = json.loads(send_mock.call_args[0][0].decode())247        self.assertEqual(200, resp['headers']['status'])248        # Update the claim249        creation = timeutils.utcnow()250        action = consts.CLAIM_UPDATE251        body = {"queue_name": "skittle",252                "ttl": 60,253                "grace": 60,254                "claim_id": claim_id}255        req = test_utils.create_request(action, body, self.headers)256        self.protocol.onMessage(req, False)257        resp = json.loads(send_mock.call_args[0][0].decode())258        self.assertEqual(204, resp['headers']['status'])259        # Get the claimed messages (again)260        action = consts.CLAIM_GET261        body = {"queue_name": "skittle",262                "claim_id": claim_id}263        req = test_utils.create_request(action, body, self.headers)264        self.protocol.onMessage(req, False)265        query = timeutils.utcnow()266        resp = json.loads(send_mock.call_args[0][0].decode())267        self.assertEqual(200, resp['headers']['status'])268        self.assertEqual(60, resp['body']['ttl'])269        message_id_3 = resp['body']['messages'][0]['id']270        estimated_age = timeutils.delta_seconds(creation, query)271        # The claim's age should be 0 at this moment. But in some unexpected272        # case, such as slow test, the age maybe larger than 0. 
Just skip273        # asserting if so.274        if resp['body']['age'] == 0:275            self.assertGreater(estimated_age, resp['body']['age'])276        # Delete the claim277        action = consts.CLAIM_DELETE278        body = {"queue_name": "skittle",279                "claim_id": claim_id}280        req = test_utils.create_request(action, body, self.headers)281        self.protocol.onMessage(req, False)282        resp = json.loads(send_mock.call_args[0][0].decode())283        self.assertEqual(204, resp['headers']['status'])284        # Try to delete a message with an invalid claim ID285        action = consts.MESSAGE_DELETE286        body = {"queue_name": "skittle",287                "message_id": message_id_3,288                "claim_id": claim_id}289        req = test_utils.create_request(action, body, self.headers)290        self.protocol.onMessage(req, False)291        resp = json.loads(send_mock.call_args[0][0].decode())292        self.assertEqual(400, resp['headers']['status'])293        # Make sure it wasn't deleted!294        action = consts.MESSAGE_GET295        body = {"queue_name": "skittle",296                "message_id": message_id_2}297        req = test_utils.create_request(action, body, self.headers)298        self.protocol.onMessage(req, False)299        resp = json.loads(send_mock.call_args[0][0].decode())300        self.assertEqual(200, resp['headers']['status'])301        # Try to get a claim that doesn't exist302        action = consts.CLAIM_GET303        body = {"queue_name": "skittle",304                "claim_id": claim_id}305        req = test_utils.create_request(action, body, self.headers)306        self.protocol.onMessage(req, False)307        resp = json.loads(send_mock.call_args[0][0].decode())308        self.assertEqual(404,  resp['headers']['status'])309        # Try to update a claim that doesn't exist310        action = consts.CLAIM_UPDATE311        body = {"queue_name": "skittle",312                "ttl": 60,313           
     "grace": 60,314                "claim_id": claim_id}315        req = test_utils.create_request(action, body, self.headers)316        self.protocol.onMessage(req, False)317        resp = json.loads(send_mock.call_args[0][0].decode())318        self.assertEqual(404,  resp['headers']['status'])319    def test_post_claim_nonexistent_queue(self):320        action = consts.CLAIM_CREATE321        body = {"queue_name": "nonexistent",322                "ttl": 100,323                "grace": 60}324        send_mock = mock.Mock()325        self.protocol.sendMessage = send_mock326        req = test_utils.create_request(action, body, self.headers)327        self.protocol.onMessage(req, False)328        resp = json.loads(send_mock.call_args[0][0].decode())329        self.assertEqual(204, resp['headers']['status'])330    def test_get_claim_nonexistent_queue(self):331        action = consts.CLAIM_GET332        body = {"queue_name": "nonexistent",333                "claim_id": "aaabbbba"}334        send_mock = mock.Mock()335        self.protocol.sendMessage = send_mock336        req = test_utils.create_request(action, body, self.headers)337        self.protocol.onMessage(req, False)338        resp = json.loads(send_mock.call_args[0][0].decode())339        self.assertEqual(404,  resp['headers']['status'])340    def _get_a_claim(self):341        action = consts.CLAIM_CREATE342        body = {"queue_name": "skittle",343                "ttl": 100,344                "grace": 60}345        send_mock = mock.Mock()346        self.protocol.sendMessage = send_mock347        req = test_utils.create_request(action, body, self.headers)348        self.protocol.onMessage(req, False)349        resp = json.loads(send_mock.call_args[0][0].decode())350        self.assertEqual(201, resp['headers']['status'])...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of 
Playwright testing. This tutorial gives A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!
