How to use onmessage method in wpt

Best JavaScript code snippet using wpt

test_queue_lifecycle.py

Source:test_queue_lifecycle.py Github

copy

Full Screen

1# Copyright (c) 2015 Red Hat, Inc.2#3# Licensed under the Apache License, Version 2.0 (the "License"); you may not4# use this file except in compliance with the License. You may obtain a copy5# of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the12# License for the specific language governing permissions and limitations under13# the License.14import json15import ddt16import mock17from oslo_utils import uuidutils18from zaqar.common import consts19from zaqar.storage import errors as storage_errors20from zaqar import tests as testing21from zaqar.tests.unit.transport.websocket import base22from zaqar.tests.unit.transport.websocket import utils as test_utils23@ddt.ddt24class QueueLifecycleBaseTest(base.V2Base):25 config_file = "websocket_mongodb.conf"26 def setUp(self):27 super(QueueLifecycleBaseTest, self).setUp()28 self.protocol = self.transport.factory()29 def test_empty_project_id(self):30 action = consts.QUEUE_CREATE31 body = {"queue_name": "kitkat",32 "metadata": {33 "key": {34 "key2": "value",35 "key3": [1, 2, 3, 4, 5]}36 }37 }38 headers = {'Client-ID': uuidutils.generate_uuid()}39 req = test_utils.create_request(action, body, headers)40 def validator(resp, isBinary):41 resp = json.loads(resp.decode())42 self.assertEqual(400, resp['headers']['status'])43 with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:44 msg_mock.side_effect = validator45 self.protocol.onMessage(req, False)46 @ddt.data('480924', 'foo')47 def test_basics_thoroughly(self, project_id):48 # Stats are empty - queue not created yet49 action = consts.QUEUE_GET_STATS50 body = {"queue_name": "gummybears"}51 headers = {52 'Client-ID': uuidutils.generate_uuid(),53 'X-Project-ID': project_id54 }55 send_mock = mock.patch.object(self.protocol, 'sendMessage')56 self.addCleanup(send_mock.stop)57 sender = send_mock.start()58 req = test_utils.create_request(action, body, headers)59 def validator(resp, isBinary):60 resp = json.loads(resp.decode())61 self.assertEqual(404, resp['headers']['status'])62 sender.side_effect = validator63 self.protocol.onMessage(req, False)64 # Create65 action = consts.QUEUE_CREATE66 body = {"queue_name": "gummybears",67 "metadata": {68 "key": {69 "key2": "value",70 "key3": [1, 2, 3, 4, 5]},71 "messages": {"ttl": 600},72 }73 }74 req = test_utils.create_request(action, body, headers)75 def validator(resp, isBinary):76 resp = json.loads(resp.decode())77 self.assertEqual(201, resp['headers']['status'])78 sender.side_effect = validator79 self.protocol.onMessage(req, False)80 # Fetch metadata81 action = consts.QUEUE_GET82 body = {"queue_name": "gummybears"}83 meta = {"messages": {"ttl": 600},84 "key": {85 "key2": "value",86 "key3": [1, 2, 3, 4, 5]}87 }88 req = test_utils.create_request(action, body, headers)89 def validator(resp, isBinary):90 resp = json.loads(resp.decode())91 self.assertEqual(200, resp['headers']['status'])92 self.assertEqual(meta, resp['body'])93 sender.side_effect = validator94 self.protocol.onMessage(req, False)95 # Stats empty queue96 action = consts.QUEUE_GET_STATS97 body = {"queue_name": "gummybears"}98 req = test_utils.create_request(action, body, headers)99 def validator(resp, isBinary):100 resp = json.loads(resp.decode())101 self.assertEqual(200, resp['headers']['status'])102 sender.side_effect = validator103 self.protocol.onMessage(req, False)104 # Delete105 action = consts.QUEUE_DELETE106 body = {"queue_name": "gummybears"}107 req = test_utils.create_request(action, body, headers)108 def validator(resp, isBinary):109 resp = json.loads(resp.decode())110 self.assertEqual(204, resp['headers']['status'])111 sender.side_effect = validator112 self.protocol.onMessage(req, False)113 # Get non-existent stats114 action = consts.QUEUE_GET_STATS115 body = {"queue_name": "gummybears"}116 req = test_utils.create_request(action, body, headers)117 def validator(resp, isBinary):118 resp = json.loads(resp.decode())119 self.assertEqual(404, resp['headers']['status'])120 sender.side_effect = validator121 self.protocol.onMessage(req, False)122 def test_name_restrictions(self):123 headers = {124 'Client-ID': uuidutils.generate_uuid(),125 'X-Project-ID': 'test-project'126 }127 action = consts.QUEUE_CREATE128 body = {"queue_name": 'marsbar',129 "metadata": {130 "key": {131 "key2": "value",132 "key3": [1, 2, 3, 4, 5]},133 "messages": {"ttl": 600},134 }135 }136 send_mock = mock.patch.object(self.protocol, 'sendMessage')137 self.addCleanup(send_mock.stop)138 sender = send_mock.start()139 req = test_utils.create_request(action, body, headers)140 def validator(resp, isBinary):141 resp = json.loads(resp.decode())142 self.assertIn(resp['headers']['status'], [201, 204])143 sender.side_effect = validator144 self.protocol.onMessage(req, False)145 body["queue_name"] = "m@rsb@r"146 req = test_utils.create_request(action, body, headers)147 def validator(resp, isBinary):148 resp = json.loads(resp.decode())149 self.assertEqual(400, resp['headers']['status'])150 sender.side_effect = validator151 self.protocol.onMessage(req, False)152 body["queue_name"] = "marsbar" * 10153 req = test_utils.create_request(action, body, headers)154 self.protocol.onMessage(req, False)155 def test_project_id_restriction(self):156 headers = {157 'Client-ID': uuidutils.generate_uuid(),158 'X-Project-ID': 'test-project' * 30159 }160 action = consts.QUEUE_CREATE161 body = {"queue_name": 'poptart'}162 send_mock = mock.patch.object(self.protocol, 'sendMessage')163 self.addCleanup(send_mock.stop)164 sender = send_mock.start()165 req = test_utils.create_request(action, body, headers)166 def validator(resp, isBinary):167 resp = json.loads(resp.decode())168 self.assertEqual(400, resp['headers']['status'])169 sender.side_effect = validator170 self.protocol.onMessage(req, False)171 headers['X-Project-ID'] = 'test-project'172 req = test_utils.create_request(action, body, headers)173 def validator(resp, isBinary):174 resp = json.loads(resp.decode())175 self.assertIn(resp['headers']['status'], [201, 204])176 sender.side_effect = validator177 self.protocol.onMessage(req, False)178 def test_non_ascii_name(self):179 test_params = ((u'/queues/non-ascii-n\u0153me', 'utf-8'),180 (u'/queues/non-ascii-n\xc4me', 'iso8859-1'))181 headers = {182 'Client-ID': uuidutils.generate_uuid(),183 'X-Project-ID': 'test-project' * 30184 }185 action = consts.QUEUE_CREATE186 body = {"queue_name": test_params[0]}187 send_mock = mock.patch.object(self.protocol, 'sendMessage')188 self.addCleanup(send_mock.stop)189 sender = send_mock.start()190 req = test_utils.create_request(action, body, headers)191 def validator(resp, isBinary):192 resp = json.loads(resp.decode())193 self.assertEqual(400, resp['headers']['status'])194 sender.side_effect = validator195 self.protocol.onMessage(req, False)196 body = {"queue_name": test_params[1]}197 req = test_utils.create_request(action, body, headers)198 self.protocol.onMessage(req, False)199 def test_no_metadata(self):200 headers = {201 'Client-ID': uuidutils.generate_uuid(),202 'X-Project-ID': 'test-project'203 }204 action = consts.QUEUE_CREATE205 body = {"queue_name": "fizbat"}206 send_mock = mock.patch.object(self.protocol, 'sendMessage')207 self.addCleanup(send_mock.stop)208 sender = send_mock.start()209 req = test_utils.create_request(action, body, headers)210 def validator(resp, isBinary):211 resp = json.loads(resp.decode())212 self.assertIn(resp['headers']['status'], [201, 204])213 sender.side_effect = validator214 self.protocol.onMessage(req, False)215 def validator(resp, isBinary):216 resp = json.loads(resp.decode())217 self.assertEqual(204, resp['headers']['status'])218 sender.side_effect = validator219 self.protocol.onMessage(req, False)220 @ddt.data('{', '[]', '.', ' ')221 def test_bad_metadata(self, meta):222 headers = {223 'Client-ID': uuidutils.generate_uuid(),224 'X-Project-ID': 'test-project' * 30225 }226 action = consts.QUEUE_CREATE227 body = {"queue_name": "fizbat",228 "metadata": meta}229 send_mock = mock.patch.object(self.protocol, 'sendMessage')230 self.addCleanup(send_mock.stop)231 sender = send_mock.start()232 req = test_utils.create_request(action, body, headers)233 def validator(resp, isBinary):234 resp = json.loads(resp.decode())235 self.assertEqual(400, resp['headers']['status'])236 sender.side_effect = validator237 self.protocol.onMessage(req, False)238 def test_too_much_metadata(self):239 headers = {240 'Client-ID': uuidutils.generate_uuid(),241 'X-Project-ID': 'test-project'242 }243 action = consts.QUEUE_CREATE244 body = {"queue_name": "buttertoffee",245 "metadata": {"messages": {"ttl": 600},246 "padding": "x"}247 }248 max_size = self.transport_cfg.max_queue_metadata249 body["metadata"]["padding"] = "x" * max_size250 send_mock = mock.patch.object(self.protocol, 'sendMessage')251 self.addCleanup(send_mock.stop)252 sender = send_mock.start()253 req = test_utils.create_request(action, body, headers)254 def validator(resp, isBinary):255 resp = json.loads(resp.decode())256 self.assertEqual(400, resp['headers']['status'])257 sender.side_effect = validator258 self.protocol.onMessage(req, False)259 def test_way_too_much_metadata(self):260 headers = {261 'Client-ID': uuidutils.generate_uuid(),262 'X-Project-ID': 'test-project'263 }264 action = consts.QUEUE_CREATE265 body = {"queue_name": "peppermint",266 "metadata": {"messages": {"ttl": 600},267 "padding": "x"}268 }269 max_size = self.transport_cfg.max_queue_metadata270 body["metadata"]["padding"] = "x" * max_size * 5271 send_mock = mock.patch.object(self.protocol, 'sendMessage')272 self.addCleanup(send_mock.stop)273 sender = send_mock.start()274 req = test_utils.create_request(action, body, headers)275 def validator(resp, isBinary):276 resp = json.loads(resp.decode())277 self.assertEqual(400, resp['headers']['status'])278 sender.side_effect = validator279 self.protocol.onMessage(req, False)280 def test_update_metadata(self):281 self.skip("Implement patch method")282 headers = {283 'Client-ID': uuidutils.generate_uuid(),284 'X-Project-ID': 'test-project'285 }286 action = consts.QUEUE_CREATE287 body = {"queue_name": "bonobon"}288 send_mock = mock.patch.object(self.protocol, 'sendMessage')289 self.addCleanup(send_mock.stop)290 sender = send_mock.start()291 # Create292 req = test_utils.create_request(action, body, headers)293 def validator(resp, isBinary):294 resp = json.loads(resp.decode())295 self.assertEqual(201, resp['headers']['status'])296 sender.side_effect = validator297 self.protocol.onMessage(req, False)298 # Set meta299 meta1 = {"messages": {"ttl": 600}, "padding": "x"}300 body["metadata"] = meta1301 req = test_utils.create_request(action, body, headers)302 def validator(resp, isBinary):303 resp = json.loads(resp.decode())304 self.assertEqual(204, resp['headers']['status'])305 sender.side_effect = validator306 self.protocol.onMessage(req, False)307 # Get308 action = consts.QUEUE_GET309 body = {"queue_name": "bonobon"}310 req = test_utils.create_request(action, body, headers)311 def validator(resp, isBinary):312 resp = json.loads(resp.decode())313 self.assertEqual(204, resp['headers']['status'])314 self.assertEqual(meta1, resp['body'])315 sender.side_effect = validator316 self.protocol.onMessage(req, False)317 # Update318 action = consts.QUEUE_CREATE319 meta2 = {"messages": {"ttl": 100}, "padding": "y"}320 body["metadata"] = meta2321 req = test_utils.create_request(action, body, headers)322 def validator(resp, isBinary):323 resp = json.loads(resp.decode())324 self.assertEqual(204, resp['headers']['status'])325 sender.side_effect = validator326 self.protocol.onMessage(req, False)327 # Get again328 action = consts.QUEUE_GET329 body = {"queue_name": "bonobon"}330 req = test_utils.create_request(action, body, headers)331 def validator(resp, isBinary):332 resp = json.loads(resp.decode())333 self.assertEqual(200, resp['headers']['status'])334 self.assertEqual(meta2, resp['body'])335 sender.side_effect = validator336 self.protocol.onMessage(req, False)337 def test_list(self):338 arbitrary_number = 644079696574693339 project_id = str(arbitrary_number)340 client_id = uuidutils.generate_uuid()341 headers = {342 'X-Project-ID': project_id,343 'Client-ID': client_id344 }345 send_mock = mock.patch.object(self.protocol, 'sendMessage')346 self.addCleanup(send_mock.stop)347 sender = send_mock.start()348 # NOTE(kgriffs): It's important that this one sort after the one349 # above. This is in order to prove that bug/1236605 is fixed, and350 # stays fixed!351 # NOTE(vkmc): In websockets as well!352 alt_project_id = str(arbitrary_number + 1)353 # List empty354 action = consts.QUEUE_LIST355 body = {}356 req = test_utils.create_request(action, body, headers)357 def validator(resp, isBinary):358 resp = json.loads(resp.decode())359 self.assertEqual(200, resp['headers']['status'])360 self.assertEqual([], resp['body']['queues'])361 sender.side_effect = validator362 self.protocol.onMessage(req, False)363 # Payload exceeded364 body = {'limit': 21}365 req = test_utils.create_request(action, body, headers)366 def validator(resp, isBinary):367 resp = json.loads(resp.decode())368 self.assertEqual(400, resp['headers']['status'])369 sender.side_effect = validator370 self.protocol.onMessage(req, False)371 # Create some372 def create_queue(project_id, queue_name, metadata):373 altheaders = {'Client-ID': client_id}374 if project_id is not None:375 altheaders['X-Project-ID'] = project_id376 action = consts.QUEUE_CREATE377 body['queue_name'] = queue_name378 body['metadata'] = metadata379 req = test_utils.create_request(action, body, altheaders)380 def validator(resp, isBinary):381 resp = json.loads(resp.decode())382 self.assertEqual(201, resp['headers']['status'])383 sender.side_effect = validator384 self.protocol.onMessage(req, False)385 create_queue(project_id, 'q1', {"node": 31})386 create_queue(project_id, 'q2', {"node": 32})387 create_queue(project_id, 'q3', {"node": 33})388 create_queue(alt_project_id, 'q3', {"alt": 1})389 # List (limit)390 body = {'limit': 2}391 req = test_utils.create_request(action, body, headers)392 def validator(resp, isBinary):393 resp = json.loads(resp.decode())394 self.assertEqual(2, len(resp['body']['queues']))395 sender.side_effect = validator396 self.protocol.onMessage(req, False)397 # List (no metadata, get all)398 body = {'limit': 5}399 req = test_utils.create_request(action, body, headers)400 def validator(resp, isBinary):401 resp = json.loads(resp.decode())402 self.assertEqual(200, resp['headers']['status'])403 # Ensure we didn't pick up the queue from the alt project.404 self.assertEqual(3, len(resp['body']['queues']))405 sender.side_effect = validator406 self.protocol.onMessage(req, False)407 # List with metadata408 body = {'detailed': True}409 req = test_utils.create_request(action, body, headers)410 def validator(resp, isBinary):411 resp = json.loads(resp.decode())412 self.assertEqual(200, resp['headers']['status'])413 sender.side_effect = validator414 self.protocol.onMessage(req, False)415 action = consts.QUEUE_GET416 body = {"queue_name": "q1"}417 req = test_utils.create_request(action, body, headers)418 def validator(resp, isBinary):419 resp = json.loads(resp.decode())420 self.assertEqual(200, resp['headers']['status'])421 self.assertEqual({"node": 31}, resp['body'])422 sender.side_effect = validator423 self.protocol.onMessage(req, False)424 # List tail425 action = consts.QUEUE_LIST426 body = {}427 req = test_utils.create_request(action, body, headers)428 def validator(resp, isBinary):429 resp = json.loads(resp.decode())430 self.assertEqual(200, resp['headers']['status'])431 sender.side_effect = validator432 self.protocol.onMessage(req, False)433 # List manually-constructed tail434 body = {'marker': "zzz"}435 req = test_utils.create_request(action, body, headers)436 self.protocol.onMessage(req, False)437 def test_list_returns_503_on_nopoolfound_exception(self):438 headers = {439 'Client-ID': uuidutils.generate_uuid(),440 'X-Project-ID': 'test-project'441 }442 action = consts.QUEUE_LIST443 body = {}444 send_mock = mock.patch.object(self.protocol, 'sendMessage')445 self.addCleanup(send_mock.stop)446 sender = send_mock.start()447 req = test_utils.create_request(action, body, headers)448 def validator(resp, isBinary):449 resp = json.loads(resp.decode())450 self.assertEqual(503, resp['headers']['status'])451 sender.side_effect = validator452 queue_controller = self.boot.storage.queue_controller453 with mock.patch.object(queue_controller, 'list') as mock_queue_list:454 def queue_generator():455 raise storage_errors.NoPoolFound()456 # This generator tries to be like queue controller list generator457 # in some ways.458 def fake_generator():459 yield queue_generator()460 yield {}461 mock_queue_list.return_value = fake_generator()462 self.protocol.onMessage(req, False)463 def _post_messages(self, queue_name, headers, repeat=1):464 messages = [{'body': 239, 'ttl': 300}] * repeat465 action = consts.MESSAGE_POST466 body = {"queue_name": queue_name,467 "messages": messages}468 send_mock = mock.Mock()469 self.protocol.sendMessage = send_mock470 req = test_utils.create_request(action, body, headers)471 self.protocol.onMessage(req, False)472 return json.loads(send_mock.call_args[0][0].decode())473 def test_purge(self):474 arbitrary_number = 644079696574693475 project_id = str(arbitrary_number)476 client_id = uuidutils.generate_uuid()477 headers = {478 'X-Project-ID': project_id,479 'Client-ID': client_id480 }481 queue_name = 'myqueue'482 resp = self._post_messages(queue_name, headers, repeat=5)483 msg_ids = resp['body']['message_ids']484 send_mock = mock.Mock()485 self.protocol.sendMessage = send_mock486 for msg_id in msg_ids:487 action = consts.MESSAGE_GET488 body = {"queue_name": queue_name, "message_id": msg_id}489 req = test_utils.create_request(action, body, headers)490 self.protocol.onMessage(req, False)491 resp = json.loads(send_mock.call_args[0][0].decode())492 self.assertEqual(200, resp['headers']['status'])493 action = consts.QUEUE_PURGE494 body = {"queue_name": queue_name, "resource_types": ["messages"]}495 req = test_utils.create_request(action, body, headers)496 self.protocol.onMessage(req, False)497 resp = json.loads(send_mock.call_args[0][0].decode())498 self.assertEqual(204, resp['headers']['status'])499 for msg_id in msg_ids:500 action = consts.MESSAGE_GET501 body = {"queue_name": queue_name, "message_id": msg_id}502 req = test_utils.create_request(action, body, headers)503 self.protocol.onMessage(req, False)504 resp = json.loads(send_mock.call_args[0][0].decode())505 self.assertEqual(404, resp['headers']['status'])506class TestQueueLifecycleMongoDB(QueueLifecycleBaseTest):507 config_file = 'websocket_mongodb.conf'508 @testing.requires_mongodb509 def setUp(self):510 super(TestQueueLifecycleMongoDB, self).setUp()511 def tearDown(self):512 storage = self.boot.storage._storage513 connection = storage.connection514 connection.drop_database(self.boot.control.queues_database)515 for db in storage.message_databases:516 connection.drop_database(db)...

Full Screen

Full Screen

test_messages.py

Source:test_messages.py Github

copy

Full Screen

1# Copyright (c) 2015 Red Hat, Inc.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or12# implied.13# See the License for the specific language governing permissions and14# limitations under the License.15import datetime16import json17import ddt18import mock19from oslo_utils import timeutils20from oslo_utils import uuidutils21import six22from testtools import matchers23from zaqar.common import consts24from zaqar.tests.unit.transport.websocket import base25from zaqar.tests.unit.transport.websocket import utils as test_utils26from zaqar.transport import validation27@ddt.ddt28class MessagesBaseTest(base.V2Base):29 config_file = "websocket_mongodb.conf"30 def setUp(self):31 super(MessagesBaseTest, self).setUp()32 self.protocol = self.transport.factory()33 self.default_message_ttl = 360034 self.project_id = '7e55e1a7e'35 self.headers = {36 'Client-ID': uuidutils.generate_uuid(),37 'X-Project-ID': self.project_id38 }39 body = {"queue_name": "kitkat"}40 req = test_utils.create_request(consts.QUEUE_CREATE,41 body, self.headers)42 with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:43 self.protocol.onMessage(req, False)44 resp = json.loads(msg_mock.call_args[0][0].decode())45 self.assertIn(resp['headers']['status'], [201, 204])46 def tearDown(self):47 super(MessagesBaseTest, self).tearDown()48 body = {"queue_name": "kitkat"}49 send_mock = mock.Mock()50 self.protocol.sendMessage = send_mock51 req = test_utils.create_request(consts.QUEUE_DELETE,52 body, self.headers)53 self.protocol.onMessage(req, False)54 resp = json.loads(send_mock.call_args[0][0].decode())55 self.assertEqual(204, resp['headers']['status'])56 def _test_post(self, sample_messages, in_binary=False):57 body = {"queue_name": "kitkat",58 "messages": sample_messages}59 send_mock = mock.Mock()60 self.protocol.sendMessage = send_mock61 dumps, loads, create_req = test_utils.get_pack_tools(binary=in_binary)62 req = create_req(consts.MESSAGE_POST, body, self.headers)63 self.protocol.onMessage(req, in_binary)64 arg = send_mock.call_args[0][0]65 if not in_binary:66 arg = arg.decode()67 resp = loads(arg)68 self.assertEqual(201, resp['headers']['status'])69 self.msg_ids = resp['body']['message_ids']70 self.assertEqual(len(sample_messages), len(self.msg_ids))71 lookup = dict([(m['ttl'], m['body']) for m in sample_messages])72 # Test GET on the message resource directly73 # NOTE(cpp-cabrera): force the passing of time to age a message74 timeutils_utcnow = 'oslo_utils.timeutils.utcnow'75 now = timeutils.utcnow() + datetime.timedelta(seconds=10)76 with mock.patch(timeutils_utcnow) as mock_utcnow:77 mock_utcnow.return_value = now78 for msg_id in self.msg_ids:79 headers = self.headers.copy()80 headers['X-Project-ID'] = '777777'81 # Wrong project ID82 action = consts.MESSAGE_GET83 body = {"queue_name": "kitkat",84 "message_id": msg_id}85 req = create_req(action, body, headers)86 self.protocol.onMessage(req, in_binary)87 arg = send_mock.call_args[0][0]88 if not in_binary:89 arg = arg.decode()90 resp = loads(arg)91 self.assertEqual(404, resp['headers']['status'])92 # Correct project ID93 req = create_req(action, body, self.headers)94 self.protocol.onMessage(req, in_binary)95 arg = send_mock.call_args[0][0]96 if not in_binary:97 arg = arg.decode()98 resp = loads(arg)99 self.assertEqual(200, resp['headers']['status'])100 # Check message properties101 message = resp['body']['messages']102 self.assertEqual(lookup[message['ttl']], message['body'])103 self.assertEqual(msg_id, message['id'])104 # no negative age105 # NOTE(cpp-cabrera): testtools lacks106 # GreaterThanEqual on py26107 self.assertThat(message['age'],108 matchers.GreaterThan(-1))109 # Test bulk GET110 action = consts.MESSAGE_GET_MANY111 body = {"queue_name": "kitkat",112 "message_ids": self.msg_ids}113 req = create_req(action, body, self.headers)114 self.protocol.onMessage(req, in_binary)115 arg = send_mock.call_args[0][0]116 if not in_binary:117 arg = arg.decode()118 resp = loads(arg)119 self.assertEqual(200, resp['headers']['status'])120 expected_ttls = set(m['ttl'] for m in sample_messages)121 actual_ttls = set(m['ttl'] for m in resp['body']['messages'])122 self.assertFalse(expected_ttls - actual_ttls)123 actual_ids = set(m['id'] for m in resp['body']['messages'])124 self.assertFalse(set(self.msg_ids) - actual_ids)125 def test_exceeded_payloads(self):126 # Get a valid message id127 resp = self._post_messages("kitkat")128 msg_id = resp['body']['message_ids']129 # Bulk GET restriction130 get_msg_ids = msg_id * 21131 action = consts.MESSAGE_GET_MANY132 body = {"queue_name": "kitkat",133 "message_ids": get_msg_ids}134 send_mock = mock.Mock()135 self.protocol.sendMessage = send_mock136 req = test_utils.create_request(action, body, self.headers)137 self.protocol.onMessage(req, False)138 resp = json.loads(send_mock.call_args[0][0].decode())139 self.assertEqual(400, resp['headers']['status'])140 # Listing restriction141 body['limit'] = 21142 req = test_utils.create_request(action, body, self.headers)143 self.protocol.onMessage(req, False)144 resp = json.loads(send_mock.call_args[0][0].decode())145 self.assertEqual(400, resp['headers']['status'])146 # Bulk deletion restriction147 del_msg_ids = msg_id * 22148 action = consts.MESSAGE_GET_MANY149 body = {"queue_name": "kitkat",150 "message_ids": del_msg_ids}151 req = test_utils.create_request(action, body, self.headers)152 self.protocol.onMessage(req, False)153 resp = json.loads(send_mock.call_args[0][0].decode())154 self.assertEqual(400, resp['headers']['status'])155 @ddt.data(True, False)156 def test_post_single(self, in_binary):157 sample_messages = [158 {'body': {'key': 'value'}, 'ttl': 200},159 ]160 self._test_post(sample_messages, in_binary=in_binary)161 @ddt.data(True, False)162 def test_post_multiple(self, in_binary):163 sample_messages = [164 {'body': 239, 'ttl': 100},165 {'body': {'key': 'value'}, 'ttl': 200},166 {'body': [1, 3], 'ttl': 300},167 ]168 self._test_post(sample_messages, in_binary=in_binary)169 def test_post_optional_ttl(self):170 messages = [{'body': 239},171 {'body': {'key': 'value'}, 'ttl': 200}]172 action = consts.MESSAGE_POST173 body = {"queue_name": "kitkat",174 "messages": messages}175 req = test_utils.create_request(action, body, self.headers)176 send_mock = mock.Mock()177 self.protocol.sendMessage = send_mock178 self.protocol.onMessage(req, False)179 resp = json.loads(send_mock.call_args[0][0].decode())180 self.assertEqual(201, resp['headers']['status'])181 msg_id = resp['body']['message_ids'][0]182 action = consts.MESSAGE_GET183 body = {"queue_name": "kitkat", "message_id": msg_id}184 req = test_utils.create_request(action, body, self.headers)185 self.protocol.onMessage(req, False)186 resp = json.loads(send_mock.call_args[0][0].decode())187 self.assertEqual(200, resp['headers']['status'])188 self.assertEqual(self.default_message_ttl,189 resp['body']['messages']['ttl'])190 def test_post_to_non_ascii_queue(self):191 queue_name = u'non-ascii-n\u0153me'192 if six.PY2:193 queue_name = queue_name.encode('utf-8')194 resp = self._post_messages(queue_name)195 self.assertEqual(400, resp['headers']['status'])196 def test_post_with_long_queue_name(self):197 # NOTE(kgriffs): This test verifies that routes with198 # embedded queue name params go through the validation199 # hook, regardless of the target resource.200 queue_name = 'v' * validation.QUEUE_NAME_MAX_LEN201 resp = self._post_messages(queue_name)202 self.assertEqual(201, resp['headers']['status'])203 queue_name += 'v'204 resp = self._post_messages(queue_name)205 self.assertEqual(400, resp['headers']['status'])206 def test_post_to_missing_queue(self):207 queue_name = 'nonexistent'208 resp = self._post_messages(queue_name)209 self.assertEqual(201, resp['headers']['status'])210 def test_post_invalid_ttl(self):211 sample_messages = [212 {'body': {'key': 'value'}, 'ttl': '200'},213 ]214 action = consts.MESSAGE_POST215 body = {"queue_name": "kitkat",216 "messages": sample_messages}217 send_mock = mock.patch.object(self.protocol, 'sendMessage')218 self.addCleanup(send_mock.stop)219 send_mock = send_mock.start()220 req = test_utils.create_request(action, body, self.headers)221 self.protocol.onMessage(req, False)222 resp = json.loads(send_mock.call_args[0][0].decode())223 self.assertEqual(400, resp['headers']['status'])224 self.assertEqual(225 'Bad request. The value of the "ttl" field must be a int.',226 resp['body']['exception'])227 def test_post_no_body(self):228 sample_messages = [229 {'ttl': 200},230 ]231 action = consts.MESSAGE_POST232 body = {"queue_name": "kitkat",233 "messages": sample_messages}234 send_mock = mock.patch.object(self.protocol, 'sendMessage')235 self.addCleanup(send_mock.stop)236 send_mock = send_mock.start()237 req = test_utils.create_request(action, body, self.headers)238 self.protocol.onMessage(req, False)239 resp = json.loads(send_mock.call_args[0][0].decode())240 self.assertEqual(400, resp['headers']['status'])241 self.assertEqual(242 'Bad request. Missing "body" field.', resp['body']['exception'])243 def test_get_from_missing_queue(self):244 action = consts.MESSAGE_LIST245 body = {"queue_name": "anothernonexistent"}246 req = test_utils.create_request(action, body, self.headers)247 send_mock = mock.Mock()248 self.protocol.sendMessage = send_mock249 self.protocol.onMessage(req, False)250 resp = json.loads(send_mock.call_args[0][0].decode())251 self.assertEqual(200, resp['headers']['status'])252 self.assertEqual([], resp['body']['messages'])253 @ddt.data('', '0xdeadbeef', '550893e0-2b6e-11e3-835a-5cf9dd72369')254 def test_bad_client_id(self, text_id):255 action = consts.MESSAGE_POST256 body = {257 "queue_name": "kinder",258 "messages": [{"ttl": 60,259 "body": ""}]260 }261 headers = {262 'Client-ID': text_id,263 'X-Project-ID': self.project_id264 }265 send_mock = mock.Mock()266 self.protocol.sendMessage = send_mock267 req = test_utils.create_request(action, body, headers)268 self.protocol.onMessage(req, False)269 resp = json.loads(send_mock.call_args[0][0].decode())270 self.assertEqual(400, resp['headers']['status'])271 action = consts.MESSAGE_GET272 body = {273 "queue_name": "kinder",274 "limit": 3,275 "echo": True276 }277 req = test_utils.create_request(action, body, headers)278 self.protocol.onMessage(req, False)279 resp = json.loads(send_mock.call_args[0][0].decode())280 self.assertEqual(400, resp['headers']['status'])281 @ddt.data(None, '[', '[]', '{}', '.')282 def test_post_bad_message(self, document):283 action = consts.MESSAGE_POST284 body = {285 "queue_name": "kinder",286 "messages": document287 }288 send_mock = mock.Mock()289 self.protocol.sendMessage = send_mock290 req = test_utils.create_request(action, body, self.headers)291 self.protocol.onMessage(req, False)292 resp = json.loads(send_mock.call_args[0][0].decode())293 self.assertEqual(400, resp['headers']['status'])294 @ddt.data(-1, 59, 1209601)295 def test_unacceptable_ttl(self, ttl):296 action = consts.MESSAGE_POST297 body = {"queue_name": "kinder",298 "messages": [{"ttl": ttl, "body": ""}]}299 send_mock = mock.Mock()300 self.protocol.sendMessage = send_mock301 req = test_utils.create_request(action, body, self.headers)302 self.protocol.onMessage(req, False)303 resp = json.loads(send_mock.call_args[0][0].decode())304 self.assertEqual(400, resp['headers']['status'])305 def test_exceeded_message_posting(self):306 # Total (raw request) size307 document = [{'body': "some body", 'ttl': 100}] * 8000308 action = consts.MESSAGE_POST309 body = {310 "queue_name": "kinder",311 "messages": document312 }313 send_mock = mock.Mock()314 self.protocol.sendMessage = send_mock315 req = test_utils.create_request(action, body, self.headers)316 self.protocol.onMessage(req, False)317 resp = json.loads(send_mock.call_args[0][0].decode())318 self.assertEqual(400, resp['headers']['status'])319 @ddt.data('{"overflow": 9223372036854775808}',320 '{"underflow": -9223372036854775809}')321 def test_unsupported_json(self, document):322 action = consts.MESSAGE_POST323 body = {324 "queue_name": "fizz",325 "messages": document326 }327 send_mock = mock.Mock()328 self.protocol.sendMessage = send_mock329 req = test_utils.create_request(action, body, self.headers)330 self.protocol.onMessage(req, False)331 resp = json.loads(send_mock.call_args[0][0].decode())332 self.assertEqual(400, resp['headers']['status'])333 def test_delete(self):334 resp = self._post_messages("tofi")335 msg_id = resp['body']['message_ids'][0]336 action = consts.MESSAGE_GET337 body = {"queue_name": "tofi",338 "message_id": msg_id}339 send_mock = mock.Mock()340 self.protocol.sendMessage = send_mock341 req = test_utils.create_request(action, body, self.headers)342 self.protocol.onMessage(req, False)343 resp = json.loads(send_mock.call_args[0][0].decode())344 self.assertEqual(200, resp['headers']['status'])345 # Delete queue346 action = consts.MESSAGE_DELETE347 req = test_utils.create_request(action, body, self.headers)348 self.protocol.onMessage(req, False)349 resp = json.loads(send_mock.call_args[0][0].decode())350 self.assertEqual(204, resp['headers']['status'])351 # Get non existent queue352 action = consts.MESSAGE_GET353 req = test_utils.create_request(action, body, self.headers)354 self.protocol.onMessage(req, False)355 resp = json.loads(send_mock.call_args[0][0].decode())356 self.assertEqual(404, resp['headers']['status'])357 # Safe to delete non-existing ones358 action = consts.MESSAGE_DELETE359 req = test_utils.create_request(action, body, self.headers)360 self.protocol.onMessage(req, False)361 resp = json.loads(send_mock.call_args[0][0].decode())362 self.assertEqual(204, resp['headers']['status'])363 def test_bulk_delete(self):364 resp = self._post_messages("nerds", repeat=5)365 msg_ids = resp['body']['message_ids']366 action = consts.MESSAGE_DELETE_MANY367 body = {"queue_name": "nerds",368 "message_ids": msg_ids}369 send_mock = mock.Mock()370 self.protocol.sendMessage = send_mock371 req = test_utils.create_request(action, body, self.headers)372 self.protocol.onMessage(req, False)373 resp = json.loads(send_mock.call_args[0][0].decode())374 self.assertEqual(204, resp['headers']['status'])375 action = consts.MESSAGE_GET376 req = test_utils.create_request(action, body, self.headers)377 self.protocol.onMessage(req, False)378 resp = json.loads(send_mock.call_args[0][0].decode())379 self.assertEqual(400, resp['headers']['status'])380 # Safe to delete non-existing ones381 action = consts.MESSAGE_DELETE_MANY382 req = test_utils.create_request(action, body, self.headers)383 self.protocol.onMessage(req, False)384 resp = json.loads(send_mock.call_args[0][0].decode())385 self.assertEqual(204, resp['headers']['status'])386 # Even after the queue is gone387 action = consts.QUEUE_DELETE388 body = {"queue_name": "nerds"}389 req = test_utils.create_request(action, body, self.headers)390 self.protocol.onMessage(req, False)391 resp = json.loads(send_mock.call_args[0][0].decode())392 self.assertEqual(204, resp['headers']['status'])393 action = consts.MESSAGE_DELETE_MANY394 body = {"queue_name": "nerds",395 "message_ids": msg_ids}396 req = test_utils.create_request(action, body, self.headers)397 self.protocol.onMessage(req, False)398 resp = json.loads(send_mock.call_args[0][0].decode())399 self.assertEqual(204, resp['headers']['status'])400 def test_pop_delete(self):401 self._post_messages("kitkat", repeat=5)402 action = consts.MESSAGE_DELETE_MANY403 body = {"queue_name": "kitkat", "pop": 2}404 send_mock = mock.Mock()405 self.protocol.sendMessage = send_mock406 req = test_utils.create_request(action, body, self.headers)407 self.protocol.onMessage(req, False)408 resp = json.loads(send_mock.call_args[0][0].decode())409 self.assertEqual(200, resp['headers']['status'])410 self.assertEqual(2, len(resp['body']['messages']))411 self.assertEqual(239, resp['body']['messages'][0]['body'])412 self.assertEqual(239, resp['body']['messages'][1]['body'])413 def test_get_nonexistent_message_404s(self):414 action = consts.MESSAGE_GET415 body = {"queue_name": "notthere",416 "message_id": "a"}417 send_mock = mock.Mock()418 self.protocol.sendMessage = send_mock419 req = test_utils.create_request(action, body, self.headers)420 self.protocol.onMessage(req, False)421 resp = json.loads(send_mock.call_args[0][0].decode())422 self.assertEqual(404, resp['headers']['status'])423 def test_get_multiple_invalid_messages_404s(self):424 action = consts.MESSAGE_GET_MANY425 body = {"queue_name": "notnotthere",426 "message_ids": ["a", "b", "c"]}427 send_mock = mock.Mock()428 self.protocol.sendMessage = send_mock429 req = test_utils.create_request(action, body, self.headers)430 self.protocol.onMessage(req, False)431 resp = json.loads(send_mock.call_args[0][0].decode())432 self.assertEqual(200, resp['headers']['status'])433 def test_delete_multiple_invalid_messages_204s(self):434 action = consts.MESSAGE_DELETE435 body = {"queue_name": "yetanothernotthere",436 "message_ids": ["a", "b", "c"]}437 send_mock = mock.Mock()438 self.protocol.sendMessage = send_mock439 req = test_utils.create_request(action, body, self.headers)440 self.protocol.onMessage(req, False)441 resp = json.loads(send_mock.call_args[0][0].decode())442 self.assertEqual(400, resp['headers']['status'])443 def _post_messages(self, queue_name, repeat=1):444 messages = [{'body': 239, 'ttl': 300}] * repeat445 action = consts.MESSAGE_POST446 body = {"queue_name": queue_name,447 "messages": messages}448 send_mock = mock.Mock()449 self.protocol.sendMessage = send_mock450 req = test_utils.create_request(action, body, self.headers)451 self.protocol.onMessage(req, False)452 return json.loads(send_mock.call_args[0][0].decode())453 def test_invalid_request(self):454 send_mock = mock.Mock()455 self.protocol.sendMessage = send_mock456 self.protocol.onMessage('foo', False)457 self.assertEqual(1, send_mock.call_count)458 response = json.loads(send_mock.call_args[0][0].decode())459 self.assertIn('error', response['body'])460 self.assertEqual({'status': 400}, response['headers'])461 self.assertEqual(462 {'action': None, 'api': 'v2', 'body': {}, 'headers': {}},...

Full Screen

Full Screen

bybit.py

Source:bybit.py Github

copy

Full Screen

...63 elif resp.url.path == "/v2/public/kline/list":64 self.kline._onresponse(data["result"])65 elif resp.url.path == "/v2/private/wallet/balance":66 self.wallet._onresponse(data["result"])67 def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:68 if "success" in msg:69 if not msg["success"]:70 logger.warning(msg)71 if "topic" in msg:72 topic: str = msg["topic"]73 data = msg["data"]74 if any(75 [76 topic.startswith("orderBookL2_25"),77 topic.startswith("orderBook_200"),78 ]79 ):80 self.orderbook._onmessage(topic, msg["type"], data)81 elif topic.startswith("trade"):82 self.trade._onmessage(data)83 elif topic.startswith("insurance"):84 self.insurance._onmessage(data)85 elif topic.startswith("instrument_info"):86 self.instrument._onmessage(topic, msg["type"], data)87 if topic.startswith("klineV2"):88 self.kline._onmessage(topic, data)89 elif topic.startswith("liquidation"):90 self.liquidation._onmessage(data)91 elif topic == "position":92 self.position._onmessage(data)93 elif topic == "execution":94 self.execution._onmessage(data)95 elif topic == "order":96 self.order._onmessage(data)97 elif topic == "stop_order":98 self.stoporder._onmessage(data)99 elif topic == "wallet":100 self.wallet._onmessage(data)101 if "timestamp_e6" in msg:102 self.timestamp_e6 = int(msg["timestamp_e6"])103 @property104 def orderbook(self) -> "OrderBookInverse":105 return self.get("orderbook", OrderBookInverse)106 @property107 def trade(self) -> "TradeInverse":108 return self.get("trade", TradeInverse)109 @property110 def insurance(self) -> "Insurance":111 return self.get("insurance", Insurance)112 @property113 def instrument(self) -> "InstrumentInverse":114 return self.get("instrument", InstrumentInverse)115 @property116 def kline(self) -> "KlineInverse":117 return self.get("kline", KlineInverse)118 @property119 def liquidation(self) -> "LiquidationInverse":120 return self.get("liquidation", LiquidationInverse)121 @property122 def position(self) -> "PositionInverse":123 """124 インバース契約(無期限/先物)用のポジション125 """126 return self.get("position", PositionInverse)127 @property128 def execution(self) -> "ExecutionInverse":129 return self.get("execution", ExecutionInverse)130 @property131 def order(self) -> "OrderInverse":132 """133 アクティブオーダーのみ(約定・キャンセル済みは削除される)134 """135 return self.get("order", OrderInverse)136 @property137 def stoporder(self) -> "StopOrderInverse":138 """139 アクティブオーダーのみ(トリガー済みは削除される)140 """141 return self.get("stoporder", StopOrderInverse)142 @property143 def wallet(self) -> "WalletInverse":144 return self.get("wallet", WalletInverse)145class BybitUSDTDataStore(DataStoreManager):146 """147 Bybit USDT契約のデータストアマネージャー148 """149 def _init(self) -> None:150 self.create("orderbook", datastore_class=OrderBookUSDT)151 self.create("trade", datastore_class=TradeUSDT)152 self.create("insurance", datastore_class=Insurance)153 self.create("instrument", datastore_class=InstrumentUSDT)154 self.create("kline", datastore_class=KlineUSDT)155 self.create("liquidation", datastore_class=LiquidationUSDT)156 self.create("position", datastore_class=PositionUSDT)157 self.create("execution", datastore_class=ExecutionUSDT)158 self.create("order", datastore_class=OrderUSDT)159 self.create("stoporder", datastore_class=StopOrderUSDT)160 self.create("wallet", datastore_class=WalletUSDT)161 self.timestamp_e6: Optional[int] = None162 async def initialize(self, *aws: Awaitable[aiohttp.ClientResponse]) -> None:163 """164 対応エンドポイント165 - GET /private/linear/order/search (DataStore: order)166 - GET /private/linear/stop-order/search (DataStore: stoporder)167 - GET /private/linear/position/list (DataStore: position)168 - GET /private/linear/position/list (DataStore: position)169 - GET /public/linear/kline (DataStore: kline)170 - GET /v2/private/wallet/balance (DataStore: wallet)171 """172 for f in asyncio.as_completed(aws):173 resp = await f174 data = await resp.json()175 if data["ret_code"] != 0:176 raise ValueError(177 "Response error at DataStore initialization\n"178 f"URL: {resp.url}\n"179 f"Data: {data}"180 )181 if resp.url.path == "/private/linear/order/search":182 self.order._onresponse(data["result"])183 elif resp.url.path == "/private/linear/stop-order/search":184 self.stoporder._onresponse(data["result"])185 elif resp.url.path == "/private/linear/position/list":186 self.position._onresponse(data["result"])187 elif resp.url.path == "/public/linear/kline":188 self.kline._onresponse(data["result"])189 elif resp.url.path == "/v2/private/wallet/balance":190 self.wallet._onresponse(data["result"])191 def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:192 if "success" in msg:193 if not msg["success"]:194 logger.warning(msg)195 if "topic" in msg:196 topic: str = msg["topic"]197 data = msg["data"]198 if any(199 [200 topic.startswith("orderBookL2_25"),201 topic.startswith("orderBook_200"),202 ]203 ):204 self.orderbook._onmessage(topic, msg["type"], data)205 elif topic.startswith("trade"):206 self.trade._onmessage(data)207 elif topic.startswith("instrument_info"):208 self.instrument._onmessage(topic, msg["type"], data)209 if topic.startswith("candle"):210 self.kline._onmessage(topic, data)211 elif topic.startswith("liquidation"):212 self.liquidation._onmessage(data)213 elif topic == "position":214 self.position._onmessage(data)215 elif topic == "execution":216 self.execution._onmessage(data)217 elif topic == "order":218 self.order._onmessage(data)219 elif topic == "stop_order":220 self.stoporder._onmessage(data)221 elif topic == "wallet":222 self.wallet._onmessage(data)223 if "timestamp_e6" in msg:224 self.timestamp_e6 = int(msg["timestamp_e6"])225 @property226 def orderbook(self) -> "OrderBookUSDT":227 return self.get("orderbook", OrderBookUSDT)228 @property229 def trade(self) -> "TradeUSDT":230 return self.get("trade", TradeUSDT)231 @property232 def instrument(self) -> "InstrumentUSDT":233 return self.get("instrument", InstrumentUSDT)234 @property235 def kline(self) -> "KlineUSDT":236 return self.get("kline", KlineUSDT)237 @property238 def liquidation(self) -> "LiquidationUSDT":239 return self.get("liquidation", LiquidationUSDT)240 @property241 def position(self) -> "PositionUSDT":242 """243 USDT契約用のポジション244 """245 return self.get("position", PositionUSDT)246 @property247 def execution(self) -> "ExecutionUSDT":248 return self.get("execution", ExecutionUSDT)249 @property250 def order(self) -> "OrderUSDT":251 """252 アクティブオーダーのみ(約定・キャンセル済みは削除される)253 """254 return self.get("order", OrderUSDT)255 @property256 def stoporder(self) -> "StopOrderUSDT":257 """258 アクティブオーダーのみ(トリガー済みは削除される)259 """260 return self.get("stoporder", StopOrderUSDT)261 @property262 def wallet(self) -> "WalletUSDT":263 return self.get("wallet", WalletUSDT)264class OrderBookInverse(DataStore):265 _KEYS = ["symbol", "id", "side"]266 def sorted(self, query: Optional[Item] = None) -> dict[str, list[Item]]:267 if query is None:268 query = {}269 result = {"Sell": [], "Buy": []}270 for item in self:271 if all(k in item and query[k] == item[k] for k in query):272 result[item["side"]].append(item)273 result["Sell"].sort(key=lambda x: x["id"])274 result["Buy"].sort(key=lambda x: x["id"], reverse=True)275 return result276 def _onmessage(self, topic: str, type_: str, data: Union[list[Item], Item]) -> None:277 if type_ == "snapshot":278 symbol = topic.split(".")[-1]279 # ex: "orderBookL2_25.BTCUSD", "orderBook_200.100ms.BTCUSD"280 result = self.find({"symbol": symbol})281 self._delete(result)282 self._insert(data)283 elif type_ == "delta":284 self._delete(data["delete"])285 self._update(data["update"])286 self._insert(data["insert"])287class OrderBookUSDT(OrderBookInverse):288 def _onmessage(self, topic: str, type_: str, data: Union[list[Item], Item]) -> None:289 if type_ == "snapshot":290 symbol = topic.split(".")[-1]291 # ex: "orderBookL2_25.BTCUSDT", "orderBook_200.100ms.BTCUSDT"292 result = self.find({"symbol": symbol})293 self._delete(result)294 self._insert(data["order_book"])295 elif type_ == "delta":296 self._delete(data["delete"])297 self._update(data["update"])298 self._insert(data["insert"])299class TradeInverse(DataStore):300 _KEYS = ["trade_id"]301 _MAXLEN = 99999302 def _onmessage(self, data: list[Item]) -> None:303 self._insert(data)304class TradeUSDT(TradeInverse):305 ...306class Insurance(DataStore):307 _KEYS = ["currency"]308 def _onmessage(self, data: list[Item]) -> None:309 self._update(data)310class InstrumentInverse(DataStore):311 _KEYS = ["symbol"]312 def _onmessage(self, topic: str, type_: str, data: Item) -> None:313 if type_ == "snapshot":314 symbol = topic.split(".")[-1] # ex: "instrument_info.100ms.BTCUSD"315 result = self.find({"symbol": symbol})316 self._delete(result)317 self._insert([data])318 elif type_ == "delta":319 self._update(data["update"])320class InstrumentUSDT(InstrumentInverse):321 ...322class KlineInverse(DataStore):323 _KEYS = ["start", "symbol", "interval"]324 def _onmessage(self, topic: str, data: list[Item]) -> None:325 topic_split = topic.split(".") # ex:"klineV2.1.BTCUSD"326 for item in data:327 item["symbol"] = topic_split[-1]328 item["interval"] = topic_split[-2]329 self._update(data)330 def _onresponse(self, data: list[Item]) -> None:331 for item in data:332 item["start"] = item.pop("open_time")333 self._update(data)334class KlineUSDT(KlineInverse):335 ...336class LiquidationInverse(DataStore):337 _MAXLEN = 99999338 def _onmessage(self, item: Item) -> None:339 self._insert([item])340class LiquidationUSDT(LiquidationInverse):341 ...342class PositionInverse(DataStore):343 _KEYS = ["symbol", "position_idx"]344 def one(self, symbol: str) -> Optional[Item]:345 return self.get({"symbol": symbol, "position_idx": 0})346 def both(self, symbol: str) -> dict[str, Optional[Item]]:347 return {348 "Sell": self.get({"symbol": symbol, "position_idx": 2}),349 "Buy": self.get({"symbol": symbol, "position_idx": 1}),350 }351 def _onresponse(self, data: Union[Item, list[Item]]) -> None:352 if isinstance(data, dict):353 self._update([data]) # ex: {"symbol": "BTCUSD", ...}354 elif isinstance(data, list):355 for item in data:356 if "is_valid" in item:357 if item["is_valid"]:358 self._update([item["data"]])359 # ex:360 # [361 # {362 # "is_valid": True,363 # "data": {"symbol": "BTCUSDM21", ...}364 # },365 # ...366 # ]367 else:368 self._update([item])369 # ex: [{"symbol": "BTCUSDT", ...}, ...]370 def _onmessage(self, data: list[Item]) -> None:371 self._update(data)372class PositionUSDT(PositionInverse):373 def _onmessage(self, data: list[Item]) -> None:374 for item in data:375 item["position_idx"] = int(item["position_idx"])376 self._update([item])377class ExecutionInverse(DataStore):378 _KEYS = ["exec_id"]379 def _onmessage(self, data: list[Item]) -> None:380 self._update(data)381class ExecutionUSDT(ExecutionInverse):382 ...383class OrderInverse(DataStore):384 _KEYS = ["order_id"]385 def _onresponse(self, data: list[Item]) -> None:386 if isinstance(data, list):387 self._update(data)388 elif isinstance(data, dict):389 self._update([data])390 def _onmessage(self, data: list[Item]) -> None:391 for item in data:392 if item["order_status"] in ("Created", "New", "PartiallyFilled"):393 self._update([item])394 else:395 self._delete([item])396class OrderUSDT(OrderInverse):397 ...398class StopOrderInverse(DataStore):399 _KEYS = ["order_id"]400 def _onresponse(self, data: list[Item]) -> None:401 if isinstance(data, list):402 self._update(data)403 elif isinstance(data, dict):404 self._update([data])405 def _onmessage(self, data: list[Item]) -> None:406 for item in data:407 if item["order_status"] in ("Active", "Untriggered"):408 self._update([item])409 else:410 self._delete([item])411class StopOrderUSDT(StopOrderInverse):412 _KEYS = ["stop_order_id"]413class WalletInverse(DataStore):414 _KEYS = ["coin"]415 def _onresponse(self, data: dict[str, Item]) -> None:416 data.pop("USDT", None)417 for coin in data:418 self._update(419 [420 {421 "coin": coin,422 "available_balance": data[coin]["available_balance"],423 "wallet_balance": data[coin]["wallet_balance"],424 }425 ]426 )427 def _onmessage(self, data: list[Item]) -> None:428 self._update(data)429class WalletUSDT(WalletInverse):430 def _onresponse(self, data: dict[str, Item]) -> None:431 if "USDT" in data:432 self._update(433 [434 {435 "coin": "USDT",436 "wallet_balance": data["USDT"]["wallet_balance"],437 "available_balance": data["USDT"]["available_balance"],438 }439 ]440 )441 def _onmessage(self, data: list[Item]) -> None:442 for item in data:...

Full Screen

Full Screen

test_claims.py

Source:test_claims.py Github

copy

Full Screen

1# Copyright (c) 2015 Red Hat, Inc.2#3# Licensed under the Apache License, Version 2.0 (the "License"); you may not4# use this file except in compliance with the License. You may obtain a copy5# of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the12# License for the specific language governing permissions and limitations under13# the License.14import json15import ddt16import mock17from oslo_utils import timeutils18from oslo_utils import uuidutils19from zaqar.common import consts20from zaqar.tests.unit.transport.websocket import base21from zaqar.tests.unit.transport.websocket import utils as test_utils22@ddt.ddt23class ClaimsBaseTest(base.V1_1Base):24 config_file = "websocket_mongodb.conf"25 def setUp(self):26 super(ClaimsBaseTest, self).setUp()27 self.protocol = self.transport.factory()28 self.defaults = self.api.get_defaults()29 self.project_id = '7e55e1a7e'30 self.headers = {31 'Client-ID': uuidutils.generate_uuid(),32 'X-Project-ID': self.project_id33 }34 action = consts.QUEUE_CREATE35 body = {"queue_name": "skittle"}36 req = test_utils.create_request(action, body, self.headers)37 with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:38 self.protocol.onMessage(req, False)39 resp = json.loads(msg_mock.call_args[0][0].decode())40 self.assertIn(resp['headers']['status'], [201, 204])41 action = consts.MESSAGE_POST42 body = {"queue_name": "skittle",43 "messages": [44 {'body': 239, 'ttl': 300},45 {'body': {'key_1': 'value_1'}, 'ttl': 300},46 {'body': [1, 3], 'ttl': 300},47 {'body': 439, 'ttl': 300},48 {'body': {'key_2': 'value_2'}, 'ttl': 300},49 {'body': ['a', 'b'], 'ttl': 300},50 {'body': 639, 'ttl': 300},51 {'body': {'key_3': 'value_3'}, 'ttl': 300},52 {'body': ["aa", "bb"], 'ttl': 300}]53 }54 send_mock = mock.Mock()55 self.protocol.sendMessage = send_mock56 req = test_utils.create_request(action, body, self.headers)57 self.protocol.onMessage(req, False)58 resp = json.loads(send_mock.call_args[0][0].decode())59 self.assertEqual(201, resp['headers']['status'])60 def tearDown(self):61 super(ClaimsBaseTest, self).tearDown()62 action = consts.QUEUE_DELETE63 body = {'queue_name': 'skittle'}64 send_mock = mock.Mock()65 self.protocol.sendMessage = send_mock66 req = test_utils.create_request(action, body, self.headers)67 self.protocol.onMessage(req, False)68 resp = json.loads(send_mock.call_args[0][0].decode())69 self.assertEqual(204, resp['headers']['status'])70 @ddt.data('[', '[]', '.', '"fail"')71 def test_bad_claim(self, doc):72 action = consts.CLAIM_CREATE73 body = doc74 send_mock = mock.Mock()75 self.protocol.sendMessage = send_mock76 req = test_utils.create_request(action, body, self.headers)77 self.protocol.onMessage(req, False)78 resp = json.loads(send_mock.call_args[0][0].decode())79 self.assertEqual(400, resp['headers']['status'])80 action = consts.CLAIM_UPDATE81 body = doc82 req = test_utils.create_request(action, body, self.headers)83 self.protocol.onMessage(req, False)84 resp = json.loads(send_mock.call_args[0][0].decode())85 self.assertEqual(400, resp['headers']['status'])86 def test_exceeded_claim(self):87 action = consts.CLAIM_CREATE88 body = {"queue_name": "skittle",89 "ttl": 100,90 "grace": 60,91 "limit": 21}92 send_mock = mock.Mock()93 self.protocol.sendMessage = send_mock94 req = test_utils.create_request(action, body, self.headers)95 self.protocol.onMessage(req, False)96 resp = json.loads(send_mock.call_args[0][0].decode())97 self.assertEqual(400, resp['headers']['status'])98 @ddt.data((-1, -1), (59, 60), (60, 59), (60, 43201), (43201, 60))99 def test_unacceptable_ttl_or_grace(self, ttl_grace):100 ttl, grace = ttl_grace101 action = consts.CLAIM_CREATE102 body = {"queue_name": "skittle",103 "ttl": ttl,104 "grace": grace}105 send_mock = mock.Mock()106 self.protocol.sendMessage = send_mock107 req = test_utils.create_request(action, body, self.headers)108 self.protocol.onMessage(req, False)109 resp = json.loads(send_mock.call_args[0][0].decode())110 self.assertEqual(400, resp['headers']['status'])111 @ddt.data(-1, 59, 43201)112 def test_unacceptable_new_ttl(self, ttl):113 claim = self._get_a_claim()114 action = consts.CLAIM_UPDATE115 body = {"queue_name": "skittle",116 "claim_id": claim['body']['claim_id'],117 "ttl": ttl}118 send_mock = mock.Mock()119 self.protocol.sendMessage = send_mock120 req = test_utils.create_request(action, body, self.headers)121 self.protocol.onMessage(req, False)122 resp = json.loads(send_mock.call_args[0][0].decode())123 self.assertEqual(400, resp['headers']['status'])124 def test_default_ttl_and_grace(self):125 action = consts.CLAIM_CREATE126 body = {"queue_name": "skittle"}127 send_mock = mock.Mock()128 self.protocol.sendMessage = send_mock129 req = test_utils.create_request(action, body, self.headers)130 self.protocol.onMessage(req, False)131 resp = json.loads(send_mock.call_args[0][0].decode())132 self.assertEqual(201, resp['headers']['status'])133 action = consts.CLAIM_GET134 body = {"queue_name": "skittle",135 "claim_id": resp['body']['claim_id']}136 req = test_utils.create_request(action, body, self.headers)137 self.protocol.onMessage(req, False)138 resp = json.loads(send_mock.call_args[0][0].decode())139 self.assertEqual(200, resp['headers']['status'])140 self.assertEqual(self.defaults.claim_ttl, resp['body']['ttl'])141 def test_lifecycle(self):142 # First, claim some messages143 action = consts.CLAIM_CREATE144 body = {"queue_name": "skittle",145 "ttl": 100,146 "grace": 60}147 send_mock = mock.Mock()148 self.protocol.sendMessage = send_mock149 req = test_utils.create_request(action, body, self.headers)150 self.protocol.onMessage(req, False)151 resp = json.loads(send_mock.call_args[0][0].decode())152 self.assertEqual(201, resp['headers']['status'])153 claimed_messages = resp['body']['messages']154 claim_id = resp['body']['claim_id']155 # No more messages to claim156 body = {"queue_name": "skittle",157 "ttl": 100,158 "grace": 60}159 req = test_utils.create_request(action, body, self.headers)160 self.protocol.onMessage(req, False)161 resp = json.loads(send_mock.call_args[0][0].decode())162 self.assertEqual(204, resp['headers']['status'])163 # Listing messages, by default, won't include claimed, will echo164 action = consts.MESSAGE_LIST165 body = {"queue_name": "skittle",166 "echo": True}167 req = test_utils.create_request(action, body, self.headers)168 self.protocol.onMessage(req, False)169 resp = json.loads(send_mock.call_args[0][0].decode())170 self.assertEqual(200, resp['headers']['status'])171 self.assertEqual([], resp['body']['messages'])172 # Listing messages, by default, won't include claimed, won't echo173 body = {"queue_name": "skittle",174 "echo": False}175 req = test_utils.create_request(action, body, self.headers)176 self.protocol.onMessage(req, False)177 resp = json.loads(send_mock.call_args[0][0].decode())178 self.assertEqual(200, resp['headers']['status'])179 self.assertEqual([], resp['body']['messages'])180 # List messages, include_claimed, but don't echo181 body = {"queue_name": "skittle",182 "include_claimed": True,183 "echo": False}184 req = test_utils.create_request(action, body, self.headers)185 self.protocol.onMessage(req, False)186 resp = json.loads(send_mock.call_args[0][0].decode())187 self.assertEqual(200, resp['headers']['status'])188 self.assertEqual(resp['body']['messages'], [])189 # List messages with a different client-id and echo=false.190 # Should return some messages191 body = {"queue_name": "skittle",192 "echo": False}193 headers = {194 'Client-ID': uuidutils.generate_uuid(),195 'X-Project-ID': self.project_id196 }197 req = test_utils.create_request(action, body, headers)198 self.protocol.onMessage(req, False)199 resp = json.loads(send_mock.call_args[0][0].decode())200 self.assertEqual(200, resp['headers']['status'])201 # Include claimed messages this time, and echo202 body = {"queue_name": "skittle",203 "include_claimed": True,204 "echo": True}205 req = test_utils.create_request(action, body, self.headers)206 self.protocol.onMessage(req, False)207 resp = json.loads(send_mock.call_args[0][0].decode())208 self.assertEqual(200, resp['headers']['status'])209 self.assertEqual(len(claimed_messages), len(resp['body']['messages']))210 message_id_1 = resp['body']['messages'][0]['id']211 message_id_2 = resp['body']['messages'][1]['id']212 # Try to delete the message without submitting a claim_id213 action = consts.MESSAGE_DELETE214 body = {"queue_name": "skittle",215 "message_id": message_id_1}216 req = test_utils.create_request(action, body, self.headers)217 self.protocol.onMessage(req, False)218 resp = json.loads(send_mock.call_args[0][0].decode())219 self.assertEqual(403, resp['headers']['status'])220 # Delete the message and its associated claim221 body = {"queue_name": "skittle",222 "message_id": message_id_1,223 "claim_id": claim_id}224 req = test_utils.create_request(action, body, self.headers)225 self.protocol.onMessage(req, False)226 resp = json.loads(send_mock.call_args[0][0].decode())227 self.assertEqual(204, resp['headers']['status'])228 # Try to get it from the wrong project229 headers = {230 'Client-ID': uuidutils.generate_uuid(),231 'X-Project-ID': 'someproject'232 }233 action = consts.MESSAGE_GET234 body = {"queue_name": "skittle",235 "message_id": message_id_2}236 req = test_utils.create_request(action, body, headers)237 self.protocol.onMessage(req, False)238 resp = json.loads(send_mock.call_args[0][0].decode())239 self.assertEqual(404, resp['headers']['status'])240 # Get the message241 action = consts.MESSAGE_GET242 body = {"queue_name": "skittle",243 "message_id": message_id_2}244 req = test_utils.create_request(action, body, self.headers)245 self.protocol.onMessage(req, False)246 resp = json.loads(send_mock.call_args[0][0].decode())247 self.assertEqual(200, resp['headers']['status'])248 # Update the claim249 creation = timeutils.utcnow()250 action = consts.CLAIM_UPDATE251 body = {"queue_name": "skittle",252 "ttl": 60,253 "grace": 60,254 "claim_id": claim_id}255 req = test_utils.create_request(action, body, self.headers)256 self.protocol.onMessage(req, False)257 resp = json.loads(send_mock.call_args[0][0].decode())258 self.assertEqual(204, resp['headers']['status'])259 # Get the claimed messages (again)260 action = consts.CLAIM_GET261 body = {"queue_name": "skittle",262 "claim_id": claim_id}263 req = test_utils.create_request(action, body, self.headers)264 self.protocol.onMessage(req, False)265 query = timeutils.utcnow()266 resp = json.loads(send_mock.call_args[0][0].decode())267 self.assertEqual(200, resp['headers']['status'])268 self.assertEqual(60, resp['body']['ttl'])269 message_id_3 = resp['body']['messages'][0]['id']270 estimated_age = timeutils.delta_seconds(creation, query)271 # The claim's age should be 0 at this moment. But in some unexpected272 # case, such as slow test, the age maybe larger than 0. Just skip273 # asserting if so.274 if resp['body']['age'] == 0:275 self.assertGreater(estimated_age, resp['body']['age'])276 # Delete the claim277 action = consts.CLAIM_DELETE278 body = {"queue_name": "skittle",279 "claim_id": claim_id}280 req = test_utils.create_request(action, body, self.headers)281 self.protocol.onMessage(req, False)282 resp = json.loads(send_mock.call_args[0][0].decode())283 self.assertEqual(204, resp['headers']['status'])284 # Try to delete a message with an invalid claim ID285 action = consts.MESSAGE_DELETE286 body = {"queue_name": "skittle",287 "message_id": message_id_3,288 "claim_id": claim_id}289 req = test_utils.create_request(action, body, self.headers)290 self.protocol.onMessage(req, False)291 resp = json.loads(send_mock.call_args[0][0].decode())292 self.assertEqual(400, resp['headers']['status'])293 # Make sure it wasn't deleted!294 action = consts.MESSAGE_GET295 body = {"queue_name": "skittle",296 "message_id": message_id_2}297 req = test_utils.create_request(action, body, self.headers)298 self.protocol.onMessage(req, False)299 resp = json.loads(send_mock.call_args[0][0].decode())300 self.assertEqual(200, resp['headers']['status'])301 # Try to get a claim that doesn't exist302 action = consts.CLAIM_GET303 body = {"queue_name": "skittle",304 "claim_id": claim_id}305 req = test_utils.create_request(action, body, self.headers)306 self.protocol.onMessage(req, False)307 resp = json.loads(send_mock.call_args[0][0].decode())308 self.assertEqual(404, resp['headers']['status'])309 # Try to update a claim that doesn't exist310 action = consts.CLAIM_UPDATE311 body = {"queue_name": "skittle",312 "ttl": 60,313 "grace": 60,314 "claim_id": claim_id}315 req = test_utils.create_request(action, body, self.headers)316 self.protocol.onMessage(req, False)317 resp = json.loads(send_mock.call_args[0][0].decode())318 self.assertEqual(404, resp['headers']['status'])319 def test_post_claim_nonexistent_queue(self):320 action = consts.CLAIM_CREATE321 body = {"queue_name": "nonexistent",322 "ttl": 100,323 "grace": 60}324 send_mock = mock.Mock()325 self.protocol.sendMessage = send_mock326 req = test_utils.create_request(action, body, self.headers)327 self.protocol.onMessage(req, False)328 resp = json.loads(send_mock.call_args[0][0].decode())329 self.assertEqual(204, resp['headers']['status'])330 def test_get_claim_nonexistent_queue(self):331 action = consts.CLAIM_GET332 body = {"queue_name": "nonexistent",333 "claim_id": "aaabbbba"}334 send_mock = mock.Mock()335 self.protocol.sendMessage = send_mock336 req = test_utils.create_request(action, body, self.headers)337 self.protocol.onMessage(req, False)338 resp = json.loads(send_mock.call_args[0][0].decode())339 self.assertEqual(404, resp['headers']['status'])340 def _get_a_claim(self):341 action = consts.CLAIM_CREATE342 body = {"queue_name": "skittle",343 "ttl": 100,344 "grace": 60}345 send_mock = mock.Mock()346 self.protocol.sendMessage = send_mock347 req = test_utils.create_request(action, body, self.headers)348 self.protocol.onMessage(req, False)349 resp = json.loads(send_mock.call_args[0][0].decode())350 self.assertEqual(201, resp['headers']['status'])...

Full Screen

Full Screen

store.py

Source:store.py Github

copy

Full Screen

...46 self.position.linear._onresponse(content['result'])47 # wallet48 elif resp.request.path_url.startswith('/v2/private/wallet/balance'):49 self.wallet._onresponse(content['result'])50 def onmessage(self, msg: str, ws: WebSocket) -> None:51 content: Dict[str, Any] = json.loads(msg)52 if 'topic' in content:53 topic: str = content['topic']54 data: Union[List[Item], Item] = content['data']55 type_: Optional[str] = content.get('type')56 if any([57 topic.startswith('orderBookL2_25'),58 topic.startswith('orderBook_200'),59 ]):60 self.orderbook._onmessage(type_, data)61 elif topic.startswith('trade'):62 self.trade._onmessage(data)63 elif topic.startswith('insurance'):64 self.insurance._onmessage(data)65 elif topic.startswith('instrument_info'):66 self.instrument._onmessage(type_, data)67 if any([68 topic.startswith('klineV2'),69 topic.startswith('candle'),70 ]):71 self.kline._onmessage(topic, data)72 elif topic == 'position':73 self.position._onmessage(data)74 self.wallet._onposition(data)75 elif topic == 'execution':76 self.execution._onmessage(data)77 elif topic == 'order':78 self.order._onmessage(data)79 elif topic == 'stop_order':80 self.stoporder._onmessage(data)81 elif topic == 'wallet':82 self.wallet._onmessage(data)83 for event in self._events:84 event.set()85 self._events.clear()86 def wait(self) -> None:87 event = Event()88 self._events.append(event)89 event.wait()90class DefaultDataStore(DataStore): ...91Item = Dict[str, Any]92class _KeyValueStore:93 _KEYS: List[str]94 _MAXLEN: Optional[int]95 def __init__(self) -> None:96 self._data: Dict[str, Item] = {}97 self._events: List[Event] = []98 99 def get(self, **kwargs) -> Optional[Item]:100 try:101 dumps = self._dumps(kwargs)102 if dumps in self._data:103 return self._data[dumps]104 except KeyError:105 if kwargs:106 for item in self._data.values():107 for k, v, in kwargs.items():108 if not k in item:109 break110 if v != item[k]:111 break112 else:113 return item114 else:115 for item in self._data.values():116 return item117 def getlist(self, **kwargs) -> List[Item]:118 if kwargs:119 result = []120 for item in self._data.values():121 for k, v in kwargs.items():122 if not k in item:123 break124 if v != item[k]:125 break126 else:127 result.append(item)128 return result129 else:130 return list(self._data.values())131 def __len__(self):132 return len(self._data)133 def _dumps(self, item: Item) -> str:134 keyitem = {k: item[k] for k in self._KEYS}135 return urllib.parse.urlencode(keyitem)136 137 def _update(self, items: List[Item]) -> None:138 for item in items:139 try:140 key = self._dumps(item)141 if key in self._data:142 self._data[key].update(item)143 else:144 self._data[key] = item145 except KeyError:146 pass147 if self._MAXLEN is not None:148 len_data = len(self._data)149 if len_data > self._MAXLEN:150 over = len_data - self._MAXLEN151 keys = []152 for i, k in enumerate(self._data.keys()):153 if i < over:154 keys.append(k)155 else:156 break157 for k in keys:158 self._data.pop(k)159 for event in self._events:160 event.set()161 self._events.clear()162 def _pop(self, items: List[Item]) -> None:163 for item in items:164 try:165 key = self._dumps(item)166 if key in self._data:167 self._data.pop(key)168 except KeyError:169 pass170 for event in self._events:171 event.set()172 self._events.clear()173 def wait(self) -> None:174 event = Event()175 self._events.append(event)176 event.wait()177class OrderBook(_KeyValueStore):178 _KEYS = ['symbol', 'id', 'side']179 _MAXLEN = None180 def getbest(self, symbol: str) -> Dict[str, Optional[Item]]:181 result = {'Sell': {}, 'Buy': {}}182 for item in self._data.values():183 if item['symbol'] == symbol:184 result[item['side']][float(item['price'])] = item185 return {186 'Sell': result['Sell'][min(result['Sell'])] if result['Sell'] else None,187 'Buy': result['Buy'][max(result['Buy'])] if result['Buy'] else None188 }189 def getsorted(self, symbol: str) -> Dict[str, List[Item]]:190 result = {'Sell': [], 'Buy': []}191 for item in self._data.values():192 if item['symbol'] == symbol:193 result[item['side']].append(item)194 return {195 'Sell': sorted(result['Sell'], key=lambda x: float(x['price'])),196 'Buy': sorted(result['Buy'], key=lambda x: float(x['price']), reverse=True)197 }198 def _onmessage(self, type_: str, data: Union[List[Item], Item]) -> None:199 if type_ == 'snapshot':200 if isinstance(data, dict):201 data = data['order_book']202 self._update(data)203 elif type_ == 'delta':204 self._pop(data['delete'])205 self._update(data['update'])206 self._update(data['insert'])207class Trade(_KeyValueStore):208 _KEYS = ['trade_id']209 _MAXLEN = 10000210 def _onmessage(self, data: List[Item]) -> None:211 self._update(data)212class Insurance(_KeyValueStore):213 _KEYS = ['currency']214 _MAXLEN = None215 def _onmessage(self, data: List[Item]) -> None:216 self._update(data)217class Instrument(_KeyValueStore):218 _KEYS = ['symbol']219 _MAXLEN = None220 def _onmessage(self, type_: str, data: Item) -> None:221 if type_ == 'snapshot':222 self._update([data])223 elif type_ == 'delta':224 self._update(data['update'])225class Kline(_KeyValueStore):226 _KEYS = ['symbol', 'start']227 _MAXLEN = 5000228 def _onmessage(self, topic: str, data: List[Item]) -> None:229 symbol = topic.split('.')[2] # ex:'klineV2.1.BTCUSD'230 for item in data:231 item['symbol'] = symbol232 self._update(data)233class Position:234 def __init__(self):235 self.inverse = PositionInverse()236 self.linear = PositionLinear()237 238 def _onmessage(self, data: List[Item]) -> None:239 if len(data):240 symbol: str = data[0]['symbol']241 if symbol.endswith('USDT'):242 self.linear._onmessage(data)243 else:244 self.inverse._onmessage(data)245class PositionInverse(_KeyValueStore):246 _KEYS = ['symbol', 'position_idx']247 _MAXLEN = None248 249 def getone(self, symbol: str) -> Optional[Item]:250 return self.get(symbol=symbol, position_idx=0)251 def getboth(self, symbol: str) -> Dict[str, Optional[Item]]:252 return {253 'Sell': self.get(symbol=symbol, position_idx=2),254 'Buy': self.get(symbol=symbol, position_idx=1),255 }256 def _onresponse(self, data: Union[Item, List[Item]]) -> None:257 if isinstance(data, dict):258 self._update([data])259 elif isinstance(data, list):260 if len(data) and 'data' in data[0]:261 self._update([item['data'] for item in data])262 else:263 self._update(data)264 def _onmessage(self, data: List[Item]) -> None:265 self._update(data)266class PositionLinear(_KeyValueStore):267 _KEYS = ['symbol', 'side']268 _MAXLEN = None269 def getboth(self, symbol: str) -> Dict[str, Optional[Item]]:270 return {271 'Sell': self.get(symbol=symbol, side='Sell'),272 'Buy': self.get(symbol=symbol, side='Buy'),273 }274 def _onresponse(self, data: List[Item]) -> None:275 if len(data) and 'data' in data[0]:276 self._update([item['data'] for item in data])277 else:278 self._update(data)279 def _onmessage(self, data: List[Item]) -> None:280 self._update(data)281class Execution(_KeyValueStore):282 _KEYS = ['exec_id']283 _MAXLEN = 5000284 def _onmessage(self, data: List[Item]) -> None:285 self._update(data)286class Order(_KeyValueStore):287 _KEYS = ['order_id']288 _MAXLEN = None289 def _onresponse(self, data: List[Item]) -> None:290 self._update(data)291 def _onmessage(self, data: List[Item]) -> None:292 for item in data:293 if item['order_status'] in ('Created', 'New', 'PartiallyFilled', ):294 self._update([item])295 else:296 self._pop([item])297class StopOrder(_KeyValueStore):298 _KEYS = ['stop_order_id']299 _MAXLEN = None300 def _onresponse(self, data: List[Item]) -> None:301 self._update(data)302 def _onmessage(self, data: List[Item]) -> None:303 for item in data:304 if 'order_id' in item:305 item['stop_order_id'] = item.pop('order_id')306 if 'order_status' in item:307 item['stop_order_status'] = item.pop('order_status')308 if item['stop_order_status'] in ('Active', 'Untriggered', ):309 self._update([item])310 else:311 self._pop([item])312class Wallet(_KeyValueStore):313 _KEYS = ['coin']314 _MAXLEN = None315 def _onresponse(self, data: Dict[str, Item]) -> None:316 for coin, item in data.items():317 _item = {}318 _item['coin'] = coin319 _item['wallet_balance'] = item['wallet_balance']320 _item['available_balance'] = item['available_balance']321 self._update([_item])322 def _onposition(self, data: List[Item]) -> None:323 if len(data) and 'position_idx' in data[0]:324 for item in data:325 _item = {}326 symbol: str = item['symbol']327 if symbol.endswith('USD'):328 _item['coin'] = symbol[:-3] # ex:'BTCUSD'329 else:330 _item['coin'] = symbol[:-6] # ex:'BTCUSDM21'331 _item['wallet_balance'] = item['wallet_balance']332 _item['available_balance'] = item['available_balance']333 self._update([_item])334 def _onmessage(self, data: List[Item]) -> None:335 for item in data:336 _item = {}337 _item['coin'] = 'USDT'338 _item['wallet_balance'] = item['wallet_balance']339 _item['available_balance'] = item['available_balance']...

Full Screen

Full Screen

binance.py

Source:binance.py Github

copy

Full Screen

...56 self.order._onresponse(symbol, data)57 elif resp.url.path in ("/fapi/v1/listenKey",):58 self.listenkey = data["listenKey"]59 asyncio.create_task(self._listenkey(resp.__dict__["_raw_session"]))60 def _onmessage(self, msg: Any, ws: ClientWebSocketResponse) -> None:61 if "error" in msg:62 logger.warning(msg)63 if "result" not in msg:64 data = msg["data"] if "data" in msg else msg65 event = data["e"] if isinstance(data, dict) else data[0]["e"]66 if event in ("trade", "aggTrade"):67 self.trade._onmessage(data)68 elif event == "markPriceUpdate":69 self.markprice._onmessage(data)70 elif event == "kline":71 self.kline._onmessage(data)72 elif event == "continuous_kline":73 self.continuouskline._onmessage(data)74 elif event in ("24hrMiniTicker", "24hrTicker"):75 self.ticker._onmessage(data)76 elif event == "bookTicker":77 self.bookticker._onmessage(data)78 elif event == "forceOrder":79 self.liquidation._onmessage(data)80 elif event == "depthUpdate":81 self.orderbook._onmessage(data)82 elif event == "ACCOUNT_UPDATE":83 self.balance._onmessage(data)84 self.position._onmessage(data)85 elif event == "ORDER_TRADE_UPDATE":86 self.order._onmessage(data)87 @staticmethod88 async def _listenkey(session: aiohttp.ClientSession):89 while not session.closed:90 await session.put("https://fapi.binance.com/fapi/v1/listenKey", auth=Auth)91 await asyncio.sleep(1800.0) # 30 minutes92 @property93 def trade(self) -> "Trade":94 return self.get("trade", Trade)95 @property96 def markprice(self) -> "MarkPrice":97 return self.get("markprice", MarkPrice)98 @property99 def kline(self) -> "Kline":100 return self.get("kline", Kline)101 @property102 def continuouskline(self) -> "ContinuousKline":103 return self.get("continuouskline", ContinuousKline)104 @property105 def ticker(self) -> "Ticker":106 return self.get("ticker", Ticker)107 @property108 def bookticker(self) -> "BookTicker":109 return self.get("bookticker", BookTicker)110 @property111 def liquidation(self) -> "Liquidation":112 return self.get("liquidation", Liquidation)113 @property114 def orderbook(self) -> "OrderBook":115 return self.get("orderbook", OrderBook)116 @property117 def balance(self) -> "Balance":118 return self.get("balance", Balance)119 @property120 def position(self) -> "Position":121 return self.get("position", Position)122 @property123 def order(self) -> "Order":124 """125 アクティブオーダーのみ(約定・キャンセル済みは削除される)126 """127 return self.get("order", Order)128class Trade(DataStore):129 _MAXLEN = 99999130 def _onmessage(self, item: Item) -> None:131 self._insert([item])132class MarkPrice(DataStore):133 _KEYS = ["s"]134 def _onmessage(self, data: Union[Item, list[Item]]) -> None:135 if isinstance(data, list):136 self._update(data)137 else:138 self._update([data])139class Kline(DataStore):140 _KEYS = ["t", "s", "i"]141 def _onmessage(self, item: Item) -> None:142 self._update([item["k"]])143class ContinuousKline(DataStore):144 _KEYS = ["ps", "ct", "t", "i"]145 def _onmessage(self, item: Item) -> None:146 self._update([{"ps": item["ps"], "ct": item["ct"], **item["k"]}])147class Ticker(DataStore):148 _KEYS = ["s"]149 def _onmessage(self, data: Union[Item, list[Item]]) -> None:150 if isinstance(data, list):151 self._update(data)152 else:153 self._update([data])154class BookTicker(DataStore):155 _KEYS = ["s"]156 def _onmessage(self, item: Item) -> None:157 self._update([item])158class Liquidation(DataStore):159 def _onmessage(self, item: Item) -> None:160 self._insert([item["o"]])161class OrderBook(DataStore):162 _KEYS = ["s", "S", "p"]163 _MAPSIDE = {"BUY": "b", "SELL": "a"}164 def _init(self) -> None:165 self.initialized = False166 self._buff = deque(maxlen=200)167 def sorted(self, query: Optional[Item] = None) -> dict[str, list[float]]:168 if query is None:169 query = {}170 result = {self._MAPSIDE[k]: [] for k in self._MAPSIDE}171 for item in self:172 if all(k in item and query[k] == item[k] for k in query):173 result[self._MAPSIDE[item["S"]]].append([item["p"], item["q"]])174 result["b"].sort(key=lambda x: float(x[0]), reverse=True)175 result["a"].sort(key=lambda x: float(x[0]))176 return result177 def _onmessage(self, item: Item) -> None:178 if not self.initialized:179 self._buff.append(item)180 for s, bs in self._MAPSIDE.items():181 for row in item[bs]:182 if float(row[1]) != 0.0:183 self._update([{"s": item["s"], "S": s, "p": row[0], "q": row[1]}])184 else:185 self._delete([{"s": item["s"], "S": s, "p": row[0]}])186 def _onresponse(self, symbol: str, item: Item) -> None:187 self.initialized = True188 self._delete(self.find({"s": symbol}))189 for s, bs in (("BUY", "bids"), ("SELL", "asks")):190 for row in item[bs]:191 self._insert([{"s": symbol, "S": s, "p": row[0], "q": row[1]}])192 for msg in self._buff:193 if msg["U"] <= item["lastUpdateId"] and msg["u"] >= item["lastUpdateId"]:194 self._onmessage(msg)195 self._buff.clear()196class Balance(DataStore):197 _KEYS = ["a"]198 def _onmessage(self, item: Item) -> None:199 self._update(item["a"]["B"])200 def _onresponse(self, data: list[Item]) -> None:201 for item in data:202 self._update(203 [204 {205 "a": item["asset"],206 "wb": item["balance"],207 "cw": item["crossWalletBalance"],208 }209 ]210 )211class Position(DataStore):212 _KEYS = ["s", "ps"]213 def _onmessage(self, item: Item) -> None:214 self._update(item["a"]["P"])215 def _onresponse(self, data: list[Item]) -> None:216 for item in data:217 self._update(218 [219 {220 "s": item["symbol"],221 "pa": item["positionAmt"],222 "ep": item["entryPrice"],223 "mt": item["marginType"],224 "iw": item["isolatedWallet"],225 "ps": item["positionSide"],226 }227 ]228 )229class Order(DataStore):230 _KEYS = ["s", "i"]231 def _onmessage(self, item: Item) -> None:232 if item["o"]["X"] not in ("FILLED", "CANCELED", "EXPIRED"):233 self._update([item["o"]])234 else:235 self._delete([item["o"]])236 def _onresponse(self, symbol: Optional[str], data: list[Item]) -> None:237 if symbol is not None:238 self._delete(self.find({"symbol": symbol}))239 else:240 self._clear()241 for item in data:242 self._insert(243 [244 {245 "s": item["symbol"],...

Full Screen

Full Screen

vngateio_ws.py

Source:vngateio_ws.py Github

copy

Full Screen

1# encoding: utf-82import json3from time import time, sleep4from threading import Thread5from datetime import datetime6import base647import hmac8import hashlib9import json10import gzip, binascii, os 11import urllib , requests12import websocket 13import time14from vnpy.trader.vtFunction import systemSymbolToVnSymbol , VnSymbolToSystemSymbol15import json16GATEIO_SOCKET_URL = "wss://ws.gate.io/v3"17'''18'''19class Gate_WSDataApi(object):20 """基于Websocket的API对象"""21 #----------------------------------------------------------------------22 def __init__(self):23 """Constructor"""24 self.host = '' # 服务器地址25 self.apiKey = '' # 用户名26 self.secretKey = '' # 密码27 28 self.ws = None # websocket应用对象29 self.thread = None # 工作线程30 self.subscribeStrList = set([])31 #----------------------------------------------------------------------32 def reconnect(self):33 """重新连接"""34 # 首先关闭之前的连接35 #self.close()36 37 # 再执行重连任务38 self.ws = websocket.WebSocketApp(self.host, 39 on_message=self.onMessage,40 on_error=self.onError,41 on_close=self.onClose,42 on_open=self.onOpen) 43 44 self.thread = Thread(target=self.ws.run_forever , args = (None , None , 60, 30))45 self.thread.start()46 47 #----------------------------------------------------------------------48 def connect_Subpot(self, apiKey , secretKey , trace = False):49 self.host = GATEIO_SOCKET_URL50 self.apiKey = apiKey51 self.secretKey = secretKey52 self.trace = trace53 websocket.enableTrace(trace)54 self.ws = websocket.WebSocketApp(self.host, 55 on_message=self.onMessage,56 on_error=self.onError,57 on_close=self.onClose,58 on_open=self.onOpen) 59 60 self.thread = Thread(target = self.ws.run_forever , args = (None , None , 60, 30))61 # self.thread_heart = Thread(target = self.run_forever_heart)62 self.thread.start()63 # self.thread_heart.start()64 #----------------------------------------------------------------------65 def onMessage(self, ws, evt):66 """67 信息推送事件68 :param ws: 接口69 :param evt: 事件70 :return:71 """72 print(u'vngate_nwe.onMessage:{}'.format(evt))73 74 #----------------------------------------------------------------------75 def onError(self, ws, evt):76 """77 接口错误推送事件78 :param ws:79 :param evt:80 :return:81 """82 print(u'vngate_nwe.onApiError:{}'.format(evt))83 84 #----------------------------------------------------------------------85 def onClose(self, ws):86 """87 接口断开事件88 :param ws:89 :return:90 """91 print(u'vngate_nwe.onClose')92 93 #----------------------------------------------------------------------94 def onOpen(self, ws):95 """96 接口打开事件97 :param ws:98 :return:99 """100 print(u'vngate_nwe.onOpen')101 #----------------------------------------------------------------------102 def sendSocketCmd( self, client_id , method , json_params = []):103 send_json = {104 "id": client_id,105 "method": method,106 "params": json_params107 }108 self.ws.send( json.dumps(send_json))109 '''110 vngate_nwe.onMessage:{"error": null, "result": {"period": 86400, "open": "18.1604", "close": "17.03"111, "high": "18.53", "low": "16.54", "last": "17.03", "change": "-6.22", "quoteVolume": "1015826.9981111289865", "baseVolume": "17910280.42194529534205249261"}, "id": 1}113 '''114 #----------------------------------------------------------------------115 def querySpotTicker(self , u_id , symbol_pair = "EOS_USDT" , time_period = 86400):116 symbol_pair = systemSymbolToVnSymbol(symbol_pair)117 self.sendSocketCmd( u_id , "ticker.query" , [ symbol_pair , time_period])118 '''119 vngate_nwe.onMessage:{120 "error": null,121 "result": {122 "asks": [123 [124 "16.9507",125 "293.6242489299"126 ],127 [128 "16.9586",129 "1591.5376104592"130 ],131 ],132 "bids": [133 [134 "16.95",135 "0.1094"136 ],137 [138 "16.9471",139 "163.602"140 ],141 [142 "16.9431",143 "1.4607"144 ],145 ]146 },147 "id": 1148}149 '''150 #----------------------------------------------------------------------151 def querySpotDepth(self , u_id , symbol_pair = "EOS_USDT" , limit = 5 , interval = "0.00000001"):152 symbol_pair = systemSymbolToVnSymbol(symbol_pair)153 self.sendSocketCmd( u_id , "depth.query" , [ symbol_pair , limit , interval])154 '''155 {156 "error": null,157 "result": [158 {159 "id": 7177814,160 "time": 1523887673.562782,161 "price": "6.05",162 "amount": "20",163 "type": "buy"164 },165 {166 "id": 7177813,167 "time": 1523887354.256974,168 "price": "6.05",169 "amount": "15",170 "type": "buy"171 },172 ],173 "id": 12309174}175 '''176 #----------------------------------------------------------------------177 def querySpotTrades(self, u_id , symbol_pair = "EOS_USDT" , limit = 2 , last_id = 7177813):178 symbol_pair = systemSymbolToVnSymbol(symbol_pair)179 self.sendSocketCmd( u_id , "trades.query" , [ symbol_pair , limit , last_id])180 '''181 {182 "error": null, 183 "result": [184 [185 1492358400, time186 "7000.00", open187 "8000.0", close188 "8100.00", highest189 "6800.00", lowest190 "1000.00" volume191 "123456.00" amount192 "BTC_USDT" market name193 ]194 ...195 ]196 "id": 12312197 }198 '''199 #----------------------------------------------------------------------200 def querySpotKline(self, u_id , symbol_pair = "BTC_USDT", start = 1516951219 , end_time = 1516951219 , interval = 1800):201 symbol_pair = systemSymbolToVnSymbol(symbol_pair)202 self.sendSocketCmd( u_id , "kline.query" , [ symbol_pair , start , end_time, interval])203 '''204 vngate_nwe.onMessage:{"error": null, "result": {"status": "success"}, "id": 2}205 vngate_nwe.onMessage:{"method": "ticker.update", "params": ["BOT_USDT", {"period": 86400, "open": "0206 .7383", "close": "0.9048", "high": "1.015", "low": "0.715", "last": "0.9048", "change": "22.55", "qu207 oteVolume": "4565863.1552367147", "baseVolume": "4071168.7349472209511"}], "id": null}208 vngate_nwe.onMessage:{"method": "ticker.update", "params": ["BOT_USDT", {"period": 86400, "open": "0209 .7383", "close": "0.9049", "high": "1.015", "low": "0.715", "last": "0.9049", "change": "22.56", "qu210 oteVolume": "4571805.6819467147", "baseVolume": "4076546.0501166889511"}], "id": null}211 '''212 #----------------------------------------------------------------------213 def subscribeSpotTicker(self, u_id , symbol_pair = "BOT_USDT"):214 symbol_pair = systemSymbolToVnSymbol(symbol_pair)215 self.sendSocketCmd( u_id , "ticker.subscribe" , [ symbol_pair ])216 '''217 vngate_nwe.onMessage:{"method": "depth.update", "params": [true, {"asks": [["0.893", "813.385"], ["0218.8931", "102.65936009"], ["0.8932", "288.8898"], ["0.9058", "2028"], ["0.9067", "10"], ["0.9076", "421987.11"], ["0.9084", "1000"], ["0.9085", "17.49966971"], ["0.9086", "49.468551235"], ["0.9087", "1950220.59"]], "bids": [["0.8929", "88.76"], ["0.8921", "198.01888"], ["0.892", "256.09"], ["0.8919", "3280221.5348"], ["0.8803", "1382"], ["0.8802", "2257.925"], ["0.8801", "16.58862017"], ["0.88", "300"], ["0222.8779", "822.56"], ["0.8669", "774.0223"]]}, "BOT_USDT"], "id": null}223vngate_nwe.onMessage:{"method": "depth.update", "params": [false, {"bids": [["0.8929", "110.0925"]]}224, "BOT_USDT"], "id": null}225 Can only subscribe one market at the same time, market list is not supported currently. For multiple subscriptions, only the last one takes effect.226 '''227 #----------------------------------------------------------------------228 def subscribeSpotDepth(self, u_id , symbol_pair = "BOT_USDT" , limit = 30, interval = "0.00000001"):229 symbol_pair = systemSymbolToVnSymbol(symbol_pair)230 self.sendSocketCmd( u_id , "depth.subscribe" , [ symbol_pair , limit , interval])231 '''232 vngate_nwe.onMessage:{"error": null, "result": {"status": "success"}, "id": 3}233vngate_nwe.onMessage:{"method": "trades.update", "params": ["BOT_USDT", [{"id": 56675623, "time": 1523425592829.2169299, "price": "0.9096", "amount": "310.3478", "type": "sell"}, {"id": 56675622, "time":235 1525592829.2167261, "price": "0.9096", "amount": "461", "type": "sell"}, {"id": 56667395236, "time": 1525591676.7347641, "price": "0.9085", "amount": "847.41", "type": "sell"}]], "id": null}237 '''238 #----------------------------------------------------------------------239 def subscribeSpotTrades(self, u_id , symbol_pair = "BOT_USDT" ):240 symbol_pair = systemSymbolToVnSymbol(symbol_pair)241 self.sendSocketCmd( u_id , "trades.subscribe" , [ symbol_pair ])242 '''243 vngate_nwe.onMessage:{"error": null, "result": {"status": "success"}, "id": 3}244vngate_nwe.onMessage:{"method": "kline.update", "params": [[1525591800, "0.9085", "0.9174", "0.9217"245, "0.9049", "78364.430712655", "71495.1179278982815", "BOT_USDT"]], "id": null}246 '''247 #----------------------------------------------------------------------248 def subscribeSpotKline(self, u_id , symbol_pair = "BOT_USDT" , interval = 1800):249 symbol_pair = systemSymbolToVnSymbol(symbol_pair)...

Full Screen

Full Screen

phemex.py

Source:phemex.py Github

copy

Full Screen

...33 "symbol"34 ][0]35 data = await resp.json()36 if resp.url.path in ("/exchange/public/md/kline",):37 self.kline._onmessage({"symbol": symbol, "kline": data["data"]["rows"]})38 def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:39 if not msg.get("id"):40 if "trades" in msg:41 self.trade._onmessage(msg)42 elif "book" in msg:43 self.orderbook._onmessage(msg)44 elif "tick" in msg:45 self.ticker._onmessage(msg)46 elif "market24h" in msg:47 self.market24h._onmessage(msg["market24h"])48 elif "kline" in msg:49 self.kline._onmessage(msg)50 if "accounts" in msg:51 self.accounts._onmessage(msg.get("accounts"))52 if "orders" in msg:53 self.orders._onmessage(msg.get("orders"))54 if "positions" in msg:55 self.positions._onmessage(msg.get("positions"))56 if msg.get("error"):57 logger.warning(msg)58 @property59 def trade(self) -> "Trade":60 return self.get("trade", Trade)61 @property62 def orderbook(self) -> "OrderBook":63 return self.get("orderbook", OrderBook)64 @property65 def ticker(self):66 return self.get("ticker", Ticker)67 @property68 def market24h(self) -> "Market24h":69 return self.get("market24h", Market24h)70 @property71 def kline(self) -> "Kline":72 return self.get("kline", Kline)73 @property74 def accounts(self) -> "Accounts":75 return self.get("accounts", Accounts)76 @property77 def orders(self) -> "Orders":78 return self.get("orders", Orders)79 @property80 def positions(self) -> "Positions":81 return self.get("positions", Positions)82class Trade(DataStore):83 _KEYS = ["symbol", "timestamp"]84 _MAXLEN = 9999985 def _onmessage(self, message: Item) -> None:86 symbol = message.get("symbol")87 self._insert(88 [89 {90 "symbol": symbol,91 "timestamp": item[0],92 "side": item[1],93 "price": item[2] / 10000,94 "size": item[3],95 }96 for item in message.get("trades", [])97 ]98 )99class OrderBook(DataStore):100 _KEYS = ["symbol", "side", "price"]101 def _init(self) -> None:102 self.timestamp: Optional[int] = None103 def sorted(self, query: Item = None) -> dict[str, list[Item]]:104 if query is None:105 query = {}106 result = {"SELL": [], "BUY": []}107 for item in self:108 if all(k in item and query[k] == item[k] for k in query):109 result[item["side"]].append(item)110 result["SELL"].sort(key=lambda x: x["price"])111 result["BUY"].sort(key=lambda x: x["price"], reverse=True)112 return result113 def _onmessage(self, message: Item) -> None:114 symbol = message["symbol"]115 book = message["book"]116 for key, side in (("bids", "BUY"), ("asks", "SELL")):117 for item in book[key]:118 if item[1] != 0:119 self._insert(120 [121 {122 "symbol": symbol,123 "side": side,124 "price": item[0],125 "size": item[1],126 }127 ]128 )129 else:130 self._delete(131 [132 {133 "symbol": symbol,134 "side": side,135 "price": item[0],136 "size": item[1],137 }138 ]139 )140 self.timestamp = message["timestamp"]141class Ticker(DataStore):142 _KEYS = ["symbol"]143 def _onmessage(self, message):144 self._update([message.get("tick")])145class Market24h(DataStore):146 _KEYS = ["symbol"]147 def _onmessage(self, item: Item) -> None:148 self._update([item])149class Kline(DataStore):150 _KEYS = ["symbol", "interval", "timestamp"]151 def _onmessage(self, message: Item) -> None:152 symbol = message.get("symbol")153 self._insert(154 [155 {156 "symbol": symbol,157 "interval": item[1],158 "timestamp": item[0],159 "open": item[3] / 10000,160 "high": item[4] / 10000,161 "low": item[5] / 10000,162 "close": item[6] / 10000,163 "volume": item[7],164 "turnover": item[8] / 10000,165 }166 for item in message.get("kline", [])167 ]168 )169class Accounts(DataStore):170 _KEYS = ["accountID", "currency"]171 def _onmessage(self, data: list[Item]) -> None:172 self._update(data)173class Orders(DataStore):174 _KEYS = ["orderID"]175 def _onmessage(self, data: list[Item]) -> None:176 for item in data:177 if item["ordStatus"] == "New":178 self._insert([item])179 elif item["ordStatus"] == "PartiallyFilled":180 self._update([item])181 elif item["ordStatus"] == "Filled":182 self._delete([item])183 elif item["ordStatus"] == "Canceled" and item["action"] != "Replace":184 self._delete([item])185class Positions(DataStore):186 _KEYS = ["accountID", "symbol"]187 def _onmessage(self, data: list[Item]) -> None:188 for item in data:189 if item["size"] == 0:190 self._delete([item])191 else:...

Full Screen

Full Screen

Using AI Code Generation

copy

Full Screen

1onmessage = function(e) {2 console.log('Message received from main script');3 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);4 console.log('Posting message back to main script');5 postMessage(workerResult);6}7 if (window.Worker) {8 var myWorker = new Worker("test.js");9 myWorker.onmessage = function(e) {10 document.getElementById("result").innerHTML = e.data;11 };12 myWorker.postMessage([1, 2]);13 } else {14 document.getElementById("result").innerHTML = "Sorry, your browser does not support Web Workers...";15 }

Full Screen

Using AI Code Generation

copy

Full Screen

1onmessage = function(e) {2 console.log('Message received from main script');3 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);4 console.log('Posting message back to main script');5 postMessage(workerResult);6}7var wpt = new Worker("test.js");8wpt.onmessage = function(event) {9 document.getElementById("result").innerHTML = event.data;10};11wpt.postMessage([10, 20]);12onmessage = function(e) {13 console.log('Message received from main script');14 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);15 console.log('Posting message back to main script');16 postMessage(workerResult);17}18var wpt = new Worker("test.js");19wpt.onmessage = function(event) {20 document.getElementById("result").innerHTML = event.data;21};22wpt.postMessage([10, 20]);23onmessage = function(e) {24 console.log('Message received from main script');25 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);26 console.log('Posting message back to main script');27 postMessage(workerResult);28}29var wpt = new Worker("test.js");30wpt.onmessage = function(event) {31 document.getElementById("result").innerHTML = event.data;32};33wpt.postMessage([10, 20]);34onmessage = function(e) {35 console.log('Message received from main script');36 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);37 console.log('Posting message back to main script');38 postMessage(workerResult);39}

Full Screen

Using AI Code Generation

copy

Full Screen

1onmessage = function(e) {2 console.log('Worker: Message received from main script');3 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);4 console.log('Worker: Posting message back to main script');5 postMessage(workerResult);6}7self.addEventListener('message', function(e) {8 console.log('Worker: Message received from main script');9 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);10 console.log('Worker: Posting message back to main script');11 postMessage(workerResult);12}, false);13self.addEventListener('message', function(e) {14 console.log('Worker: Message received from main script');15 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);16 console.log('Worker: Posting message back to main script');17 postMessage(workerResult);18}, false);19self.addEventListener('message', function(e) {20 console.log('Worker: Message received from main script');21 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);22 console.log('Worker: Posting message back to main script');23 postMessage(workerResult);24}, false);25self.addEventListener('message', function(e) {26 console.log('Worker: Message received from main script');27 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);28 console.log('Worker: Posting message back to main script');29 postMessage(workerResult);30}, false);31self.addEventListener('message', function(e) {32 console.log('Worker: Message received from main script');33 var workerResult = 'Result: ' + (e.data[0] * e.data[1]);34 console.log('Worker: Posting message back to main script');35 postMessage(workerResult);36}, false);37self.addEventListener('message', function(e) {

Full Screen

Using AI Code Generation

copy

Full Screen

1var wpt = new Worker('worker.js');2wpt.onmessage = function (e) {3 console.log(e.data);4};5wpt.postMessage('Hello from test.js');6onmessage = function (e) {7 console.log(e.data);8 postMessage('Hello from worker.js');9};10var wpt = new Worker('worker.js');11wpt.addEventListener('message', function (e) {12 console.log(e.data);13});14wpt.postMessage('Hello from test.js');15self.addEventListener('message', function (e) {16 console.log(e.data);17 postMessage('Hello from worker.js');18});19var wpt = new Worker('worker.js');20wpt.postMessage('Hello from test.js');21self.addEventListener('message', function (e) {22 console.log(e.data);23 postMessage('Hello from worker.js');24});25var wpt = new Worker('worker.js');26wpt.addEventListener('message', function (e) {27 console.log(e.data);28});29wpt.postMessage('Hello from test.js');30self.addEventListener('message', function (e) {31 console.log(e.data);32 postMessage('Hello from worker.js');33});34var wpt = new Worker('worker.js');35wpt.postMessage('Hello from test.js');36self.addEventListener('message', function (e) {37 console.log(e.data);38 postMessage('Hello from worker.js');39});40var wpt = new Worker('worker.js');41wpt.addEventListener('message', function (e) {42 console.log(e.data);43});44wpt.postMessage('Hello from test.js');

Full Screen

Using AI Code Generation

copy

Full Screen

1onmessage = function(event) {2 var data = event.data;3 var result = data;4 postMessage(result);5}6var worker = new Worker('test.js');7worker.postMessage('test');8worker.onmessage = function(event) {9 console.log(event.data);10}11onmessage = function(event) {12 var data = event.data;13 var result = data;14 for (var i = 0; i < 100; i++) {15 for (var j = 0; j < 100; j++) {16 for (var k = 0; k < 100; k++) {17 var result = i + j + k;18 }19 }20 }21 postMessage(result);22}23var worker = new Worker('test.js');24worker.postMessage('test');25worker.onmessage = function(event) {26 console.log(event.data);27}

Full Screen

Using AI Code Generation

copy

Full Screen

1onmessage = function(e) {2 postMessage("done");3}4function doWork() {5 var worker = new Worker("test.js");6 worker.onmessage = function(e) {7 }8 worker.postMessage("start");9}10onmessage = function(e) {11 postMessage("done");12}13function doWork() {14 var worker = new Worker("test.js");15 worker.onmessage = function(e) {16 }17 worker.postMessage("start");18}19onmessage = function(e) {20 postMessage("done");21}22function doWork() {23 var worker = new Worker("test.js");24 worker.onmessage = function(e) {25 }26 worker.postMessage("start");27}28onmessage = function(e) {29 postMessage("done");30}31function doWork() {32 var worker = new Worker("test.js");33 worker.onmessage = function(e) {34 }35 worker.postMessage("start");36}37onmessage = function(e) {38 postMessage("done");39}40function doWork() {41 var worker = new Worker("test.js");42 worker.onmessage = function(e) {43 }44 worker.postMessage("start");45}46onmessage = function(e) {47 postMessage("done");

Full Screen

Using AI Code Generation

copy

Full Screen

1onmessage = function(e){2 postMessage(e.data);3}4var worker = new Worker("test.js");5worker.postMessage("Hi from main thread");6worker.onmessage = function(e){7 console.log(e.data);8}9onmessage = function(e){10 postMessage(e.data);11}12var worker = new Worker("test.js");13worker.postMessage("Hi from main thread");14worker.onmessage = function(e){15 console.log(e.data);16}17onmessage = function(e){18 postMessage(e.data);19}20var worker = new Worker("test.js");21worker.postMessage("Hi from main thread");22worker.onmessage = function(e){23 console.log(e.data);24}25onmessage = function(e){26 postMessage(e.data);27}28var worker = new Worker("test.js");29worker.postMessage("Hi from main thread");30worker.onmessage = function(e){31 console.log(e.data);32}33onmessage = function(e){34 postMessage(e.data);35}36var worker = new Worker("test.js");37worker.postMessage("Hi from main thread");38worker.onmessage = function(e){39 console.log(e.data);40}

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run wpt automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful