Best JavaScript code snippet using wpt
test_queue_lifecycle.py
Source:test_queue_lifecycle.py  
1# Copyright (c) 2015 Red Hat, Inc.2#3# Licensed under the Apache License, Version 2.0 (the "License"); you may not4# use this file except in compliance with the License.  You may obtain a copy5# of the License at6#7#    http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the12# License for the specific language governing permissions and limitations under13# the License.14import json15import ddt16import mock17from oslo_utils import uuidutils18from zaqar.common import consts19from zaqar.storage import errors as storage_errors20from zaqar import tests as testing21from zaqar.tests.unit.transport.websocket import base22from zaqar.tests.unit.transport.websocket import utils as test_utils23@ddt.ddt24class QueueLifecycleBaseTest(base.V2Base):25    config_file = "websocket_mongodb.conf"26    def setUp(self):27        super(QueueLifecycleBaseTest, self).setUp()28        self.protocol = self.transport.factory()29    def test_empty_project_id(self):30        action = consts.QUEUE_CREATE31        body = {"queue_name": "kitkat",32                "metadata": {33                    "key": {34                        "key2": "value",35                        "key3": [1, 2, 3, 4, 5]}36                    }37                }38        headers = {'Client-ID': uuidutils.generate_uuid()}39        req = test_utils.create_request(action, body, headers)40        def validator(resp, isBinary):41            resp = json.loads(resp.decode())42            self.assertEqual(400, resp['headers']['status'])43        with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:44            msg_mock.side_effect = validator45            self.protocol.onMessage(req, False)46    @ddt.data('480924', 'foo')47    def test_basics_thoroughly(self, project_id):48        # Stats are empty - queue not created yet49        action = consts.QUEUE_GET_STATS50        body = {"queue_name": "gummybears"}51        headers = {52            'Client-ID': uuidutils.generate_uuid(),53            'X-Project-ID': project_id54        }55        send_mock = mock.patch.object(self.protocol, 'sendMessage')56        self.addCleanup(send_mock.stop)57        sender = send_mock.start()58        req = test_utils.create_request(action, body, headers)59        def validator(resp, isBinary):60            resp = json.loads(resp.decode())61            self.assertEqual(404, resp['headers']['status'])62        sender.side_effect = validator63        self.protocol.onMessage(req, False)64        # Create65        action = consts.QUEUE_CREATE66        body = {"queue_name": "gummybears",67                "metadata": {68                    "key": {69                        "key2": "value",70                        "key3": [1, 2, 3, 4, 5]},71                    "messages": {"ttl": 600},72                    }73                }74        req = test_utils.create_request(action, body, headers)75        def validator(resp, isBinary):76            resp = json.loads(resp.decode())77            self.assertEqual(201, resp['headers']['status'])78        sender.side_effect = validator79        self.protocol.onMessage(req, False)80        # Fetch metadata81        action = consts.QUEUE_GET82        body = {"queue_name": "gummybears"}83        meta = {"messages": {"ttl": 600},84                "key": {85                    "key2": "value",86                    "key3": [1, 2, 3, 4, 5]}87                }88        req = test_utils.create_request(action, body, headers)89        def validator(resp, isBinary):90            resp = json.loads(resp.decode())91            self.assertEqual(200, resp['headers']['status'])92            self.assertEqual(meta, resp['body'])93        sender.side_effect = validator94        self.protocol.onMessage(req, False)95        # Stats empty queue96        action = consts.QUEUE_GET_STATS97        body = {"queue_name": "gummybears"}98        req = test_utils.create_request(action, body, headers)99        def validator(resp, isBinary):100            resp = json.loads(resp.decode())101            self.assertEqual(200, resp['headers']['status'])102        sender.side_effect = validator103        self.protocol.onMessage(req, False)104        # Delete105        action = consts.QUEUE_DELETE106        body = {"queue_name": "gummybears"}107        req = test_utils.create_request(action, body, headers)108        def validator(resp, isBinary):109            resp = json.loads(resp.decode())110            self.assertEqual(204, resp['headers']['status'])111        sender.side_effect = validator112        self.protocol.onMessage(req, False)113        # Get non-existent stats114        action = consts.QUEUE_GET_STATS115        body = {"queue_name": "gummybears"}116        req = test_utils.create_request(action, body, headers)117        def validator(resp, isBinary):118            resp = json.loads(resp.decode())119            self.assertEqual(404, resp['headers']['status'])120        sender.side_effect = validator121        self.protocol.onMessage(req, False)122    def test_name_restrictions(self):123        headers = {124            'Client-ID': uuidutils.generate_uuid(),125            'X-Project-ID': 'test-project'126        }127        action = consts.QUEUE_CREATE128        body = {"queue_name": 'marsbar',129                "metadata": {130                    "key": {131                        "key2": "value",132                        "key3": [1, 2, 3, 4, 5]},133                    "messages": {"ttl": 600},134                    }135                }136        send_mock = mock.patch.object(self.protocol, 'sendMessage')137        self.addCleanup(send_mock.stop)138        sender = send_mock.start()139        req = test_utils.create_request(action, body, headers)140        def validator(resp, isBinary):141            resp = json.loads(resp.decode())142            self.assertIn(resp['headers']['status'], [201, 204])143        sender.side_effect = validator144        self.protocol.onMessage(req, False)145        body["queue_name"] = "m@rsb@r"146        req = test_utils.create_request(action, body, headers)147        def validator(resp, isBinary):148            resp = json.loads(resp.decode())149            self.assertEqual(400, resp['headers']['status'])150        sender.side_effect = validator151        self.protocol.onMessage(req, False)152        body["queue_name"] = "marsbar" * 10153        req = test_utils.create_request(action, body, headers)154        self.protocol.onMessage(req, False)155    def test_project_id_restriction(self):156        headers = {157            'Client-ID': uuidutils.generate_uuid(),158            'X-Project-ID': 'test-project' * 30159        }160        action = consts.QUEUE_CREATE161        body = {"queue_name": 'poptart'}162        send_mock = mock.patch.object(self.protocol, 'sendMessage')163        self.addCleanup(send_mock.stop)164        sender = send_mock.start()165        req = test_utils.create_request(action, body, headers)166        def validator(resp, isBinary):167            resp = json.loads(resp.decode())168            self.assertEqual(400, resp['headers']['status'])169        sender.side_effect = validator170        self.protocol.onMessage(req, False)171        headers['X-Project-ID'] = 'test-project'172        req = test_utils.create_request(action, body, headers)173        def validator(resp, isBinary):174            resp = json.loads(resp.decode())175            self.assertIn(resp['headers']['status'], [201, 204])176        sender.side_effect = validator177        self.protocol.onMessage(req, False)178    def test_non_ascii_name(self):179        test_params = ((u'/queues/non-ascii-n\u0153me', 'utf-8'),180                       (u'/queues/non-ascii-n\xc4me', 'iso8859-1'))181        headers = {182            'Client-ID': uuidutils.generate_uuid(),183            'X-Project-ID': 'test-project' * 30184        }185        action = consts.QUEUE_CREATE186        body = {"queue_name": test_params[0]}187        send_mock = mock.patch.object(self.protocol, 'sendMessage')188        self.addCleanup(send_mock.stop)189        sender = send_mock.start()190        req = test_utils.create_request(action, body, headers)191        def validator(resp, isBinary):192            resp = json.loads(resp.decode())193            self.assertEqual(400, resp['headers']['status'])194        sender.side_effect = validator195        self.protocol.onMessage(req, False)196        body = {"queue_name": test_params[1]}197        req = test_utils.create_request(action, body, headers)198        self.protocol.onMessage(req, False)199    def test_no_metadata(self):200        headers = {201            'Client-ID': uuidutils.generate_uuid(),202            'X-Project-ID': 'test-project'203        }204        action = consts.QUEUE_CREATE205        body = {"queue_name": "fizbat"}206        send_mock = mock.patch.object(self.protocol, 'sendMessage')207        self.addCleanup(send_mock.stop)208        sender = send_mock.start()209        req = test_utils.create_request(action, body, headers)210        def validator(resp, isBinary):211            resp = json.loads(resp.decode())212            self.assertIn(resp['headers']['status'], [201, 204])213        sender.side_effect = validator214        self.protocol.onMessage(req, False)215        def validator(resp, isBinary):216            resp = json.loads(resp.decode())217            self.assertEqual(204, resp['headers']['status'])218        sender.side_effect = validator219        self.protocol.onMessage(req, False)220    @ddt.data('{', '[]', '.', '  ')221    def test_bad_metadata(self, meta):222        headers = {223            'Client-ID': uuidutils.generate_uuid(),224            'X-Project-ID': 'test-project' * 30225        }226        action = consts.QUEUE_CREATE227        body = {"queue_name": "fizbat",228                "metadata": meta}229        send_mock = mock.patch.object(self.protocol, 'sendMessage')230        self.addCleanup(send_mock.stop)231        sender = send_mock.start()232        req = test_utils.create_request(action, body, headers)233        def validator(resp, isBinary):234            resp = json.loads(resp.decode())235            self.assertEqual(400, resp['headers']['status'])236        sender.side_effect = validator237        self.protocol.onMessage(req, False)238    def test_too_much_metadata(self):239        headers = {240            'Client-ID': uuidutils.generate_uuid(),241            'X-Project-ID': 'test-project'242        }243        action = consts.QUEUE_CREATE244        body = {"queue_name": "buttertoffee",245                "metadata": {"messages": {"ttl": 600},246                             "padding": "x"}247                }248        max_size = self.transport_cfg.max_queue_metadata249        body["metadata"]["padding"] = "x" * max_size250        send_mock = mock.patch.object(self.protocol, 'sendMessage')251        self.addCleanup(send_mock.stop)252        sender = send_mock.start()253        req = test_utils.create_request(action, body, headers)254        def validator(resp, isBinary):255            resp = json.loads(resp.decode())256            self.assertEqual(400, resp['headers']['status'])257        sender.side_effect = validator258        self.protocol.onMessage(req, False)259    def test_way_too_much_metadata(self):260        headers = {261            'Client-ID': uuidutils.generate_uuid(),262            'X-Project-ID': 'test-project'263        }264        action = consts.QUEUE_CREATE265        body = {"queue_name": "peppermint",266                "metadata": {"messages": {"ttl": 600},267                             "padding": "x"}268                }269        max_size = self.transport_cfg.max_queue_metadata270        body["metadata"]["padding"] = "x" * max_size * 5271        send_mock = mock.patch.object(self.protocol, 'sendMessage')272        self.addCleanup(send_mock.stop)273        sender = send_mock.start()274        req = test_utils.create_request(action, body, headers)275        def validator(resp, isBinary):276            resp = json.loads(resp.decode())277            self.assertEqual(400, resp['headers']['status'])278        sender.side_effect = validator279        self.protocol.onMessage(req, False)280    def test_update_metadata(self):281        self.skip("Implement patch method")282        headers = {283            'Client-ID': uuidutils.generate_uuid(),284            'X-Project-ID': 'test-project'285        }286        action = consts.QUEUE_CREATE287        body = {"queue_name": "bonobon"}288        send_mock = mock.patch.object(self.protocol, 'sendMessage')289        self.addCleanup(send_mock.stop)290        sender = send_mock.start()291        # Create292        req = test_utils.create_request(action, body, headers)293        def validator(resp, isBinary):294            resp = json.loads(resp.decode())295            self.assertEqual(201, resp['headers']['status'])296        sender.side_effect = validator297        self.protocol.onMessage(req, False)298        # Set meta299        meta1 = {"messages": {"ttl": 600}, "padding": "x"}300        body["metadata"] = meta1301        req = test_utils.create_request(action, body, headers)302        def validator(resp, isBinary):303            resp = json.loads(resp.decode())304            self.assertEqual(204, resp['headers']['status'])305        sender.side_effect = validator306        self.protocol.onMessage(req, False)307        # Get308        action = consts.QUEUE_GET309        body = {"queue_name": "bonobon"}310        req = test_utils.create_request(action, body, headers)311        def validator(resp, isBinary):312            resp = json.loads(resp.decode())313            self.assertEqual(204, resp['headers']['status'])314            self.assertEqual(meta1, resp['body'])315        sender.side_effect = validator316        self.protocol.onMessage(req, False)317        # Update318        action = consts.QUEUE_CREATE319        meta2 = {"messages": {"ttl": 100}, "padding": "y"}320        body["metadata"] = meta2321        req = test_utils.create_request(action, body, headers)322        def validator(resp, isBinary):323            resp = json.loads(resp.decode())324            self.assertEqual(204, resp['headers']['status'])325        sender.side_effect = validator326        self.protocol.onMessage(req, False)327        # Get again328        action = consts.QUEUE_GET329        body = {"queue_name": "bonobon"}330        req = test_utils.create_request(action, body, headers)331        def validator(resp, isBinary):332            resp = json.loads(resp.decode())333            self.assertEqual(200, resp['headers']['status'])334            self.assertEqual(meta2, resp['body'])335        sender.side_effect = validator336        self.protocol.onMessage(req, False)337    def test_list(self):338        arbitrary_number = 644079696574693339        project_id = str(arbitrary_number)340        client_id = uuidutils.generate_uuid()341        headers = {342            'X-Project-ID': project_id,343            'Client-ID': client_id344        }345        send_mock = mock.patch.object(self.protocol, 'sendMessage')346        self.addCleanup(send_mock.stop)347        sender = send_mock.start()348        # NOTE(kgriffs): It's important that this one sort after the one349        # above. This is in order to prove that bug/1236605 is fixed, and350        # stays fixed!351        # NOTE(vkmc): In websockets as well!352        alt_project_id = str(arbitrary_number + 1)353        # List empty354        action = consts.QUEUE_LIST355        body = {}356        req = test_utils.create_request(action, body, headers)357        def validator(resp, isBinary):358            resp = json.loads(resp.decode())359            self.assertEqual(200, resp['headers']['status'])360            self.assertEqual([], resp['body']['queues'])361        sender.side_effect = validator362        self.protocol.onMessage(req, False)363        # Payload exceeded364        body = {'limit': 21}365        req = test_utils.create_request(action, body, headers)366        def validator(resp, isBinary):367            resp = json.loads(resp.decode())368            self.assertEqual(400, resp['headers']['status'])369        sender.side_effect = validator370        self.protocol.onMessage(req, False)371        # Create some372        def create_queue(project_id, queue_name, metadata):373            altheaders = {'Client-ID': client_id}374            if project_id is not None:375                altheaders['X-Project-ID'] = project_id376            action = consts.QUEUE_CREATE377            body['queue_name'] = queue_name378            body['metadata'] = metadata379            req = test_utils.create_request(action, body, altheaders)380            def validator(resp, isBinary):381                resp = json.loads(resp.decode())382                self.assertEqual(201, resp['headers']['status'])383            sender.side_effect = validator384            self.protocol.onMessage(req, False)385        create_queue(project_id, 'q1', {"node": 31})386        create_queue(project_id, 'q2', {"node": 32})387        create_queue(project_id, 'q3', {"node": 33})388        create_queue(alt_project_id, 'q3', {"alt": 1})389        # List (limit)390        body = {'limit': 2}391        req = test_utils.create_request(action, body, headers)392        def validator(resp, isBinary):393            resp = json.loads(resp.decode())394            self.assertEqual(2, len(resp['body']['queues']))395        sender.side_effect = validator396        self.protocol.onMessage(req, False)397        # List (no metadata, get all)398        body = {'limit': 5}399        req = test_utils.create_request(action, body, headers)400        def validator(resp, isBinary):401            resp = json.loads(resp.decode())402            self.assertEqual(200, resp['headers']['status'])403            # Ensure we didn't pick up the queue from the alt project.404            self.assertEqual(3, len(resp['body']['queues']))405        sender.side_effect = validator406        self.protocol.onMessage(req, False)407        # List with metadata408        body = {'detailed': True}409        req = test_utils.create_request(action, body, headers)410        def validator(resp, isBinary):411            resp = json.loads(resp.decode())412            self.assertEqual(200, resp['headers']['status'])413        sender.side_effect = validator414        self.protocol.onMessage(req, False)415        action = consts.QUEUE_GET416        body = {"queue_name": "q1"}417        req = test_utils.create_request(action, body, headers)418        def validator(resp, isBinary):419            resp = json.loads(resp.decode())420            self.assertEqual(200, resp['headers']['status'])421            self.assertEqual({"node": 31}, resp['body'])422        sender.side_effect = validator423        self.protocol.onMessage(req, False)424        # List tail425        action = consts.QUEUE_LIST426        body = {}427        req = test_utils.create_request(action, body, headers)428        def validator(resp, isBinary):429            resp = json.loads(resp.decode())430            self.assertEqual(200, resp['headers']['status'])431        sender.side_effect = validator432        self.protocol.onMessage(req, False)433        # List manually-constructed tail434        body = {'marker': "zzz"}435        req = test_utils.create_request(action, body, headers)436        self.protocol.onMessage(req, False)437    def test_list_returns_503_on_nopoolfound_exception(self):438        headers = {439            'Client-ID': uuidutils.generate_uuid(),440            'X-Project-ID': 'test-project'441        }442        action = consts.QUEUE_LIST443        body = {}444        send_mock = mock.patch.object(self.protocol, 'sendMessage')445        self.addCleanup(send_mock.stop)446        sender = send_mock.start()447        req = test_utils.create_request(action, body, headers)448        def validator(resp, isBinary):449            resp = json.loads(resp.decode())450            self.assertEqual(503, resp['headers']['status'])451        sender.side_effect = validator452        queue_controller = self.boot.storage.queue_controller453        with mock.patch.object(queue_controller, 'list') as mock_queue_list:454            def queue_generator():455                raise storage_errors.NoPoolFound()456            # This generator tries to be like queue controller list generator457            # in some ways.458            def fake_generator():459                yield queue_generator()460                yield {}461            mock_queue_list.return_value = fake_generator()462            self.protocol.onMessage(req, False)463    def _post_messages(self, queue_name, headers, repeat=1):464        messages = [{'body': 239, 'ttl': 300}] * repeat465        action = consts.MESSAGE_POST466        body = {"queue_name": queue_name,467                "messages": messages}468        send_mock = mock.Mock()469        self.protocol.sendMessage = send_mock470        req = test_utils.create_request(action, body, headers)471        self.protocol.onMessage(req, False)472        return json.loads(send_mock.call_args[0][0].decode())473    def test_purge(self):474        arbitrary_number = 644079696574693475        project_id = str(arbitrary_number)476        client_id = uuidutils.generate_uuid()477        headers = {478            'X-Project-ID': project_id,479            'Client-ID': client_id480        }481        queue_name = 'myqueue'482        resp = self._post_messages(queue_name, headers, repeat=5)483        msg_ids = resp['body']['message_ids']484        send_mock = mock.Mock()485        self.protocol.sendMessage = send_mock486        for msg_id in msg_ids:487            action = consts.MESSAGE_GET488            body = {"queue_name": queue_name, "message_id": msg_id}489            req = test_utils.create_request(action, body, headers)490            self.protocol.onMessage(req, False)491            resp = json.loads(send_mock.call_args[0][0].decode())492            self.assertEqual(200, resp['headers']['status'])493        action = consts.QUEUE_PURGE494        body = {"queue_name": queue_name, "resource_types": ["messages"]}495        req = test_utils.create_request(action, body, headers)496        self.protocol.onMessage(req, False)497        resp = json.loads(send_mock.call_args[0][0].decode())498        self.assertEqual(204, resp['headers']['status'])499        for msg_id in msg_ids:500            action = consts.MESSAGE_GET501            body = {"queue_name": queue_name, "message_id": msg_id}502            req = test_utils.create_request(action, body, headers)503            self.protocol.onMessage(req, False)504            resp = json.loads(send_mock.call_args[0][0].decode())505            self.assertEqual(404, resp['headers']['status'])506class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):507    config_file = 'websocket_mongodb.conf'508    @testing.requires_mongodb509    def setUp(self):510        super(TestQueueLifecycleMongoDB, self).setUp()511    def tearDown(self):512        storage = self.boot.storage._storage513        connection = storage.connection514        connection.drop_database(self.boot.control.queues_database)515        for db in storage.message_databases:516            connection.drop_database(db)...test_messages.py
Source:test_messages.py  
1# Copyright (c) 2015 Red Hat, Inc.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7#    http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or12# implied.13# See the License for the specific language governing permissions and14# limitations under the License.15import datetime16import json17import ddt18import mock19from oslo_utils import timeutils20from oslo_utils import uuidutils21import six22from testtools import matchers23from zaqar.common import consts24from zaqar.tests.unit.transport.websocket import base25from zaqar.tests.unit.transport.websocket import utils as test_utils26from zaqar.transport import validation27@ddt.ddt28class MessagesBaseTest(base.V2Base):29    config_file = "websocket_mongodb.conf"30    def setUp(self):31        super(MessagesBaseTest, self).setUp()32        self.protocol = self.transport.factory()33        self.default_message_ttl = 360034        self.project_id = '7e55e1a7e'35        self.headers = {36            'Client-ID': uuidutils.generate_uuid(),37            'X-Project-ID': self.project_id38        }39        body = {"queue_name": "kitkat"}40        req = test_utils.create_request(consts.QUEUE_CREATE,41                                        body, self.headers)42        with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:43            self.protocol.onMessage(req, False)44            resp = json.loads(msg_mock.call_args[0][0].decode())45            self.assertIn(resp['headers']['status'], [201, 204])46    def tearDown(self):47        super(MessagesBaseTest, self).tearDown()48        body = {"queue_name": "kitkat"}49        send_mock = mock.Mock()50        self.protocol.sendMessage = send_mock51        req = test_utils.create_request(consts.QUEUE_DELETE,52                                        body, self.headers)53        self.protocol.onMessage(req, False)54        resp = json.loads(send_mock.call_args[0][0].decode())55        self.assertEqual(204, resp['headers']['status'])56    def _test_post(self, sample_messages, in_binary=False):57        body = {"queue_name": "kitkat",58                "messages": sample_messages}59        send_mock = mock.Mock()60        self.protocol.sendMessage = send_mock61        dumps, loads, create_req = test_utils.get_pack_tools(binary=in_binary)62        req = create_req(consts.MESSAGE_POST, body, self.headers)63        self.protocol.onMessage(req, in_binary)64        arg = send_mock.call_args[0][0]65        if not in_binary:66            arg = arg.decode()67        resp = loads(arg)68        self.assertEqual(201, resp['headers']['status'])69        self.msg_ids = resp['body']['message_ids']70        self.assertEqual(len(sample_messages), len(self.msg_ids))71        lookup = dict([(m['ttl'], m['body']) for m in sample_messages])72        # Test GET on the message resource directly73        # NOTE(cpp-cabrera): force the passing of time to age a message74        timeutils_utcnow = 'oslo_utils.timeutils.utcnow'75        now = timeutils.utcnow() + datetime.timedelta(seconds=10)76        with mock.patch(timeutils_utcnow) as mock_utcnow:77            mock_utcnow.return_value = now78            for msg_id in self.msg_ids:79                headers = self.headers.copy()80                headers['X-Project-ID'] = '777777'81                # Wrong project ID82                action = consts.MESSAGE_GET83                body = {"queue_name": "kitkat",84                        "message_id": msg_id}85                req = create_req(action, body, headers)86                self.protocol.onMessage(req, in_binary)87                arg = send_mock.call_args[0][0]88                if not in_binary:89                    arg = arg.decode()90                resp = loads(arg)91                self.assertEqual(404, resp['headers']['status'])92                # Correct project ID93                req = create_req(action, body, self.headers)94                self.protocol.onMessage(req, in_binary)95                arg = send_mock.call_args[0][0]96                if not in_binary:97                    arg = arg.decode()98                resp = loads(arg)99                self.assertEqual(200, resp['headers']['status'])100                # Check message properties101                message = resp['body']['messages']102                self.assertEqual(lookup[message['ttl']], message['body'])103                self.assertEqual(msg_id, message['id'])104                # no negative age105                # NOTE(cpp-cabrera): testtools lacks106                # GreaterThanEqual on py26107                self.assertThat(message['age'],108                                matchers.GreaterThan(-1))109        # Test bulk GET110        action = consts.MESSAGE_GET_MANY111        body = {"queue_name": "kitkat",112                "message_ids": self.msg_ids}113        req = create_req(action, body, self.headers)114        self.protocol.onMessage(req, in_binary)115        arg = send_mock.call_args[0][0]116        if not in_binary:117            arg = arg.decode()118        resp = loads(arg)119        self.assertEqual(200, resp['headers']['status'])120        expected_ttls = set(m['ttl'] for m in sample_messages)121        actual_ttls = set(m['ttl'] for m in resp['body']['messages'])122        self.assertFalse(expected_ttls - actual_ttls)123        actual_ids = set(m['id'] for m in resp['body']['messages'])124        self.assertFalse(set(self.msg_ids) - actual_ids)125    def test_exceeded_payloads(self):126        # Get a valid message id127        resp = self._post_messages("kitkat")128        msg_id = resp['body']['message_ids']129        # Bulk GET restriction130        get_msg_ids = msg_id * 21131        action = consts.MESSAGE_GET_MANY132        body = {"queue_name": "kitkat",133                "message_ids": get_msg_ids}134        send_mock = mock.Mock()135        self.protocol.sendMessage = send_mock136        req = test_utils.create_request(action, body, self.headers)137        self.protocol.onMessage(req, False)138        resp = json.loads(send_mock.call_args[0][0].decode())139        self.assertEqual(400, resp['headers']['status'])140        # Listing restriction141        body['limit'] = 21142        req = test_utils.create_request(action, body, self.headers)143        self.protocol.onMessage(req, False)144        resp = json.loads(send_mock.call_args[0][0].decode())145        self.assertEqual(400, resp['headers']['status'])146        # Bulk deletion restriction147        del_msg_ids = msg_id * 22148        action = consts.MESSAGE_GET_MANY149        body = {"queue_name": "kitkat",150                "message_ids": del_msg_ids}151        req = test_utils.create_request(action, body, self.headers)152        self.protocol.onMessage(req, False)153        resp = json.loads(send_mock.call_args[0][0].decode())154        self.assertEqual(400, resp['headers']['status'])155    @ddt.data(True, False)156    def test_post_single(self, in_binary):157        sample_messages = [158            {'body': {'key': 'value'}, 'ttl': 200},159        ]160        self._test_post(sample_messages, in_binary=in_binary)161    @ddt.data(True, False)162    def test_post_multiple(self, in_binary):163        sample_messages = [164            {'body': 239, 'ttl': 100},165            {'body': {'key': 'value'}, 'ttl': 200},166            {'body': [1, 3], 'ttl': 300},167        ]168        self._test_post(sample_messages, in_binary=in_binary)169    def test_post_optional_ttl(self):170        messages = [{'body': 239},171                    {'body': {'key': 'value'}, 'ttl': 200}]172        action = consts.MESSAGE_POST173        body = {"queue_name": "kitkat",174                "messages": messages}175        req = test_utils.create_request(action, body, self.headers)176        send_mock = mock.Mock()177        self.protocol.sendMessage = send_mock178        self.protocol.onMessage(req, False)179        resp = json.loads(send_mock.call_args[0][0].decode())180        self.assertEqual(201, resp['headers']['status'])181        msg_id = resp['body']['message_ids'][0]182        action = consts.MESSAGE_GET183        body = {"queue_name": "kitkat", "message_id": msg_id}184        req = test_utils.create_request(action, body, self.headers)185        self.protocol.onMessage(req, False)186        resp = json.loads(send_mock.call_args[0][0].decode())187        self.assertEqual(200, resp['headers']['status'])188        self.assertEqual(self.default_message_ttl,189                         resp['body']['messages']['ttl'])190    def test_post_to_non_ascii_queue(self):191        queue_name = u'non-ascii-n\u0153me'192        if six.PY2:193            queue_name = queue_name.encode('utf-8')194        resp = self._post_messages(queue_name)195        self.assertEqual(400, resp['headers']['status'])196    def test_post_with_long_queue_name(self):197        # NOTE(kgriffs): This test verifies that routes with198        # embedded queue name params go through the validation199        # hook, regardless of the target resource.200        queue_name = 'v' * validation.QUEUE_NAME_MAX_LEN201        resp = self._post_messages(queue_name)202        self.assertEqual(201, resp['headers']['status'])203        queue_name += 'v'204        resp = self._post_messages(queue_name)205        self.assertEqual(400, resp['headers']['status'])206    def test_post_to_missing_queue(self):207        queue_name = 'nonexistent'208        resp = self._post_messages(queue_name)209        self.assertEqual(201, resp['headers']['status'])210    def test_post_invalid_ttl(self):211        sample_messages = [212            {'body': {'key': 'value'}, 'ttl': '200'},213        ]214        action = consts.MESSAGE_POST215        body = {"queue_name": "kitkat",216                "messages": sample_messages}217        send_mock = mock.patch.object(self.protocol, 'sendMessage')218        self.addCleanup(send_mock.stop)219        send_mock = send_mock.start()220        req = test_utils.create_request(action, body, self.headers)221        self.protocol.onMessage(req, False)222        resp = json.loads(send_mock.call_args[0][0].decode())223        self.assertEqual(400, resp['headers']['status'])224        self.assertEqual(225            'Bad request. The value of the "ttl" field must be a int.',226            resp['body']['exception'])227    def test_post_no_body(self):228        sample_messages = [229            {'ttl': 200},230        ]231        action = consts.MESSAGE_POST232        body = {"queue_name": "kitkat",233                "messages": sample_messages}234        send_mock = mock.patch.object(self.protocol, 'sendMessage')235        self.addCleanup(send_mock.stop)236        send_mock = send_mock.start()237        req = test_utils.create_request(action, body, self.headers)238        self.protocol.onMessage(req, False)239        resp = json.loads(send_mock.call_args[0][0].decode())240        self.assertEqual(400, resp['headers']['status'])241        self.assertEqual(242            'Bad request. Missing "body" field.', resp['body']['exception'])243    def test_get_from_missing_queue(self):244        action = consts.MESSAGE_LIST245        body = {"queue_name": "anothernonexistent"}246        req = test_utils.create_request(action, body, self.headers)247        send_mock = mock.Mock()248        self.protocol.sendMessage = send_mock249        self.protocol.onMessage(req, False)250        resp = json.loads(send_mock.call_args[0][0].decode())251        self.assertEqual(200, resp['headers']['status'])252        self.assertEqual([], resp['body']['messages'])253    @ddt.data('', '0xdeadbeef', '550893e0-2b6e-11e3-835a-5cf9dd72369')254    def test_bad_client_id(self, text_id):255        action = consts.MESSAGE_POST256        body = {257            "queue_name": "kinder",258            "messages": [{"ttl": 60,259                          "body": ""}]260        }261        headers = {262            'Client-ID': text_id,263            'X-Project-ID': self.project_id264        }265        send_mock = mock.Mock()266        self.protocol.sendMessage = send_mock267        req = test_utils.create_request(action, body, headers)268        self.protocol.onMessage(req, False)269        resp = json.loads(send_mock.call_args[0][0].decode())270        self.assertEqual(400, resp['headers']['status'])271        action = consts.MESSAGE_GET272        body = {273            "queue_name": "kinder",274            "limit": 3,275            "echo": True276        }277        req = test_utils.create_request(action, body, headers)278        self.protocol.onMessage(req, False)279        resp = json.loads(send_mock.call_args[0][0].decode())280        self.assertEqual(400, resp['headers']['status'])281    @ddt.data(None, '[', '[]', '{}', '.')282    def test_post_bad_message(self, document):283        action = consts.MESSAGE_POST284        body = {285            "queue_name": "kinder",286            "messages": document287        }288        send_mock = mock.Mock()289        self.protocol.sendMessage = send_mock290        req = test_utils.create_request(action, body, self.headers)291        self.protocol.onMessage(req, False)292        resp = json.loads(send_mock.call_args[0][0].decode())293        self.assertEqual(400, resp['headers']['status'])294    @ddt.data(-1, 59, 1209601)295    def test_unacceptable_ttl(self, ttl):296        action = consts.MESSAGE_POST297        body = {"queue_name": "kinder",298                "messages": [{"ttl": ttl, "body": ""}]}299        send_mock = mock.Mock()300        self.protocol.sendMessage = send_mock301        req = test_utils.create_request(action, body, self.headers)302        self.protocol.onMessage(req, False)303        resp = json.loads(send_mock.call_args[0][0].decode())304        self.assertEqual(400, resp['headers']['status'])305    def test_exceeded_message_posting(self):306        # Total (raw request) size307        document = [{'body': "some body", 'ttl': 100}] * 8000308        action = consts.MESSAGE_POST309        body = {310            "queue_name": "kinder",311            "messages": document312        }313        send_mock = mock.Mock()314        self.protocol.sendMessage = send_mock315        req = test_utils.create_request(action, body, self.headers)316        self.protocol.onMessage(req, False)317        resp = json.loads(send_mock.call_args[0][0].decode())318        self.assertEqual(400, resp['headers']['status'])319    @ddt.data('{"overflow": 9223372036854775808}',320              '{"underflow": -9223372036854775809}')321    def test_unsupported_json(self, document):322        action = consts.MESSAGE_POST323        body = {324            "queue_name": "fizz",325            "messages": document326        }327        send_mock = mock.Mock()328        self.protocol.sendMessage = send_mock329        req = test_utils.create_request(action, body, self.headers)330        self.protocol.onMessage(req, False)331        resp = json.loads(send_mock.call_args[0][0].decode())332        self.assertEqual(400, resp['headers']['status'])333    def test_delete(self):334        resp = self._post_messages("tofi")335        msg_id = resp['body']['message_ids'][0]336        action = consts.MESSAGE_GET337        body = {"queue_name": "tofi",338                "message_id": msg_id}339        send_mock = mock.Mock()340        self.protocol.sendMessage = send_mock341        req = test_utils.create_request(action, body, self.headers)342        self.protocol.onMessage(req, False)343        resp = json.loads(send_mock.call_args[0][0].decode())344        self.assertEqual(200, resp['headers']['status'])345        # Delete queue346        action = consts.MESSAGE_DELETE347        req = test_utils.create_request(action, body, self.headers)348        self.protocol.onMessage(req, False)349        resp = json.loads(send_mock.call_args[0][0].decode())350        self.assertEqual(204, resp['headers']['status'])351        # Get non existent queue352        action = consts.MESSAGE_GET353        req = test_utils.create_request(action, body, self.headers)354        self.protocol.onMessage(req, False)355        resp = json.loads(send_mock.call_args[0][0].decode())356        self.assertEqual(404, resp['headers']['status'])357        # Safe to delete non-existing ones358        action = consts.MESSAGE_DELETE359        req = test_utils.create_request(action, body, self.headers)360        self.protocol.onMessage(req, False)361        resp = json.loads(send_mock.call_args[0][0].decode())362        self.assertEqual(204, resp['headers']['status'])363    def test_bulk_delete(self):364        resp = self._post_messages("nerds", repeat=5)365        msg_ids = resp['body']['message_ids']366        action = consts.MESSAGE_DELETE_MANY367        body = {"queue_name": "nerds",368                "message_ids": msg_ids}369        send_mock = mock.Mock()370        self.protocol.sendMessage = send_mock371        req = test_utils.create_request(action, body, self.headers)372        self.protocol.onMessage(req, False)373        resp = json.loads(send_mock.call_args[0][0].decode())374        self.assertEqual(204, resp['headers']['status'])375        action = consts.MESSAGE_GET376        req = test_utils.create_request(action, body, self.headers)377        self.protocol.onMessage(req, False)378        resp = json.loads(send_mock.call_args[0][0].decode())379        self.assertEqual(400, resp['headers']['status'])380        # Safe to delete non-existing ones381        action = consts.MESSAGE_DELETE_MANY382        req = test_utils.create_request(action, body, self.headers)383        self.protocol.onMessage(req, False)384        resp = json.loads(send_mock.call_args[0][0].decode())385        self.assertEqual(204, resp['headers']['status'])386        # Even after the queue is gone387        action = consts.QUEUE_DELETE388        body = {"queue_name": "nerds"}389        req = test_utils.create_request(action, body, self.headers)390        self.protocol.onMessage(req, False)391        resp = json.loads(send_mock.call_args[0][0].decode())392        self.assertEqual(204, resp['headers']['status'])393        action = consts.MESSAGE_DELETE_MANY394        body = {"queue_name": "nerds",395                "message_ids": msg_ids}396        req = test_utils.create_request(action, body, self.headers)397        self.protocol.onMessage(req, False)398        resp = json.loads(send_mock.call_args[0][0].decode())399        self.assertEqual(204, resp['headers']['status'])400    def test_pop_delete(self):401        self._post_messages("kitkat", repeat=5)402        action = consts.MESSAGE_DELETE_MANY403        body = {"queue_name": "kitkat", "pop": 2}404        send_mock = mock.Mock()405        self.protocol.sendMessage = send_mock406        req = test_utils.create_request(action, body, self.headers)407        self.protocol.onMessage(req, False)408        resp = json.loads(send_mock.call_args[0][0].decode())409        self.assertEqual(200, resp['headers']['status'])410        self.assertEqual(2, len(resp['body']['messages']))411        self.assertEqual(239, resp['body']['messages'][0]['body'])412        self.assertEqual(239, resp['body']['messages'][1]['body'])413    def test_get_nonexistent_message_404s(self):414        action = consts.MESSAGE_GET415        body = {"queue_name": "notthere",416                "message_id": "a"}417        send_mock = mock.Mock()418        self.protocol.sendMessage = send_mock419        req = test_utils.create_request(action, body, self.headers)420        self.protocol.onMessage(req, False)421        resp = json.loads(send_mock.call_args[0][0].decode())422        self.assertEqual(404, resp['headers']['status'])423    def test_get_multiple_invalid_messages_404s(self):424        action = consts.MESSAGE_GET_MANY425        body = {"queue_name": "notnotthere",426                "message_ids": ["a", "b", "c"]}427        send_mock = mock.Mock()428        self.protocol.sendMessage = send_mock429        req = test_utils.create_request(action, body, self.headers)430        self.protocol.onMessage(req, False)431        resp = json.loads(send_mock.call_args[0][0].decode())432        self.assertEqual(200, resp['headers']['status'])433    def test_delete_multiple_invalid_messages_204s(self):434        action = consts.MESSAGE_DELETE435        body = {"queue_name": "yetanothernotthere",436                "message_ids": ["a", "b", "c"]}437        send_mock = mock.Mock()438        self.protocol.sendMessage = send_mock439        req = test_utils.create_request(action, body, self.headers)440        self.protocol.onMessage(req, False)441        resp = json.loads(send_mock.call_args[0][0].decode())442        self.assertEqual(400, resp['headers']['status'])443    def _post_messages(self, queue_name, repeat=1):444        messages = [{'body': 239, 'ttl': 300}] * repeat445        action = consts.MESSAGE_POST446        body = {"queue_name": queue_name,447                "messages": messages}448        send_mock = mock.Mock()449        self.protocol.sendMessage = send_mock450        req = test_utils.create_request(action, body, self.headers)451        self.protocol.onMessage(req, False)452        return json.loads(send_mock.call_args[0][0].decode())453    def test_invalid_request(self):454        send_mock = mock.Mock()455        self.protocol.sendMessage = send_mock456        self.protocol.onMessage('foo', False)457        self.assertEqual(1, send_mock.call_count)458        response = json.loads(send_mock.call_args[0][0].decode())459        self.assertIn('error', response['body'])460        self.assertEqual({'status': 400}, response['headers'])461        self.assertEqual(462            {'action': None, 'api': 'v2', 'body': {}, 'headers': {}},...bybit.py
Source:bybit.py  
...63            elif resp.url.path == "/v2/public/kline/list":64                self.kline._onresponse(data["result"])65            elif resp.url.path == "/v2/private/wallet/balance":66                self.wallet._onresponse(data["result"])67    def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:68        if "success" in msg:69            if not msg["success"]:70                logger.warning(msg)71        if "topic" in msg:72            topic: str = msg["topic"]73            data = msg["data"]74            if any(75                [76                    topic.startswith("orderBookL2_25"),77                    topic.startswith("orderBook_200"),78                ]79            ):80                self.orderbook._onmessage(topic, msg["type"], data)81            elif topic.startswith("trade"):82                self.trade._onmessage(data)83            elif topic.startswith("insurance"):84                self.insurance._onmessage(data)85            elif topic.startswith("instrument_info"):86                self.instrument._onmessage(topic, msg["type"], data)87            if topic.startswith("klineV2"):88                self.kline._onmessage(topic, data)89            elif topic.startswith("liquidation"):90                self.liquidation._onmessage(data)91            elif topic == "position":92                self.position._onmessage(data)93            elif topic == "execution":94                self.execution._onmessage(data)95            elif topic == "order":96                self.order._onmessage(data)97            elif topic == "stop_order":98                self.stoporder._onmessage(data)99            elif topic == "wallet":100                self.wallet._onmessage(data)101        if "timestamp_e6" in msg:102            self.timestamp_e6 = int(msg["timestamp_e6"])103    @property104    def orderbook(self) -> "OrderBookInverse":105        return self.get("orderbook", OrderBookInverse)106    @property107    def trade(self) -> "TradeInverse":108        return self.get("trade", TradeInverse)109    @property110    def insurance(self) -> "Insurance":111        return self.get("insurance", Insurance)112    @property113    def instrument(self) -> "InstrumentInverse":114        return self.get("instrument", InstrumentInverse)115    @property116    def kline(self) -> "KlineInverse":117        return self.get("kline", KlineInverse)118    @property119    def liquidation(self) -> "LiquidationInverse":120        return self.get("liquidation", LiquidationInverse)121    @property122    def position(self) -> "PositionInverse":123        """124        ã¤ã³ãã¼ã¹å¥ç´(ç¡æé/å
ç©)ç¨ã®ãã¸ã·ã§ã³125        """126        return self.get("position", PositionInverse)127    @property128    def execution(self) -> "ExecutionInverse":129        return self.get("execution", ExecutionInverse)130    @property131    def order(self) -> "OrderInverse":132        """133        ã¢ã¯ãã£ããªã¼ãã¼ã®ã¿(ç´å®ã»ãã£ã³ã»ã«æ¸ã¿ã¯åé¤ããã)134        """135        return self.get("order", OrderInverse)136    @property137    def stoporder(self) -> "StopOrderInverse":138        """139        ã¢ã¯ãã£ããªã¼ãã¼ã®ã¿(ããªã¬ã¼æ¸ã¿ã¯åé¤ããã)140        """141        return self.get("stoporder", StopOrderInverse)142    @property143    def wallet(self) -> "WalletInverse":144        return self.get("wallet", WalletInverse)145class BybitUSDTDataStore(DataStoreManager):146    """147    Bybit USDTå¥ç´ã®ãã¼ã¿ã¹ãã¢ããã¼ã¸ã£ã¼148    """149    def _init(self) -> None:150        self.create("orderbook", datastore_class=OrderBookUSDT)151        self.create("trade", datastore_class=TradeUSDT)152        self.create("insurance", datastore_class=Insurance)153        self.create("instrument", datastore_class=InstrumentUSDT)154        self.create("kline", datastore_class=KlineUSDT)155        self.create("liquidation", datastore_class=LiquidationUSDT)156        self.create("position", datastore_class=PositionUSDT)157        self.create("execution", datastore_class=ExecutionUSDT)158        self.create("order", datastore_class=OrderUSDT)159        self.create("stoporder", datastore_class=StopOrderUSDT)160        self.create("wallet", datastore_class=WalletUSDT)161        self.timestamp_e6: Optional[int] = None162    async def initialize(self, *aws: Awaitable[aiohttp.ClientResponse]) -> None:163        """164        対å¿ã¨ã³ããã¤ã³ã165        - GET /private/linear/order/search (DataStore: order)166        - GET /private/linear/stop-order/search (DataStore: stoporder)167        - GET /private/linear/position/list (DataStore: position)168        - GET /private/linear/position/list (DataStore: position)169        - GET /public/linear/kline (DataStore: kline)170        - GET /v2/private/wallet/balance (DataStore: wallet)171        """172        for f in asyncio.as_completed(aws):173            resp = await f174            data = await resp.json()175            if data["ret_code"] != 0:176                raise ValueError(177                    "Response error at DataStore initialization\n"178                    f"URL: {resp.url}\n"179                    f"Data: {data}"180                )181            if resp.url.path == "/private/linear/order/search":182                self.order._onresponse(data["result"])183            elif resp.url.path == "/private/linear/stop-order/search":184                self.stoporder._onresponse(data["result"])185            elif resp.url.path == "/private/linear/position/list":186                self.position._onresponse(data["result"])187            elif resp.url.path == "/public/linear/kline":188                self.kline._onresponse(data["result"])189            elif resp.url.path == "/v2/private/wallet/balance":190                self.wallet._onresponse(data["result"])191    def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:192        if "success" in msg:193            if not msg["success"]:194                logger.warning(msg)195        if "topic" in msg:196            topic: str = msg["topic"]197            data = msg["data"]198            if any(199                [200                    topic.startswith("orderBookL2_25"),201                    topic.startswith("orderBook_200"),202                ]203            ):204                self.orderbook._onmessage(topic, msg["type"], data)205            elif topic.startswith("trade"):206                self.trade._onmessage(data)207            elif topic.startswith("instrument_info"):208                self.instrument._onmessage(topic, msg["type"], data)209            if topic.startswith("candle"):210                self.kline._onmessage(topic, data)211            elif topic.startswith("liquidation"):212                self.liquidation._onmessage(data)213            elif topic == "position":214                self.position._onmessage(data)215            elif topic == "execution":216                self.execution._onmessage(data)217            elif topic == "order":218                self.order._onmessage(data)219            elif topic == "stop_order":220                self.stoporder._onmessage(data)221            elif topic == "wallet":222                self.wallet._onmessage(data)223        if "timestamp_e6" in msg:224            self.timestamp_e6 = int(msg["timestamp_e6"])225    @property226    def orderbook(self) -> "OrderBookUSDT":227        return self.get("orderbook", OrderBookUSDT)228    @property229    def trade(self) -> "TradeUSDT":230        return self.get("trade", TradeUSDT)231    @property232    def instrument(self) -> "InstrumentUSDT":233        return self.get("instrument", InstrumentUSDT)234    @property235    def kline(self) -> "KlineUSDT":236        return self.get("kline", KlineUSDT)237    @property238    def liquidation(self) -> "LiquidationUSDT":239        return self.get("liquidation", LiquidationUSDT)240    @property241    def position(self) -> "PositionUSDT":242        """243        USDTå¥ç´ç¨ã®ãã¸ã·ã§ã³244        """245        return self.get("position", PositionUSDT)246    @property247    def execution(self) -> "ExecutionUSDT":248        return self.get("execution", ExecutionUSDT)249    @property250    def order(self) -> "OrderUSDT":251        """252        ã¢ã¯ãã£ããªã¼ãã¼ã®ã¿(ç´å®ã»ãã£ã³ã»ã«æ¸ã¿ã¯åé¤ããã)253        """254        return self.get("order", OrderUSDT)255    @property256    def stoporder(self) -> "StopOrderUSDT":257        """258        ã¢ã¯ãã£ããªã¼ãã¼ã®ã¿(ããªã¬ã¼æ¸ã¿ã¯åé¤ããã)259        """260        return self.get("stoporder", StopOrderUSDT)261    @property262    def wallet(self) -> "WalletUSDT":263        return self.get("wallet", WalletUSDT)264class OrderBookInverse(DataStore):265    _KEYS = ["symbol", "id", "side"]266    def sorted(self, query: Optional[Item] = None) -> dict[str, list[Item]]:267        if query is None:268            query = {}269        result = {"Sell": [], "Buy": []}270        for item in self:271            if all(k in item and query[k] == item[k] for k in query):272                result[item["side"]].append(item)273        result["Sell"].sort(key=lambda x: x["id"])274        result["Buy"].sort(key=lambda x: x["id"], reverse=True)275        return result276    def _onmessage(self, topic: str, type_: str, data: Union[list[Item], Item]) -> None:277        if type_ == "snapshot":278            symbol = topic.split(".")[-1]279            # ex: "orderBookL2_25.BTCUSD", "orderBook_200.100ms.BTCUSD"280            result = self.find({"symbol": symbol})281            self._delete(result)282            self._insert(data)283        elif type_ == "delta":284            self._delete(data["delete"])285            self._update(data["update"])286            self._insert(data["insert"])287class OrderBookUSDT(OrderBookInverse):288    def _onmessage(self, topic: str, type_: str, data: Union[list[Item], Item]) -> None:289        if type_ == "snapshot":290            symbol = topic.split(".")[-1]291            # ex: "orderBookL2_25.BTCUSDT", "orderBook_200.100ms.BTCUSDT"292            result = self.find({"symbol": symbol})293            self._delete(result)294            self._insert(data["order_book"])295        elif type_ == "delta":296            self._delete(data["delete"])297            self._update(data["update"])298            self._insert(data["insert"])299class TradeInverse(DataStore):300    _KEYS = ["trade_id"]301    _MAXLEN = 99999302    def _onmessage(self, data: list[Item]) -> None:303        self._insert(data)304class TradeUSDT(TradeInverse):305    ...306class Insurance(DataStore):307    _KEYS = ["currency"]308    def _onmessage(self, data: list[Item]) -> None:309        self._update(data)310class InstrumentInverse(DataStore):311    _KEYS = ["symbol"]312    def _onmessage(self, topic: str, type_: str, data: Item) -> None:313        if type_ == "snapshot":314            symbol = topic.split(".")[-1]  # ex: "instrument_info.100ms.BTCUSD"315            result = self.find({"symbol": symbol})316            self._delete(result)317            self._insert([data])318        elif type_ == "delta":319            self._update(data["update"])320class InstrumentUSDT(InstrumentInverse):321    ...322class KlineInverse(DataStore):323    _KEYS = ["start", "symbol", "interval"]324    def _onmessage(self, topic: str, data: list[Item]) -> None:325        topic_split = topic.split(".")  # ex:"klineV2.1.BTCUSD"326        for item in data:327            item["symbol"] = topic_split[-1]328            item["interval"] = topic_split[-2]329        self._update(data)330    def _onresponse(self, data: list[Item]) -> None:331        for item in data:332            item["start"] = item.pop("open_time")333        self._update(data)334class KlineUSDT(KlineInverse):335    ...336class LiquidationInverse(DataStore):337    _MAXLEN = 99999338    def _onmessage(self, item: Item) -> None:339        self._insert([item])340class LiquidationUSDT(LiquidationInverse):341    ...342class PositionInverse(DataStore):343    _KEYS = ["symbol", "position_idx"]344    def one(self, symbol: str) -> Optional[Item]:345        return self.get({"symbol": symbol, "position_idx": 0})346    def both(self, symbol: str) -> dict[str, Optional[Item]]:347        return {348            "Sell": self.get({"symbol": symbol, "position_idx": 2}),349            "Buy": self.get({"symbol": symbol, "position_idx": 1}),350        }351    def _onresponse(self, data: Union[Item, list[Item]]) -> None:352        if isinstance(data, dict):353            self._update([data])  # ex: {"symbol": "BTCUSD", ...}354        elif isinstance(data, list):355            for item in data:356                if "is_valid" in item:357                    if item["is_valid"]:358                        self._update([item["data"]])359                        # ex:360                        # [361                        #     {362                        #         "is_valid": True,363                        #         "data": {"symbol": "BTCUSDM21", ...}364                        #     },365                        #     ...366                        # ]367                else:368                    self._update([item])369                    # ex: [{"symbol": "BTCUSDT", ...}, ...]370    def _onmessage(self, data: list[Item]) -> None:371        self._update(data)372class PositionUSDT(PositionInverse):373    def _onmessage(self, data: list[Item]) -> None:374        for item in data:375            item["position_idx"] = int(item["position_idx"])376            self._update([item])377class ExecutionInverse(DataStore):378    _KEYS = ["exec_id"]379    def _onmessage(self, data: list[Item]) -> None:380        self._update(data)381class ExecutionUSDT(ExecutionInverse):382    ...383class OrderInverse(DataStore):384    _KEYS = ["order_id"]385    def _onresponse(self, data: list[Item]) -> None:386        if isinstance(data, list):387            self._update(data)388        elif isinstance(data, dict):389            self._update([data])390    def _onmessage(self, data: list[Item]) -> None:391        for item in data:392            if item["order_status"] in ("Created", "New", "PartiallyFilled"):393                self._update([item])394            else:395                self._delete([item])396class OrderUSDT(OrderInverse):397    ...398class StopOrderInverse(DataStore):399    _KEYS = ["order_id"]400    def _onresponse(self, data: list[Item]) -> None:401        if isinstance(data, list):402            self._update(data)403        elif isinstance(data, dict):404            self._update([data])405    def _onmessage(self, data: list[Item]) -> None:406        for item in data:407            if item["order_status"] in ("Active", "Untriggered"):408                self._update([item])409            else:410                self._delete([item])411class StopOrderUSDT(StopOrderInverse):412    _KEYS = ["stop_order_id"]413class WalletInverse(DataStore):414    _KEYS = ["coin"]415    def _onresponse(self, data: dict[str, Item]) -> None:416        data.pop("USDT", None)417        for coin in data:418            self._update(419                [420                    {421                        "coin": coin,422                        "available_balance": data[coin]["available_balance"],423                        "wallet_balance": data[coin]["wallet_balance"],424                    }425                ]426            )427    def _onmessage(self, data: list[Item]) -> None:428        self._update(data)429class WalletUSDT(WalletInverse):430    def _onresponse(self, data: dict[str, Item]) -> None:431        if "USDT" in data:432            self._update(433                [434                    {435                        "coin": "USDT",436                        "wallet_balance": data["USDT"]["wallet_balance"],437                        "available_balance": data["USDT"]["available_balance"],438                    }439                ]440            )441    def _onmessage(self, data: list[Item]) -> None:442        for item in data:...test_claims.py
Source:test_claims.py  
1# Copyright (c) 2015 Red Hat, Inc.2#3# Licensed under the Apache License, Version 2.0 (the "License"); you may not4# use this file except in compliance with the License.  You may obtain a copy5# of the License at6#7#    http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the12# License for the specific language governing permissions and limitations under13# the License.14import json15import ddt16import mock17from oslo_utils import timeutils18from oslo_utils import uuidutils19from zaqar.common import consts20from zaqar.tests.unit.transport.websocket import base21from zaqar.tests.unit.transport.websocket import utils as test_utils22@ddt.ddt23class ClaimsBaseTest(base.V1_1Base):24    config_file = "websocket_mongodb.conf"25    def setUp(self):26        super(ClaimsBaseTest, self).setUp()27        self.protocol = self.transport.factory()28        self.defaults = self.api.get_defaults()29        self.project_id = '7e55e1a7e'30        self.headers = {31            'Client-ID': uuidutils.generate_uuid(),32            'X-Project-ID': self.project_id33        }34        action = consts.QUEUE_CREATE35        body = {"queue_name": "skittle"}36        req = test_utils.create_request(action, body, self.headers)37        with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:38            self.protocol.onMessage(req, False)39            resp = json.loads(msg_mock.call_args[0][0].decode())40            self.assertIn(resp['headers']['status'], [201, 204])41        action = consts.MESSAGE_POST42        body = {"queue_name": "skittle",43                "messages": [44                    {'body': 239, 'ttl': 300},45                    {'body': {'key_1': 'value_1'}, 'ttl': 300},46                    {'body': [1, 3], 'ttl': 300},47                    {'body': 439, 'ttl': 300},48                    {'body': {'key_2': 'value_2'}, 'ttl': 300},49                    {'body': ['a', 'b'], 'ttl': 300},50                    {'body': 639, 'ttl': 300},51                    {'body': {'key_3': 'value_3'}, 'ttl': 300},52                    {'body': ["aa", "bb"], 'ttl': 300}]53                }54        send_mock = mock.Mock()55        self.protocol.sendMessage = send_mock56        req = test_utils.create_request(action, body, self.headers)57        self.protocol.onMessage(req, False)58        resp = json.loads(send_mock.call_args[0][0].decode())59        self.assertEqual(201, resp['headers']['status'])60    def tearDown(self):61        super(ClaimsBaseTest, self).tearDown()62        action = consts.QUEUE_DELETE63        body = {'queue_name': 'skittle'}64        send_mock = mock.Mock()65        self.protocol.sendMessage = send_mock66        req = test_utils.create_request(action, body, self.headers)67        self.protocol.onMessage(req, False)68        resp = json.loads(send_mock.call_args[0][0].decode())69        self.assertEqual(204, resp['headers']['status'])70    @ddt.data('[', '[]', '.', '"fail"')71    def test_bad_claim(self, doc):72        action = consts.CLAIM_CREATE73        body = doc74        send_mock = mock.Mock()75        self.protocol.sendMessage = send_mock76        req = test_utils.create_request(action, body, self.headers)77        self.protocol.onMessage(req, False)78        resp = json.loads(send_mock.call_args[0][0].decode())79        self.assertEqual(400, resp['headers']['status'])80        action = consts.CLAIM_UPDATE81        body = doc82        req = test_utils.create_request(action, body, self.headers)83        self.protocol.onMessage(req, False)84        resp = json.loads(send_mock.call_args[0][0].decode())85        self.assertEqual(400, resp['headers']['status'])86    def test_exceeded_claim(self):87        action = consts.CLAIM_CREATE88        body = {"queue_name": "skittle",89                "ttl": 100,90                "grace": 60,91                "limit": 21}92        send_mock = mock.Mock()93        self.protocol.sendMessage = send_mock94        req = test_utils.create_request(action, body, self.headers)95        self.protocol.onMessage(req, False)96        resp = json.loads(send_mock.call_args[0][0].decode())97        self.assertEqual(400, resp['headers']['status'])98    @ddt.data((-1, -1), (59, 60), (60, 59), (60, 43201), (43201, 60))99    def test_unacceptable_ttl_or_grace(self, ttl_grace):100        ttl, grace = ttl_grace101        action = consts.CLAIM_CREATE102        body = {"queue_name": "skittle",103                "ttl": ttl,104                "grace": grace}105        send_mock = mock.Mock()106        self.protocol.sendMessage = send_mock107        req = test_utils.create_request(action, body, self.headers)108        self.protocol.onMessage(req, False)109        resp = json.loads(send_mock.call_args[0][0].decode())110        self.assertEqual(400, resp['headers']['status'])111    @ddt.data(-1, 59, 43201)112    def test_unacceptable_new_ttl(self, ttl):113        claim = self._get_a_claim()114        action = consts.CLAIM_UPDATE115        body = {"queue_name": "skittle",116                "claim_id": claim['body']['claim_id'],117                "ttl": ttl}118        send_mock = mock.Mock()119        self.protocol.sendMessage = send_mock120        req = test_utils.create_request(action, body, self.headers)121        self.protocol.onMessage(req, False)122        resp = json.loads(send_mock.call_args[0][0].decode())123        self.assertEqual(400, resp['headers']['status'])124    def test_default_ttl_and_grace(self):125        action = consts.CLAIM_CREATE126        body = {"queue_name": "skittle"}127        send_mock = mock.Mock()128        self.protocol.sendMessage = send_mock129        req = test_utils.create_request(action, body, self.headers)130        self.protocol.onMessage(req, False)131        resp = json.loads(send_mock.call_args[0][0].decode())132        self.assertEqual(201, resp['headers']['status'])133        action = consts.CLAIM_GET134        body = {"queue_name": "skittle",135                "claim_id": resp['body']['claim_id']}136        req = test_utils.create_request(action, body, self.headers)137        self.protocol.onMessage(req, False)138        resp = json.loads(send_mock.call_args[0][0].decode())139        self.assertEqual(200, resp['headers']['status'])140        self.assertEqual(self.defaults.claim_ttl, resp['body']['ttl'])141    def test_lifecycle(self):142        # First, claim some messages143        action = consts.CLAIM_CREATE144        body = {"queue_name": "skittle",145                "ttl": 100,146                "grace": 60}147        send_mock = mock.Mock()148        self.protocol.sendMessage = send_mock149        req = test_utils.create_request(action, body, self.headers)150        self.protocol.onMessage(req, False)151        resp = json.loads(send_mock.call_args[0][0].decode())152        self.assertEqual(201, resp['headers']['status'])153        claimed_messages = resp['body']['messages']154        claim_id = resp['body']['claim_id']155        # No more messages to claim156        body = {"queue_name": "skittle",157                "ttl": 100,158                "grace": 60}159        req = test_utils.create_request(action, body, self.headers)160        self.protocol.onMessage(req, False)161        resp = json.loads(send_mock.call_args[0][0].decode())162        self.assertEqual(204, resp['headers']['status'])163        # Listing messages, by default, won't include claimed, will echo164        action = consts.MESSAGE_LIST165        body = {"queue_name": "skittle",166                "echo": True}167        req = test_utils.create_request(action, body, self.headers)168        self.protocol.onMessage(req, False)169        resp = json.loads(send_mock.call_args[0][0].decode())170        self.assertEqual(200, resp['headers']['status'])171        self.assertEqual([], resp['body']['messages'])172        # Listing messages, by default, won't include claimed, won't echo173        body = {"queue_name": "skittle",174                "echo": False}175        req = test_utils.create_request(action, body, self.headers)176        self.protocol.onMessage(req, False)177        resp = json.loads(send_mock.call_args[0][0].decode())178        self.assertEqual(200, resp['headers']['status'])179        self.assertEqual([], resp['body']['messages'])180        # List messages, include_claimed, but don't echo181        body = {"queue_name": "skittle",182                "include_claimed": True,183                "echo": False}184        req = test_utils.create_request(action, body, self.headers)185        self.protocol.onMessage(req, False)186        resp = json.loads(send_mock.call_args[0][0].decode())187        self.assertEqual(200, resp['headers']['status'])188        self.assertEqual(resp['body']['messages'], [])189        # List messages with a different client-id and echo=false.190        # Should return some messages191        body = {"queue_name": "skittle",192                "echo": False}193        headers = {194            'Client-ID': uuidutils.generate_uuid(),195            'X-Project-ID': self.project_id196        }197        req = test_utils.create_request(action, body, headers)198        self.protocol.onMessage(req, False)199        resp = json.loads(send_mock.call_args[0][0].decode())200        self.assertEqual(200, resp['headers']['status'])201        # Include claimed messages this time, and echo202        body = {"queue_name": "skittle",203                "include_claimed": True,204                "echo": True}205        req = test_utils.create_request(action, body, self.headers)206        self.protocol.onMessage(req, False)207        resp = json.loads(send_mock.call_args[0][0].decode())208        self.assertEqual(200, resp['headers']['status'])209        self.assertEqual(len(claimed_messages), len(resp['body']['messages']))210        message_id_1 = resp['body']['messages'][0]['id']211        message_id_2 = resp['body']['messages'][1]['id']212        # Try to delete the message without submitting a claim_id213        action = consts.MESSAGE_DELETE214        body = {"queue_name": "skittle",215                "message_id": message_id_1}216        req = test_utils.create_request(action, body, self.headers)217        self.protocol.onMessage(req, False)218        resp = json.loads(send_mock.call_args[0][0].decode())219        self.assertEqual(403,  resp['headers']['status'])220        # Delete the message and its associated claim221        body = {"queue_name": "skittle",222                "message_id": message_id_1,223                "claim_id": claim_id}224        req = test_utils.create_request(action, body, self.headers)225        self.protocol.onMessage(req, False)226        resp = json.loads(send_mock.call_args[0][0].decode())227        self.assertEqual(204, resp['headers']['status'])228        # Try to get it from the wrong project229        headers = {230            'Client-ID': uuidutils.generate_uuid(),231            'X-Project-ID': 'someproject'232        }233        action = consts.MESSAGE_GET234        body = {"queue_name": "skittle",235                "message_id": message_id_2}236        req = test_utils.create_request(action, body, headers)237        self.protocol.onMessage(req, False)238        resp = json.loads(send_mock.call_args[0][0].decode())239        self.assertEqual(404,  resp['headers']['status'])240        # Get the message241        action = consts.MESSAGE_GET242        body = {"queue_name": "skittle",243                "message_id": message_id_2}244        req = test_utils.create_request(action, body, self.headers)245        self.protocol.onMessage(req, False)246        resp = json.loads(send_mock.call_args[0][0].decode())247        self.assertEqual(200, resp['headers']['status'])248        # Update the claim249        creation = timeutils.utcnow()250        action = consts.CLAIM_UPDATE251        body = {"queue_name": "skittle",252                "ttl": 60,253                "grace": 60,254                "claim_id": claim_id}255        req = test_utils.create_request(action, body, self.headers)256        self.protocol.onMessage(req, False)257        resp = json.loads(send_mock.call_args[0][0].decode())258        self.assertEqual(204, resp['headers']['status'])259        # Get the claimed messages (again)260        action = consts.CLAIM_GET261        body = {"queue_name": "skittle",262                "claim_id": claim_id}263        req = test_utils.create_request(action, body, self.headers)264        self.protocol.onMessage(req, False)265        query = timeutils.utcnow()266        resp = json.loads(send_mock.call_args[0][0].decode())267        self.assertEqual(200, resp['headers']['status'])268        self.assertEqual(60, resp['body']['ttl'])269        message_id_3 = resp['body']['messages'][0]['id']270        estimated_age = timeutils.delta_seconds(creation, query)271        # The claim's age should be 0 at this moment. But in some unexpected272        # case, such as slow test, the age maybe larger than 0. Just skip273        # asserting if so.274        if resp['body']['age'] == 0:275            self.assertGreater(estimated_age, resp['body']['age'])276        # Delete the claim277        action = consts.CLAIM_DELETE278        body = {"queue_name": "skittle",279                "claim_id": claim_id}280        req = test_utils.create_request(action, body, self.headers)281        self.protocol.onMessage(req, False)282        resp = json.loads(send_mock.call_args[0][0].decode())283        self.assertEqual(204, resp['headers']['status'])284        # Try to delete a message with an invalid claim ID285        action = consts.MESSAGE_DELETE286        body = {"queue_name": "skittle",287                "message_id": message_id_3,288                "claim_id": claim_id}289        req = test_utils.create_request(action, body, self.headers)290        self.protocol.onMessage(req, False)291        resp = json.loads(send_mock.call_args[0][0].decode())292        self.assertEqual(400, resp['headers']['status'])293        # Make sure it wasn't deleted!294        action = consts.MESSAGE_GET295        body = {"queue_name": "skittle",296                "message_id": message_id_2}297        req = test_utils.create_request(action, body, self.headers)298        self.protocol.onMessage(req, False)299        resp = json.loads(send_mock.call_args[0][0].decode())300        self.assertEqual(200, resp['headers']['status'])301        # Try to get a claim that doesn't exist302        action = consts.CLAIM_GET303        body = {"queue_name": "skittle",304                "claim_id": claim_id}305        req = test_utils.create_request(action, body, self.headers)306        self.protocol.onMessage(req, False)307        resp = json.loads(send_mock.call_args[0][0].decode())308        self.assertEqual(404,  resp['headers']['status'])309        # Try to update a claim that doesn't exist310        action = consts.CLAIM_UPDATE311        body = {"queue_name": "skittle",312                "ttl": 60,313                "grace": 60,314                "claim_id": claim_id}315        req = test_utils.create_request(action, body, self.headers)316        self.protocol.onMessage(req, False)317        resp = json.loads(send_mock.call_args[0][0].decode())318        self.assertEqual(404,  resp['headers']['status'])319    def test_post_claim_nonexistent_queue(self):320        action = consts.CLAIM_CREATE321        body = {"queue_name": "nonexistent",322                "ttl": 100,323                "grace": 60}324        send_mock = mock.Mock()325        self.protocol.sendMessage = send_mock326        req = test_utils.create_request(action, body, self.headers)327        self.protocol.onMessage(req, False)328        resp = json.loads(send_mock.call_args[0][0].decode())329        self.assertEqual(204, resp['headers']['status'])330    def test_get_claim_nonexistent_queue(self):331        action = consts.CLAIM_GET332        body = {"queue_name": "nonexistent",333                "claim_id": "aaabbbba"}334        send_mock = mock.Mock()335        self.protocol.sendMessage = send_mock336        req = test_utils.create_request(action, body, self.headers)337        self.protocol.onMessage(req, False)338        resp = json.loads(send_mock.call_args[0][0].decode())339        self.assertEqual(404,  resp['headers']['status'])340    def _get_a_claim(self):341        action = consts.CLAIM_CREATE342        body = {"queue_name": "skittle",343                "ttl": 100,344                "grace": 60}345        send_mock = mock.Mock()346        self.protocol.sendMessage = send_mock347        req = test_utils.create_request(action, body, self.headers)348        self.protocol.onMessage(req, False)349        resp = json.loads(send_mock.call_args[0][0].decode())350        self.assertEqual(201, resp['headers']['status'])...store.py
Source:store.py  
...46                self.position.linear._onresponse(content['result'])47            # wallet48            elif resp.request.path_url.startswith('/v2/private/wallet/balance'):49                self.wallet._onresponse(content['result'])50    def onmessage(self, msg: str, ws: WebSocket) -> None:51        content: Dict[str, Any] = json.loads(msg)52        if 'topic' in content:53            topic: str = content['topic']54            data: Union[List[Item], Item] = content['data']55            type_: Optional[str] = content.get('type')56            if any([57                topic.startswith('orderBookL2_25'),58                topic.startswith('orderBook_200'),59            ]):60                self.orderbook._onmessage(type_, data)61            elif topic.startswith('trade'):62                self.trade._onmessage(data)63            elif topic.startswith('insurance'):64                self.insurance._onmessage(data)65            elif topic.startswith('instrument_info'):66                self.instrument._onmessage(type_, data)67            if any([68                topic.startswith('klineV2'),69                topic.startswith('candle'),70            ]):71                self.kline._onmessage(topic, data)72            elif topic == 'position':73                self.position._onmessage(data)74                self.wallet._onposition(data)75            elif topic == 'execution':76                self.execution._onmessage(data)77            elif topic == 'order':78                self.order._onmessage(data)79            elif topic == 'stop_order':80                self.stoporder._onmessage(data)81            elif topic == 'wallet':82                self.wallet._onmessage(data)83            for event in self._events:84                event.set()85            self._events.clear()86    def wait(self) -> None:87        event = Event()88        self._events.append(event)89        event.wait()90class DefaultDataStore(DataStore): ...91Item = Dict[str, Any]92class _KeyValueStore:93    _KEYS: List[str]94    _MAXLEN: Optional[int]95    def __init__(self) -> None:96        self._data: Dict[str, Item] = {}97        self._events: List[Event] = []98    99    def get(self, **kwargs) -> Optional[Item]:100        try:101            dumps = self._dumps(kwargs)102            if dumps in self._data:103                return self._data[dumps]104        except KeyError:105            if kwargs:106                for item in self._data.values():107                    for k, v, in kwargs.items():108                        if not k in item:109                            break110                        if v != item[k]:111                            break112                    else:113                        return item114            else:115                for item in self._data.values():116                    return item117    def getlist(self, **kwargs) -> List[Item]:118        if kwargs:119            result = []120            for item in self._data.values():121                for k, v in kwargs.items():122                    if not k in item:123                        break124                    if v != item[k]:125                        break126                else:127                    result.append(item)128            return result129        else:130            return list(self._data.values())131    def __len__(self):132        return len(self._data)133    def _dumps(self, item: Item) -> str:134        keyitem = {k: item[k] for k in self._KEYS}135        return urllib.parse.urlencode(keyitem)136    137    def _update(self, items: List[Item]) -> None:138        for item in items:139            try:140                key = self._dumps(item)141                if key in self._data:142                    self._data[key].update(item)143                else:144                    self._data[key] = item145            except KeyError:146                pass147        if self._MAXLEN is not None:148            len_data = len(self._data)149            if len_data > self._MAXLEN:150                over = len_data - self._MAXLEN151                keys = []152                for i, k in enumerate(self._data.keys()):153                    if i < over:154                        keys.append(k)155                    else:156                        break157                for k in keys:158                    self._data.pop(k)159        for event in self._events:160            event.set()161        self._events.clear()162    def _pop(self, items: List[Item]) -> None:163        for item in items:164            try:165                key = self._dumps(item)166                if key in self._data:167                    self._data.pop(key)168            except KeyError:169                pass170        for event in self._events:171            event.set()172        self._events.clear()173    def wait(self) -> None:174        event = Event()175        self._events.append(event)176        event.wait()177class OrderBook(_KeyValueStore):178    _KEYS = ['symbol', 'id', 'side']179    _MAXLEN = None180    def getbest(self, symbol: str) -> Dict[str, Optional[Item]]:181        result = {'Sell': {}, 'Buy': {}}182        for item in self._data.values():183            if item['symbol'] == symbol:184                result[item['side']][float(item['price'])] = item185        return {186            'Sell': result['Sell'][min(result['Sell'])] if result['Sell'] else None,187            'Buy': result['Buy'][max(result['Buy'])] if result['Buy'] else None188        }189    def getsorted(self, symbol: str) -> Dict[str, List[Item]]:190        result = {'Sell': [], 'Buy': []}191        for item in self._data.values():192            if item['symbol'] == symbol:193                result[item['side']].append(item)194        return {195            'Sell': sorted(result['Sell'], key=lambda x: float(x['price'])),196            'Buy': sorted(result['Buy'], key=lambda x: float(x['price']), reverse=True)197        }198    def _onmessage(self, type_: str, data: Union[List[Item], Item]) -> None:199        if type_ == 'snapshot':200            if isinstance(data, dict):201                data = data['order_book']202            self._update(data)203        elif type_ == 'delta':204            self._pop(data['delete'])205            self._update(data['update'])206            self._update(data['insert'])207class Trade(_KeyValueStore):208    _KEYS = ['trade_id']209    _MAXLEN = 10000210    def _onmessage(self, data: List[Item]) -> None:211        self._update(data)212class Insurance(_KeyValueStore):213    _KEYS = ['currency']214    _MAXLEN = None215    def _onmessage(self, data: List[Item]) -> None:216        self._update(data)217class Instrument(_KeyValueStore):218    _KEYS = ['symbol']219    _MAXLEN = None220    def _onmessage(self, type_: str, data: Item) -> None:221        if type_ == 'snapshot':222            self._update([data])223        elif type_ == 'delta':224            self._update(data['update'])225class Kline(_KeyValueStore):226    _KEYS = ['symbol', 'start']227    _MAXLEN = 5000228    def _onmessage(self, topic: str, data: List[Item]) -> None:229        symbol = topic.split('.')[2] # ex:'klineV2.1.BTCUSD'230        for item in data:231            item['symbol'] = symbol232        self._update(data)233class Position:234    def __init__(self):235        self.inverse = PositionInverse()236        self.linear = PositionLinear()237    238    def _onmessage(self, data: List[Item]) -> None:239        if len(data):240            symbol: str = data[0]['symbol']241            if symbol.endswith('USDT'):242                self.linear._onmessage(data)243            else:244                self.inverse._onmessage(data)245class PositionInverse(_KeyValueStore):246    _KEYS = ['symbol', 'position_idx']247    _MAXLEN = None248    249    def getone(self, symbol: str) -> Optional[Item]:250        return self.get(symbol=symbol, position_idx=0)251    def getboth(self, symbol: str) -> Dict[str, Optional[Item]]:252        return {253            'Sell': self.get(symbol=symbol, position_idx=2),254            'Buy': self.get(symbol=symbol, position_idx=1),255        }256    def _onresponse(self, data: Union[Item, List[Item]]) -> None:257        if isinstance(data, dict):258            self._update([data])259        elif isinstance(data, list):260            if len(data) and 'data' in data[0]:261                self._update([item['data'] for item in data])262            else:263                self._update(data)264    def _onmessage(self, data: List[Item]) -> None:265        self._update(data)266class PositionLinear(_KeyValueStore):267    _KEYS = ['symbol', 'side']268    _MAXLEN = None269    def getboth(self, symbol: str) -> Dict[str, Optional[Item]]:270        return {271            'Sell': self.get(symbol=symbol, side='Sell'),272            'Buy': self.get(symbol=symbol, side='Buy'),273        }274    def _onresponse(self, data: List[Item]) -> None:275        if len(data) and 'data' in data[0]:276            self._update([item['data'] for item in data])277        else:278            self._update(data)279    def _onmessage(self, data: List[Item]) -> None:280        self._update(data)281class Execution(_KeyValueStore):282    _KEYS = ['exec_id']283    _MAXLEN = 5000284    def _onmessage(self, data: List[Item]) -> None:285        self._update(data)286class Order(_KeyValueStore):287    _KEYS = ['order_id']288    _MAXLEN = None289    def _onresponse(self, data: List[Item]) -> None:290        self._update(data)291    def _onmessage(self, data: List[Item]) -> None:292        for item in data:293            if item['order_status'] in ('Created', 'New', 'PartiallyFilled', ):294                self._update([item])295            else:296                self._pop([item])297class StopOrder(_KeyValueStore):298    _KEYS = ['stop_order_id']299    _MAXLEN = None300    def _onresponse(self, data: List[Item]) -> None:301        self._update(data)302    def _onmessage(self, data: List[Item]) -> None:303        for item in data:304            if 'order_id' in item:305                item['stop_order_id'] = item.pop('order_id')306            if 'order_status' in item:307                item['stop_order_status'] = item.pop('order_status')308            if item['stop_order_status'] in ('Active', 'Untriggered', ):309                self._update([item])310            else:311                self._pop([item])312class Wallet(_KeyValueStore):313    _KEYS = ['coin']314    _MAXLEN = None315    def _onresponse(self, data: Dict[str, Item]) -> None:316        for coin, item in data.items():317            _item = {}318            _item['coin'] = coin319            _item['wallet_balance'] = item['wallet_balance']320            _item['available_balance'] = item['available_balance']321            self._update([_item])322    def _onposition(self, data: List[Item]) -> None:323        if len(data) and 'position_idx' in data[0]:324            for item in data:325                _item = {}326                symbol: str = item['symbol']327                if symbol.endswith('USD'):328                    _item['coin'] = symbol[:-3] # ex:'BTCUSD'329                else:330                    _item['coin'] = symbol[:-6] # ex:'BTCUSDM21'331                _item['wallet_balance'] = item['wallet_balance']332                _item['available_balance'] = item['available_balance']333                self._update([_item])334    def _onmessage(self, data: List[Item]) -> None:335        for item in data:336            _item = {}337            _item['coin'] = 'USDT'338            _item['wallet_balance'] = item['wallet_balance']339            _item['available_balance'] = item['available_balance']...binance.py
Source:binance.py  
...56                self.order._onresponse(symbol, data)57            elif resp.url.path in ("/fapi/v1/listenKey",):58                self.listenkey = data["listenKey"]59                asyncio.create_task(self._listenkey(resp.__dict__["_raw_session"]))60    def _onmessage(self, msg: Any, ws: ClientWebSocketResponse) -> None:61        if "error" in msg:62            logger.warning(msg)63        if "result" not in msg:64            data = msg["data"] if "data" in msg else msg65            event = data["e"] if isinstance(data, dict) else data[0]["e"]66            if event in ("trade", "aggTrade"):67                self.trade._onmessage(data)68            elif event == "markPriceUpdate":69                self.markprice._onmessage(data)70            elif event == "kline":71                self.kline._onmessage(data)72            elif event == "continuous_kline":73                self.continuouskline._onmessage(data)74            elif event in ("24hrMiniTicker", "24hrTicker"):75                self.ticker._onmessage(data)76            elif event == "bookTicker":77                self.bookticker._onmessage(data)78            elif event == "forceOrder":79                self.liquidation._onmessage(data)80            elif event == "depthUpdate":81                self.orderbook._onmessage(data)82            elif event == "ACCOUNT_UPDATE":83                self.balance._onmessage(data)84                self.position._onmessage(data)85            elif event == "ORDER_TRADE_UPDATE":86                self.order._onmessage(data)87    @staticmethod88    async def _listenkey(session: aiohttp.ClientSession):89        while not session.closed:90            await session.put("https://fapi.binance.com/fapi/v1/listenKey", auth=Auth)91            await asyncio.sleep(1800.0)  # 30 minutes92    @property93    def trade(self) -> "Trade":94        return self.get("trade", Trade)95    @property96    def markprice(self) -> "MarkPrice":97        return self.get("markprice", MarkPrice)98    @property99    def kline(self) -> "Kline":100        return self.get("kline", Kline)101    @property102    def continuouskline(self) -> "ContinuousKline":103        return self.get("continuouskline", ContinuousKline)104    @property105    def ticker(self) -> "Ticker":106        return self.get("ticker", Ticker)107    @property108    def bookticker(self) -> "BookTicker":109        return self.get("bookticker", BookTicker)110    @property111    def liquidation(self) -> "Liquidation":112        return self.get("liquidation", Liquidation)113    @property114    def orderbook(self) -> "OrderBook":115        return self.get("orderbook", OrderBook)116    @property117    def balance(self) -> "Balance":118        return self.get("balance", Balance)119    @property120    def position(self) -> "Position":121        return self.get("position", Position)122    @property123    def order(self) -> "Order":124        """125        ã¢ã¯ãã£ããªã¼ãã¼ã®ã¿(ç´å®ã»ãã£ã³ã»ã«æ¸ã¿ã¯åé¤ããã)126        """127        return self.get("order", Order)128class Trade(DataStore):129    _MAXLEN = 99999130    def _onmessage(self, item: Item) -> None:131        self._insert([item])132class MarkPrice(DataStore):133    _KEYS = ["s"]134    def _onmessage(self, data: Union[Item, list[Item]]) -> None:135        if isinstance(data, list):136            self._update(data)137        else:138            self._update([data])139class Kline(DataStore):140    _KEYS = ["t", "s", "i"]141    def _onmessage(self, item: Item) -> None:142        self._update([item["k"]])143class ContinuousKline(DataStore):144    _KEYS = ["ps", "ct", "t", "i"]145    def _onmessage(self, item: Item) -> None:146        self._update([{"ps": item["ps"], "ct": item["ct"], **item["k"]}])147class Ticker(DataStore):148    _KEYS = ["s"]149    def _onmessage(self, data: Union[Item, list[Item]]) -> None:150        if isinstance(data, list):151            self._update(data)152        else:153            self._update([data])154class BookTicker(DataStore):155    _KEYS = ["s"]156    def _onmessage(self, item: Item) -> None:157        self._update([item])158class Liquidation(DataStore):159    def _onmessage(self, item: Item) -> None:160        self._insert([item["o"]])161class OrderBook(DataStore):162    _KEYS = ["s", "S", "p"]163    _MAPSIDE = {"BUY": "b", "SELL": "a"}164    def _init(self) -> None:165        self.initialized = False166        self._buff = deque(maxlen=200)167    def sorted(self, query: Optional[Item] = None) -> dict[str, list[float]]:168        if query is None:169            query = {}170        result = {self._MAPSIDE[k]: [] for k in self._MAPSIDE}171        for item in self:172            if all(k in item and query[k] == item[k] for k in query):173                result[self._MAPSIDE[item["S"]]].append([item["p"], item["q"]])174        result["b"].sort(key=lambda x: float(x[0]), reverse=True)175        result["a"].sort(key=lambda x: float(x[0]))176        return result177    def _onmessage(self, item: Item) -> None:178        if not self.initialized:179            self._buff.append(item)180        for s, bs in self._MAPSIDE.items():181            for row in item[bs]:182                if float(row[1]) != 0.0:183                    self._update([{"s": item["s"], "S": s, "p": row[0], "q": row[1]}])184                else:185                    self._delete([{"s": item["s"], "S": s, "p": row[0]}])186    def _onresponse(self, symbol: str, item: Item) -> None:187        self.initialized = True188        self._delete(self.find({"s": symbol}))189        for s, bs in (("BUY", "bids"), ("SELL", "asks")):190            for row in item[bs]:191                self._insert([{"s": symbol, "S": s, "p": row[0], "q": row[1]}])192        for msg in self._buff:193            if msg["U"] <= item["lastUpdateId"] and msg["u"] >= item["lastUpdateId"]:194                self._onmessage(msg)195        self._buff.clear()196class Balance(DataStore):197    _KEYS = ["a"]198    def _onmessage(self, item: Item) -> None:199        self._update(item["a"]["B"])200    def _onresponse(self, data: list[Item]) -> None:201        for item in data:202            self._update(203                [204                    {205                        "a": item["asset"],206                        "wb": item["balance"],207                        "cw": item["crossWalletBalance"],208                    }209                ]210            )211class Position(DataStore):212    _KEYS = ["s", "ps"]213    def _onmessage(self, item: Item) -> None:214        self._update(item["a"]["P"])215    def _onresponse(self, data: list[Item]) -> None:216        for item in data:217            self._update(218                [219                    {220                        "s": item["symbol"],221                        "pa": item["positionAmt"],222                        "ep": item["entryPrice"],223                        "mt": item["marginType"],224                        "iw": item["isolatedWallet"],225                        "ps": item["positionSide"],226                    }227                ]228            )229class Order(DataStore):230    _KEYS = ["s", "i"]231    def _onmessage(self, item: Item) -> None:232        if item["o"]["X"] not in ("FILLED", "CANCELED", "EXPIRED"):233            self._update([item["o"]])234        else:235            self._delete([item["o"]])236    def _onresponse(self, symbol: Optional[str], data: list[Item]) -> None:237        if symbol is not None:238            self._delete(self.find({"symbol": symbol}))239        else:240            self._clear()241        for item in data:242            self._insert(243                [244                    {245                        "s": item["symbol"],...vngateio_ws.py
Source:vngateio_ws.py  
1# encoding: utf-82import json3from time import time, sleep4from threading import Thread5from datetime import datetime6import base647import hmac8import hashlib9import json10import gzip, binascii, os  11import urllib , requests12import websocket    13import time14from vnpy.trader.vtFunction import systemSymbolToVnSymbol , VnSymbolToSystemSymbol15import json16GATEIO_SOCKET_URL = "wss://ws.gate.io/v3"17'''18'''19class Gate_WSDataApi(object):20    """åºäºWebsocketçAPI对象"""21    #----------------------------------------------------------------------22    def __init__(self):23        """Constructor"""24        self.host = ''          # æå¡å¨å°å25        self.apiKey = ''        # ç¨æ·å26        self.secretKey = ''     # å¯ç 27        28        self.ws = None          # websocketåºç¨å¯¹è±¡29        self.thread = None      # å·¥ä½çº¿ç¨30        self.subscribeStrList = set([])31    #----------------------------------------------------------------------32    def reconnect(self):33        """éæ°è¿æ¥"""34        # é¦å
å
³éä¹åçè¿æ¥35        #self.close()36        37        # åæ§è¡éè¿ä»»å¡38        self.ws = websocket.WebSocketApp(self.host, 39                                         on_message=self.onMessage,40                                         on_error=self.onError,41                                         on_close=self.onClose,42                                         on_open=self.onOpen)        43    44        self.thread = Thread(target=self.ws.run_forever , args = (None , None , 60, 30))45        self.thread.start()46    47    #----------------------------------------------------------------------48    def connect_Subpot(self, apiKey , secretKey , trace = False):49        self.host = GATEIO_SOCKET_URL50        self.apiKey = apiKey51        self.secretKey = secretKey52        self.trace = trace53        websocket.enableTrace(trace)54        self.ws = websocket.WebSocketApp(self.host, 55                                             on_message=self.onMessage,56                                             on_error=self.onError,57                                             on_close=self.onClose,58                                             on_open=self.onOpen)        59            60        self.thread = Thread(target = self.ws.run_forever , args = (None , None , 60, 30))61        # self.thread_heart = Thread(target = self.run_forever_heart)62        self.thread.start()63        # self.thread_heart.start()64    #----------------------------------------------------------------------65    def onMessage(self, ws, evt):66        """67        ä¿¡æ¯æ¨éäºä»¶68        :param ws:  æ¥å£69        :param evt: äºä»¶70        :return:71        """72        print(u'vngate_nwe.onMessage:{}'.format(evt))73        74    #----------------------------------------------------------------------75    def onError(self, ws, evt):76        """77        æ¥å£é误æ¨éäºä»¶78        :param ws:79        :param evt:80        :return:81        """82        print(u'vngate_nwe.onApiError:{}'.format(evt))83        84    #----------------------------------------------------------------------85    def onClose(self, ws):86        """87        æ¥å£æå¼äºä»¶88        :param ws:89        :return:90        """91        print(u'vngate_nwe.onClose')92        93    #----------------------------------------------------------------------94    def onOpen(self, ws):95        """96        æ¥å£æå¼äºä»¶97        :param ws:98        :return:99        """100        print(u'vngate_nwe.onOpen')101    #----------------------------------------------------------------------102    def sendSocketCmd( self, client_id , method , json_params = []):103        send_json = {104            "id": client_id,105            "method": method,106            "params": json_params107        }108        self.ws.send( json.dumps(send_json))109    '''110    vngate_nwe.onMessage:{"error": null, "result": {"period": 86400, "open": "18.1604", "close": "17.03"111, "high": "18.53", "low": "16.54", "last": "17.03", "change": "-6.22", "quoteVolume": "1015826.9981111289865", "baseVolume": "17910280.42194529534205249261"}, "id": 1}113    '''114    #----------------------------------------------------------------------115    def querySpotTicker(self , u_id , symbol_pair = "EOS_USDT" , time_period = 86400):116        symbol_pair = systemSymbolToVnSymbol(symbol_pair)117        self.sendSocketCmd( u_id , "ticker.query" , [ symbol_pair , time_period])118    '''119    vngate_nwe.onMessage:{120    "error": null,121    "result": {122        "asks": [123            [124                "16.9507",125                "293.6242489299"126            ],127            [128                "16.9586",129                "1591.5376104592"130            ],131        ],132        "bids": [133            [134                "16.95",135                "0.1094"136            ],137            [138                "16.9471",139                "163.602"140            ],141            [142                "16.9431",143                "1.4607"144            ],145        ]146    },147    "id": 1148}149    '''150    #----------------------------------------------------------------------151    def querySpotDepth(self , u_id , symbol_pair = "EOS_USDT" , limit = 5 , interval = "0.00000001"):152        symbol_pair = systemSymbolToVnSymbol(symbol_pair)153        self.sendSocketCmd( u_id , "depth.query" , [ symbol_pair , limit , interval])154    '''155    {156    "error": null,157    "result": [158        {159            "id": 7177814,160            "time": 1523887673.562782,161            "price": "6.05",162            "amount": "20",163            "type": "buy"164        },165        {166            "id": 7177813,167            "time": 1523887354.256974,168            "price": "6.05",169            "amount": "15",170            "type": "buy"171        },172    ],173    "id": 12309174}175    '''176    #----------------------------------------------------------------------177    def querySpotTrades(self, u_id , symbol_pair = "EOS_USDT" , limit = 2 , last_id = 7177813):178        symbol_pair = systemSymbolToVnSymbol(symbol_pair)179        self.sendSocketCmd( u_id , "trades.query" , [ symbol_pair , limit , last_id])180    '''181    {182    "error": null, 183    "result": [184        [185            1492358400, time186            "7000.00",  open187            "8000.0",   close188            "8100.00",  highest189            "6800.00",  lowest190            "1000.00"   volume191            "123456.00" amount192            "BTC_USDT"  market name193        ]194        ...195    ]196    "id": 12312197    }198    '''199    #----------------------------------------------------------------------200    def querySpotKline(self, u_id , symbol_pair = "BTC_USDT", start = 1516951219 , end_time = 1516951219 , interval = 1800):201        symbol_pair = systemSymbolToVnSymbol(symbol_pair)202        self.sendSocketCmd( u_id , "kline.query" , [ symbol_pair , start , end_time, interval])203    '''204    vngate_nwe.onMessage:{"error": null, "result": {"status": "success"}, "id": 2}205    vngate_nwe.onMessage:{"method": "ticker.update", "params": ["BOT_USDT", {"period": 86400, "open": "0206    .7383", "close": "0.9048", "high": "1.015", "low": "0.715", "last": "0.9048", "change": "22.55", "qu207    oteVolume": "4565863.1552367147", "baseVolume": "4071168.7349472209511"}], "id": null}208    vngate_nwe.onMessage:{"method": "ticker.update", "params": ["BOT_USDT", {"period": 86400, "open": "0209    .7383", "close": "0.9049", "high": "1.015", "low": "0.715", "last": "0.9049", "change": "22.56", "qu210    oteVolume": "4571805.6819467147", "baseVolume": "4076546.0501166889511"}], "id": null}211    '''212    #----------------------------------------------------------------------213    def subscribeSpotTicker(self, u_id , symbol_pair = "BOT_USDT"):214        symbol_pair = systemSymbolToVnSymbol(symbol_pair)215        self.sendSocketCmd( u_id , "ticker.subscribe" , [ symbol_pair ])216    '''217    vngate_nwe.onMessage:{"method": "depth.update", "params": [true, {"asks": [["0.893", "813.385"], ["0218.8931", "102.65936009"], ["0.8932", "288.8898"], ["0.9058", "2028"], ["0.9067", "10"], ["0.9076", "421987.11"], ["0.9084", "1000"], ["0.9085", "17.49966971"], ["0.9086", "49.468551235"], ["0.9087", "1950220.59"]], "bids": [["0.8929", "88.76"], ["0.8921", "198.01888"], ["0.892", "256.09"], ["0.8919", "3280221.5348"], ["0.8803", "1382"], ["0.8802", "2257.925"], ["0.8801", "16.58862017"], ["0.88", "300"], ["0222.8779", "822.56"], ["0.8669", "774.0223"]]}, "BOT_USDT"], "id": null}223vngate_nwe.onMessage:{"method": "depth.update", "params": [false, {"bids": [["0.8929", "110.0925"]]}224, "BOT_USDT"], "id": null}225    Can only subscribe one market at the same time, market list is not supported currently. For multiple subscriptions, only the last one takes effect.226    '''227    #----------------------------------------------------------------------228    def subscribeSpotDepth(self, u_id , symbol_pair = "BOT_USDT" , limit = 30, interval = "0.00000001"):229        symbol_pair = systemSymbolToVnSymbol(symbol_pair)230        self.sendSocketCmd( u_id , "depth.subscribe" , [ symbol_pair , limit , interval])231    '''232    vngate_nwe.onMessage:{"error": null, "result": {"status": "success"}, "id": 3}233vngate_nwe.onMessage:{"method": "trades.update", "params": ["BOT_USDT", [{"id": 56675623, "time": 1523425592829.2169299, "price": "0.9096", "amount": "310.3478", "type": "sell"}, {"id": 56675622, "time":235 1525592829.2167261, "price": "0.9096", "amount": "461", "type": "sell"}, {"id": 56667395236, "time": 1525591676.7347641, "price": "0.9085", "amount": "847.41", "type": "sell"}]], "id": null}237    '''238    #----------------------------------------------------------------------239    def subscribeSpotTrades(self, u_id , symbol_pair = "BOT_USDT"  ):240        symbol_pair = systemSymbolToVnSymbol(symbol_pair)241        self.sendSocketCmd( u_id , "trades.subscribe" , [ symbol_pair ])242    '''243    vngate_nwe.onMessage:{"error": null, "result": {"status": "success"}, "id": 3}244vngate_nwe.onMessage:{"method": "kline.update", "params": [[1525591800, "0.9085", "0.9174", "0.9217"245, "0.9049", "78364.430712655", "71495.1179278982815", "BOT_USDT"]], "id": null}246    '''247    #----------------------------------------------------------------------248    def subscribeSpotKline(self, u_id , symbol_pair = "BOT_USDT" , interval = 1800):249        symbol_pair = systemSymbolToVnSymbol(symbol_pair)...phemex.py
Source:phemex.py  
...33                "symbol"34            ][0]35            data = await resp.json()36            if resp.url.path in ("/exchange/public/md/kline",):37                self.kline._onmessage({"symbol": symbol, "kline": data["data"]["rows"]})38    def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:39        if not msg.get("id"):40            if "trades" in msg:41                self.trade._onmessage(msg)42            elif "book" in msg:43                self.orderbook._onmessage(msg)44            elif "tick" in msg:45                self.ticker._onmessage(msg)46            elif "market24h" in msg:47                self.market24h._onmessage(msg["market24h"])48            elif "kline" in msg:49                self.kline._onmessage(msg)50            if "accounts" in msg:51                self.accounts._onmessage(msg.get("accounts"))52            if "orders" in msg:53                self.orders._onmessage(msg.get("orders"))54            if "positions" in msg:55                self.positions._onmessage(msg.get("positions"))56        if msg.get("error"):57            logger.warning(msg)58    @property59    def trade(self) -> "Trade":60        return self.get("trade", Trade)61    @property62    def orderbook(self) -> "OrderBook":63        return self.get("orderbook", OrderBook)64    @property65    def ticker(self):66        return self.get("ticker", Ticker)67    @property68    def market24h(self) -> "Market24h":69        return self.get("market24h", Market24h)70    @property71    def kline(self) -> "Kline":72        return self.get("kline", Kline)73    @property74    def accounts(self) -> "Accounts":75        return self.get("accounts", Accounts)76    @property77    def orders(self) -> "Orders":78        return self.get("orders", Orders)79    @property80    def positions(self) -> "Positions":81        return self.get("positions", Positions)82class Trade(DataStore):83    _KEYS = ["symbol", "timestamp"]84    _MAXLEN = 9999985    def _onmessage(self, message: Item) -> None:86        symbol = message.get("symbol")87        self._insert(88            [89                {90                    "symbol": symbol,91                    "timestamp": item[0],92                    "side": item[1],93                    "price": item[2] / 10000,94                    "size": item[3],95                }96                for item in message.get("trades", [])97            ]98        )99class OrderBook(DataStore):100    _KEYS = ["symbol", "side", "price"]101    def _init(self) -> None:102        self.timestamp: Optional[int] = None103    def sorted(self, query: Item = None) -> dict[str, list[Item]]:104        if query is None:105            query = {}106        result = {"SELL": [], "BUY": []}107        for item in self:108            if all(k in item and query[k] == item[k] for k in query):109                result[item["side"]].append(item)110        result["SELL"].sort(key=lambda x: x["price"])111        result["BUY"].sort(key=lambda x: x["price"], reverse=True)112        return result113    def _onmessage(self, message: Item) -> None:114        symbol = message["symbol"]115        book = message["book"]116        for key, side in (("bids", "BUY"), ("asks", "SELL")):117            for item in book[key]:118                if item[1] != 0:119                    self._insert(120                        [121                            {122                                "symbol": symbol,123                                "side": side,124                                "price": item[0],125                                "size": item[1],126                            }127                        ]128                    )129                else:130                    self._delete(131                        [132                            {133                                "symbol": symbol,134                                "side": side,135                                "price": item[0],136                                "size": item[1],137                            }138                        ]139                    )140        self.timestamp = message["timestamp"]141class Ticker(DataStore):142    _KEYS = ["symbol"]143    def _onmessage(self, message):144        self._update([message.get("tick")])145class Market24h(DataStore):146    _KEYS = ["symbol"]147    def _onmessage(self, item: Item) -> None:148        self._update([item])149class Kline(DataStore):150    _KEYS = ["symbol", "interval", "timestamp"]151    def _onmessage(self, message: Item) -> None:152        symbol = message.get("symbol")153        self._insert(154            [155                {156                    "symbol": symbol,157                    "interval": item[1],158                    "timestamp": item[0],159                    "open": item[3] / 10000,160                    "high": item[4] / 10000,161                    "low": item[5] / 10000,162                    "close": item[6] / 10000,163                    "volume": item[7],164                    "turnover": item[8] / 10000,165                }166                for item in message.get("kline", [])167            ]168        )169class Accounts(DataStore):170    _KEYS = ["accountID", "currency"]171    def _onmessage(self, data: list[Item]) -> None:172        self._update(data)173class Orders(DataStore):174    _KEYS = ["orderID"]175    def _onmessage(self, data: list[Item]) -> None:176        for item in data:177            if item["ordStatus"] == "New":178                self._insert([item])179            elif item["ordStatus"] == "PartiallyFilled":180                self._update([item])181            elif item["ordStatus"] == "Filled":182                self._delete([item])183            elif item["ordStatus"] == "Canceled" and item["action"] != "Replace":184                self._delete([item])185class Positions(DataStore):186    _KEYS = ["accountID", "symbol"]187    def _onmessage(self, data: list[Item]) -> None:188        for item in data:189            if item["size"] == 0:190                self._delete([item])191            else:...Using AI Code Generation
1onmessage = function(e) {2  console.log('Message received from main script');3  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);4  console.log('Posting message back to main script');5  postMessage(workerResult);6}7      if (window.Worker) {8        var myWorker = new Worker("test.js");9        myWorker.onmessage = function(e) {10          document.getElementById("result").innerHTML = e.data;11        };12        myWorker.postMessage([1, 2]);13      } else {14        document.getElementById("result").innerHTML = "Sorry, your browser does not support Web Workers...";15      }Using AI Code Generation
1onmessage = function(e) {2  console.log('Message received from main script');3  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);4  console.log('Posting message back to main script');5  postMessage(workerResult);6}7var wpt = new Worker("test.js");8wpt.onmessage = function(event) {9  document.getElementById("result").innerHTML = event.data;10};11wpt.postMessage([10, 20]);12onmessage = function(e) {13  console.log('Message received from main script');14  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);15  console.log('Posting message back to main script');16  postMessage(workerResult);17}18var wpt = new Worker("test.js");19wpt.onmessage = function(event) {20  document.getElementById("result").innerHTML = event.data;21};22wpt.postMessage([10, 20]);23onmessage = function(e) {24  console.log('Message received from main script');25  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);26  console.log('Posting message back to main script');27  postMessage(workerResult);28}29var wpt = new Worker("test.js");30wpt.onmessage = function(event) {31  document.getElementById("result").innerHTML = event.data;32};33wpt.postMessage([10, 20]);34onmessage = function(e) {35  console.log('Message received from main script');36  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);37  console.log('Posting message back to main script');38  postMessage(workerResult);39}Using AI Code Generation
1onmessage = function(e) {2  console.log('Worker: Message received from main script');3  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);4  console.log('Worker: Posting message back to main script');5  postMessage(workerResult);6}7self.addEventListener('message', function(e) {8  console.log('Worker: Message received from main script');9  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);10  console.log('Worker: Posting message back to main script');11  postMessage(workerResult);12}, false);13self.addEventListener('message', function(e) {14  console.log('Worker: Message received from main script');15  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);16  console.log('Worker: Posting message back to main script');17  postMessage(workerResult);18}, false);19self.addEventListener('message', function(e) {20  console.log('Worker: Message received from main script');21  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);22  console.log('Worker: Posting message back to main script');23  postMessage(workerResult);24}, false);25self.addEventListener('message', function(e) {26  console.log('Worker: Message received from main script');27  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);28  console.log('Worker: Posting message back to main script');29  postMessage(workerResult);30}, false);31self.addEventListener('message', function(e) {32  console.log('Worker: Message received from main script');33  var workerResult = 'Result: ' + (e.data[0] * e.data[1]);34  console.log('Worker: Posting message back to main script');35  postMessage(workerResult);36}, false);37self.addEventListener('message', function(e) {Using AI Code Generation
1var wpt = new Worker('worker.js');2wpt.onmessage = function (e) {3    console.log(e.data);4};5wpt.postMessage('Hello from test.js');6onmessage = function (e) {7    console.log(e.data);8    postMessage('Hello from worker.js');9};10var wpt = new Worker('worker.js');11wpt.addEventListener('message', function (e) {12    console.log(e.data);13});14wpt.postMessage('Hello from test.js');15self.addEventListener('message', function (e) {16    console.log(e.data);17    postMessage('Hello from worker.js');18});19var wpt = new Worker('worker.js');20wpt.postMessage('Hello from test.js');21self.addEventListener('message', function (e) {22    console.log(e.data);23    postMessage('Hello from worker.js');24});25var wpt = new Worker('worker.js');26wpt.addEventListener('message', function (e) {27    console.log(e.data);28});29wpt.postMessage('Hello from test.js');30self.addEventListener('message', function (e) {31    console.log(e.data);32    postMessage('Hello from worker.js');33});34var wpt = new Worker('worker.js');35wpt.postMessage('Hello from test.js');36self.addEventListener('message', function (e) {37    console.log(e.data);38    postMessage('Hello from worker.js');39});40var wpt = new Worker('worker.js');41wpt.addEventListener('message', function (e) {42    console.log(e.data);43});44wpt.postMessage('Hello from test.js');Using AI Code Generation
1onmessage = function(event) {2  var data = event.data;3  var result = data;4  postMessage(result);5}6var worker = new Worker('test.js');7worker.postMessage('test');8worker.onmessage = function(event) {9  console.log(event.data);10}11onmessage = function(event) {12  var data = event.data;13  var result = data;14  for (var i = 0; i < 100; i++) {15    for (var j = 0; j < 100; j++) {16      for (var k = 0; k < 100; k++) {17        var result = i + j + k;18      }19    }20  }21  postMessage(result);22}23var worker = new Worker('test.js');24worker.postMessage('test');25worker.onmessage = function(event) {26  console.log(event.data);27}Using AI Code Generation
1onmessage = function(e) {2  postMessage("done");3}4function doWork() {5  var worker = new Worker("test.js");6  worker.onmessage = function(e) {7  }8  worker.postMessage("start");9}10onmessage = function(e) {11  postMessage("done");12}13function doWork() {14  var worker = new Worker("test.js");15  worker.onmessage = function(e) {16  }17  worker.postMessage("start");18}19onmessage = function(e) {20  postMessage("done");21}22function doWork() {23  var worker = new Worker("test.js");24  worker.onmessage = function(e) {25  }26  worker.postMessage("start");27}28onmessage = function(e) {29  postMessage("done");30}31function doWork() {32  var worker = new Worker("test.js");33  worker.onmessage = function(e) {34  }35  worker.postMessage("start");36}37onmessage = function(e) {38  postMessage("done");39}40function doWork() {41  var worker = new Worker("test.js");42  worker.onmessage = function(e) {43  }44  worker.postMessage("start");45}46onmessage = function(e) {47  postMessage("done");Using AI Code Generation
1onmessage = function(e){2	postMessage(e.data);3}4var worker = new Worker("test.js");5worker.postMessage("Hi from main thread");6worker.onmessage = function(e){7	console.log(e.data);8}9onmessage = function(e){10	postMessage(e.data);11}12var worker = new Worker("test.js");13worker.postMessage("Hi from main thread");14worker.onmessage = function(e){15	console.log(e.data);16}17onmessage = function(e){18	postMessage(e.data);19}20var worker = new Worker("test.js");21worker.postMessage("Hi from main thread");22worker.onmessage = function(e){23	console.log(e.data);24}25onmessage = function(e){26	postMessage(e.data);27}28var worker = new Worker("test.js");29worker.postMessage("Hi from main thread");30worker.onmessage = function(e){31	console.log(e.data);32}33onmessage = function(e){34	postMessage(e.data);35}36var worker = new Worker("test.js");37worker.postMessage("Hi from main thread");38worker.onmessage = function(e){39	console.log(e.data);40}Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
