How to use onMessage method in Playwright Python

Best Python code snippet using playwright-python

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

store.py

Source: store.py Github

copy
1import json
2import urllib.parse
3from threading import Event
4from typing import Any, Dict, List, Optional, Union
5from requests import Response, Session
6from websocket import WebSocket
7
8class DataStore:
9    def __init__(self) -> None:
10        self.orderbook = OrderBook()
11        self.trade = Trade()
12        self.insurance = Insurance()
13        self.instrument = Instrument()
14        self.kline = Kline()
15        self.position = Position()
16        self.execution = Execution()
17        self.order = Order()
18        self.stoporder = StopOrder()
19        self.wallet = Wallet()
20        self._events: List[Event] = []
21
22    def onresponse(self, resp: Response, session: Session) -> None:
23        content: Dict[str, Any] = resp.json()
24        if content.get('ret_code') == 0:
25            # order
26            if any([
27                resp.request.path_url.startswith('/v2/private/order'),
28                resp.request.path_url.startswith('/private/linear/order/search'),
29                resp.request.path_url.startswith('/futures/private/order'),
30            ]):
31                if isinstance(content['result'], list):
32                    self.order._onresponse(content['result'])
33            # stoporder
34            elif any([
35                resp.request.path_url.startswith('/v2/private/stop-order'),
36                resp.request.path_url.startswith('/private/linear/order/search'),
37                resp.request.path_url.startswith('/futures/private/order'),
38            ]):
39                if isinstance(content['result'], list):
40                    self.stoporder._onresponse(content['result'])
41            # position
42            elif any([
43                resp.request.path_url.startswith('/v2/private/position/list'),
44                resp.request.path_url.startswith('/futures/private/position/list'),
45            ]):
46                self.position.inverse._onresponse(content['result'])
47            elif resp.request.path_url.startswith('/private/linear/position/list'):
48                self.position.linear._onresponse(content['result'])
49            # wallet
50            elif resp.request.path_url.startswith('/v2/private/wallet/balance'):
51                self.wallet._onresponse(content['result'])
52
53    def onmessage(self, msg: str, ws: WebSocket) -> None:
54        content: Dict[str, Any] = json.loads(msg)
55        if 'topic' in content:
56            topic: str = content['topic']
57            data: Union[List[Item], Item] = content['data']
58            type_: Optional[str] = content.get('type')
59            if any([
60                topic.startswith('orderBookL2_25'),
61                topic.startswith('orderBook_200'),
62            ]):
63                self.orderbook._onmessage(type_, data)
64            elif topic.startswith('trade'):
65                self.trade._onmessage(data)
66            elif topic.startswith('insurance'):
67                self.insurance._onmessage(data)
68            elif topic.startswith('instrument_info'):
69                self.instrument._onmessage(type_, data)
70            if any([
71                topic.startswith('klineV2'),
72                topic.startswith('candle'),
73            ]):
74                self.kline._onmessage(topic, data)
75            elif topic == 'position':
76                self.position._onmessage(data)
77                self.wallet._onposition(data)
78            elif topic == 'execution':
79                self.execution._onmessage(data)
80            elif topic == 'order':
81                self.order._onmessage(data)
82            elif topic == 'stop_order':
83                self.stoporder._onmessage(data)
84            elif topic == 'wallet':
85                self.wallet._onmessage(data)
86            for event in self._events:
87                event.set()
88            self._events.clear()
89
90    def wait(self) -> None:
91        event = Event()
92        self._events.append(event)
93        event.wait()
94
95class DefaultDataStore(DataStore): ...
96
97Item = Dict[str, Any]
98
99class _KeyValueStore:
100    _KEYS: List[str]
101    _MAXLEN: Optional[int]
102
103    def __init__(self) -> None:
104        self._data: Dict[str, Item] = {}
105        self._events: List[Event] = []
106    
107    def get(self, **kwargs) -> Optional[Item]:
108        try:
109            dumps = self._dumps(kwargs)
110            if dumps in self._data:
111                return self._data[dumps]
112        except KeyError:
113            if kwargs:
114                for item in self._data.values():
115                    for k, v, in kwargs.items():
116                        if not k in item:
117                            break
118                        if v != item[k]:
119                            break
120                    else:
121                        return item
122            else:
123                for item in self._data.values():
124                    return item
125
126    def getlist(self, **kwargs) -> List[Item]:
127        if kwargs:
128            result = []
129            for item in self._data.values():
130                for k, v in kwargs.items():
131                    if not k in item:
132                        break
133                    if v != item[k]:
134                        break
135                else:
136                    result.append(item)
137            return result
138        else:
139            return list(self._data.values())
140
141    def __len__(self):
142        return len(self._data)
143
144    def _dumps(self, item: Item) -> str:
145        keyitem = {k: item[k] for k in self._KEYS}
146        return urllib.parse.urlencode(keyitem)
147    
148    def _update(self, items: List[Item]) -> None:
149        for item in items:
150            try:
151                key = self._dumps(item)
152                if key in self._data:
153                    self._data[key].update(item)
154                else:
155                    self._data[key] = item
156            except KeyError:
157                pass
158        if self._MAXLEN is not None:
159            len_data = len(self._data)
160            if len_data > self._MAXLEN:
161                over = len_data - self._MAXLEN
162                keys = []
163                for i, k in enumerate(self._data.keys()):
164                    if i < over:
165                        keys.append(k)
166                    else:
167                        break
168                for k in keys:
169                    self._data.pop(k)
170        for event in self._events:
171            event.set()
172        self._events.clear()
173
174    def _pop(self, items: List[Item]) -> None:
175        for item in items:
176            try:
177                key = self._dumps(item)
178                if key in self._data:
179                    self._data.pop(key)
180            except KeyError:
181                pass
182        for event in self._events:
183            event.set()
184        self._events.clear()
185
186    def wait(self) -> None:
187        event = Event()
188        self._events.append(event)
189        event.wait()
190
191class OrderBook(_KeyValueStore):
192    _KEYS = ['symbol', 'id', 'side']
193    _MAXLEN = None
194
195    def getbest(self, symbol: str) -> Dict[str, Optional[Item]]:
196        result = {'Sell': {}, 'Buy': {}}
197        for item in self._data.values():
198            if item['symbol'] == symbol:
199                result[item['side']][float(item['price'])] = item
200        return {
201            'Sell': result['Sell'][min(result['Sell'])] if result['Sell'] else None,
202            'Buy': result['Buy'][max(result['Buy'])] if result['Buy'] else None
203        }
204
205    def getsorted(self, symbol: str) -> Dict[str, List[Item]]:
206        result = {'Sell': [], 'Buy': []}
207        for item in self._data.values():
208            if item['symbol'] == symbol:
209                result[item['side']].append(item)
210        return {
211            'Sell': sorted(result['Sell'], key=lambda x: float(x['price'])),
212            'Buy': sorted(result['Buy'], key=lambda x: float(x['price']), reverse=True)
213        }
214
215    def _onmessage(self, type_: str, data: Union[List[Item], Item]) -> None:
216        if type_ == 'snapshot':
217            if isinstance(data, dict):
218                data = data['order_book']
219            self._update(data)
220        elif type_ == 'delta':
221            self._pop(data['delete'])
222            self._update(data['update'])
223            self._update(data['insert'])
224
225class Trade(_KeyValueStore):
226    _KEYS = ['trade_id']
227    _MAXLEN = 10000
228
229    def _onmessage(self, data: List[Item]) -> None:
230        self._update(data)
231
232class Insurance(_KeyValueStore):
233    _KEYS = ['currency']
234    _MAXLEN = None
235
236    def _onmessage(self, data: List[Item]) -> None:
237        self._update(data)
238
239class Instrument(_KeyValueStore):
240    _KEYS = ['symbol']
241    _MAXLEN = None
242
243    def _onmessage(self, type_: str, data: Item) -> None:
244        if type_ == 'snapshot':
245            self._update([data])
246        elif type_ == 'delta':
247            self._update(data['update'])
248
249class Kline(_KeyValueStore):
250    _KEYS = ['symbol', 'start']
251    _MAXLEN = 5000
252
253    def _onmessage(self, topic: str, data: List[Item]) -> None:
254        symbol = topic.split('.')[2] # ex:'klineV2.1.BTCUSD'
255        for item in data:
256            item['symbol'] = symbol
257        self._update(data)
258
259class Position:
260    def __init__(self):
261        self.inverse = PositionInverse()
262        self.linear = PositionLinear()
263    
264    def _onmessage(self, data: List[Item]) -> None:
265        if len(data):
266            symbol: str = data[0]['symbol']
267            if symbol.endswith('USDT'):
268                self.linear._onmessage(data)
269            else:
270                self.inverse._onmessage(data)
271
272class PositionInverse(_KeyValueStore):
273    _KEYS = ['symbol', 'position_idx']
274    _MAXLEN = None
275    
276    def getone(self, symbol: str) -> Optional[Item]:
277        return self.get(symbol=symbol, position_idx=0)
278
279    def getboth(self, symbol: str) -> Dict[str, Optional[Item]]:
280        return {
281            'Sell': self.get(symbol=symbol, position_idx=2),
282            'Buy': self.get(symbol=symbol, position_idx=1),
283        }
284
285    def _onresponse(self, data: Union[Item, List[Item]]) -> None:
286        if isinstance(data, dict):
287            self._update([data])
288        elif isinstance(data, list):
289            if len(data) and 'data' in data[0]:
290                self._update([item['data'] for item in data])
291            else:
292                self._update(data)
293
294    def _onmessage(self, data: List[Item]) -> None:
295        self._update(data)
296
297class PositionLinear(_KeyValueStore):
298    _KEYS = ['symbol', 'side']
299    _MAXLEN = None
300
301    def getboth(self, symbol: str) -> Dict[str, Optional[Item]]:
302        return {
303            'Sell': self.get(symbol=symbol, side='Sell'),
304            'Buy': self.get(symbol=symbol, side='Buy'),
305        }
306
307    def _onresponse(self, data: List[Item]) -> None:
308        if len(data) and 'data' in data[0]:
309            self._update([item['data'] for item in data])
310        else:
311            self._update(data)
312
313    def _onmessage(self, data: List[Item]) -> None:
314        self._update(data)
315
316class Execution(_KeyValueStore):
317    _KEYS = ['exec_id']
318    _MAXLEN = 5000
319
320    def _onmessage(self, data: List[Item]) -> None:
321        self._update(data)
322
323class Order(_KeyValueStore):
324    _KEYS = ['order_id']
325    _MAXLEN = None
326
327    def _onresponse(self, data: List[Item]) -> None:
328        self._update(data)
329
330    def _onmessage(self, data: List[Item]) -> None:
331        for item in data:
332            if item['order_status'] in ('Created', 'New', 'PartiallyFilled', ):
333                self._update([item])
334            else:
335                self._pop([item])
336
337class StopOrder(_KeyValueStore):
338    _KEYS = ['stop_order_id']
339    _MAXLEN = None
340
341    def _onresponse(self, data: List[Item]) -> None:
342        self._update(data)
343
344    def _onmessage(self, data: List[Item]) -> None:
345        for item in data:
346            if 'order_id' in item:
347                item['stop_order_id'] = item.pop('order_id')
348            if 'order_status' in item:
349                item['stop_order_status'] = item.pop('order_status')
350            if item['stop_order_status'] in ('Active', 'Untriggered', ):
351                self._update([item])
352            else:
353                self._pop([item])
354
355class Wallet(_KeyValueStore):
356    _KEYS = ['coin']
357    _MAXLEN = None
358
359    def _onresponse(self, data: Dict[str, Item]) -> None:
360        for coin, item in data.items():
361            _item = {}
362            _item['coin'] = coin
363            _item['wallet_balance'] = item['wallet_balance']
364            _item['available_balance'] = item['available_balance']
365            self._update([_item])
366
367    def _onposition(self, data: List[Item]) -> None:
368        if len(data) and 'position_idx' in data[0]:
369            for item in data:
370                _item = {}
371                symbol: str = item['symbol']
372                if symbol.endswith('USD'):
373                    _item['coin'] = symbol[:-3] # ex:'BTCUSD'
374                else:
375                    _item['coin'] = symbol[:-6] # ex:'BTCUSDM21'
376                _item['wallet_balance'] = item['wallet_balance']
377                _item['available_balance'] = item['available_balance']
378                self._update([_item])
379
380    def _onmessage(self, data: List[Item]) -> None:
381        for item in data:
382            _item = {}
383            _item['coin'] = 'USDT'
384            _item['wallet_balance'] = item['wallet_balance']
385            _item['available_balance'] = item['available_balance']
386            self._update([item])
387
Full Screen

test_claims.py

Source: test_claims.py Github

copy
1# Copyright (c) 2015 Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License.  You may obtain a copy
5# of the License at
6#
7#    http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14
15import json
16
17import ddt
18import mock
19from oslo_utils import timeutils
20from oslo_utils import uuidutils
21
22from zaqar.common import consts
23from zaqar.tests.unit.transport.websocket import base
24from zaqar.tests.unit.transport.websocket import utils as test_utils
25
26
27@ddt.ddt
28class ClaimsBaseTest(base.V1_1Base):
29
30    config_file = "websocket_mongodb.conf"
31
32    def setUp(self):
33        super(ClaimsBaseTest, self).setUp()
34        self.protocol = self.transport.factory()
35        self.defaults = self.api.get_defaults()
36
37        self.project_id = '7e55e1a7e'
38        self.headers = {
39            'Client-ID': uuidutils.generate_uuid(),
40            'X-Project-ID': self.project_id
41        }
42
43        action = consts.QUEUE_CREATE
44        body = {"queue_name": "skittle"}
45        req = test_utils.create_request(action, body, self.headers)
46
47        with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:
48            self.protocol.onMessage(req, False)
49            resp = json.loads(msg_mock.call_args[0][0].decode())
50            self.assertIn(resp['headers']['status'], [201, 204])
51
52        action = consts.MESSAGE_POST
53        body = {"queue_name": "skittle",
54                "messages": [
55                    {'body': 239, 'ttl': 300},
56                    {'body': {'key_1': 'value_1'}, 'ttl': 300},
57                    {'body': [1, 3], 'ttl': 300},
58                    {'body': 439, 'ttl': 300},
59                    {'body': {'key_2': 'value_2'}, 'ttl': 300},
60                    {'body': ['a', 'b'], 'ttl': 300},
61                    {'body': 639, 'ttl': 300},
62                    {'body': {'key_3': 'value_3'}, 'ttl': 300},
63                    {'body': ["aa", "bb"], 'ttl': 300}]
64                }
65
66        send_mock = mock.Mock()
67        self.protocol.sendMessage = send_mock
68
69        req = test_utils.create_request(action, body, self.headers)
70
71        self.protocol.onMessage(req, False)
72
73        resp = json.loads(send_mock.call_args[0][0].decode())
74        self.assertEqual(201, resp['headers']['status'])
75
76    def tearDown(self):
77        super(ClaimsBaseTest, self).tearDown()
78        action = consts.QUEUE_DELETE
79        body = {'queue_name': 'skittle'}
80
81        send_mock = mock.Mock()
82        self.protocol.sendMessage = send_mock
83
84        req = test_utils.create_request(action, body, self.headers)
85        self.protocol.onMessage(req, False)
86
87        resp = json.loads(send_mock.call_args[0][0].decode())
88        self.assertEqual(204, resp['headers']['status'])
89
90    @ddt.data('[', '[]', '.', '"fail"')
91    def test_bad_claim(self, doc):
92        action = consts.CLAIM_CREATE
93        body = doc
94
95        send_mock = mock.Mock()
96        self.protocol.sendMessage = send_mock
97
98        req = test_utils.create_request(action, body, self.headers)
99        self.protocol.onMessage(req, False)
100        resp = json.loads(send_mock.call_args[0][0].decode())
101        self.assertEqual(400, resp['headers']['status'])
102
103        action = consts.CLAIM_UPDATE
104        body = doc
105
106        req = test_utils.create_request(action, body, self.headers)
107        self.protocol.onMessage(req, False)
108        resp = json.loads(send_mock.call_args[0][0].decode())
109        self.assertEqual(400, resp['headers']['status'])
110
111    def test_exceeded_claim(self):
112        action = consts.CLAIM_CREATE
113        body = {"queue_name": "skittle",
114                "ttl": 100,
115                "grace": 60,
116                "limit": 21}
117
118        send_mock = mock.Mock()
119        self.protocol.sendMessage = send_mock
120
121        req = test_utils.create_request(action, body, self.headers)
122        self.protocol.onMessage(req, False)
123        resp = json.loads(send_mock.call_args[0][0].decode())
124        self.assertEqual(400, resp['headers']['status'])
125
126    @ddt.data((-1, -1), (59, 60), (60, 59), (60, 43201), (43201, 60))
127    def test_unacceptable_ttl_or_grace(self, ttl_grace):
128        ttl, grace = ttl_grace
129        action = consts.CLAIM_CREATE
130        body = {"queue_name": "skittle",
131                "ttl": ttl,
132                "grace": grace}
133
134        send_mock = mock.Mock()
135        self.protocol.sendMessage = send_mock
136
137        req = test_utils.create_request(action, body, self.headers)
138        self.protocol.onMessage(req, False)
139        resp = json.loads(send_mock.call_args[0][0].decode())
140        self.assertEqual(400, resp['headers']['status'])
141
142    @ddt.data(-1, 59, 43201)
143    def test_unacceptable_new_ttl(self, ttl):
144        claim = self._get_a_claim()
145
146        action = consts.CLAIM_UPDATE
147        body = {"queue_name": "skittle",
148                "claim_id": claim['body']['claim_id'],
149                "ttl": ttl}
150
151        send_mock = mock.Mock()
152        self.protocol.sendMessage = send_mock
153
154        req = test_utils.create_request(action, body, self.headers)
155        self.protocol.onMessage(req, False)
156        resp = json.loads(send_mock.call_args[0][0].decode())
157        self.assertEqual(400, resp['headers']['status'])
158
159    def test_default_ttl_and_grace(self):
160        action = consts.CLAIM_CREATE
161        body = {"queue_name": "skittle"}
162
163        send_mock = mock.Mock()
164        self.protocol.sendMessage = send_mock
165
166        req = test_utils.create_request(action, body, self.headers)
167        self.protocol.onMessage(req, False)
168        resp = json.loads(send_mock.call_args[0][0].decode())
169        self.assertEqual(201, resp['headers']['status'])
170
171        action = consts.CLAIM_GET
172        body = {"queue_name": "skittle",
173                "claim_id": resp['body']['claim_id']}
174
175        req = test_utils.create_request(action, body, self.headers)
176        self.protocol.onMessage(req, False)
177        resp = json.loads(send_mock.call_args[0][0].decode())
178
179        self.assertEqual(200, resp['headers']['status'])
180        self.assertEqual(self.defaults.claim_ttl, resp['body']['ttl'])
181
182    def test_lifecycle(self):
183        # First, claim some messages
184        action = consts.CLAIM_CREATE
185        body = {"queue_name": "skittle",
186                "ttl": 100,
187                "grace": 60}
188
189        send_mock = mock.Mock()
190        self.protocol.sendMessage = send_mock
191
192        req = test_utils.create_request(action, body, self.headers)
193        self.protocol.onMessage(req, False)
194        resp = json.loads(send_mock.call_args[0][0].decode())
195        self.assertEqual(201, resp['headers']['status'])
196        claimed_messages = resp['body']['messages']
197        claim_id = resp['body']['claim_id']
198
199        # No more messages to claim
200        body = {"queue_name": "skittle",
201                "ttl": 100,
202                "grace": 60}
203
204        req = test_utils.create_request(action, body, self.headers)
205        self.protocol.onMessage(req, False)
206        resp = json.loads(send_mock.call_args[0][0].decode())
207        self.assertEqual(204, resp['headers']['status'])
208
209        # Listing messages, by default, won't include claimed, will echo
210        action = consts.MESSAGE_LIST
211        body = {"queue_name": "skittle",
212                "echo": True}
213
214        req = test_utils.create_request(action, body, self.headers)
215        self.protocol.onMessage(req, False)
216        resp = json.loads(send_mock.call_args[0][0].decode())
217        self.assertEqual(200, resp['headers']['status'])
218        self.assertEqual([], resp['body']['messages'])
219
220        # Listing messages, by default, won't include claimed, won't echo
221
222        body = {"queue_name": "skittle",
223                "echo": False}
224
225        req = test_utils.create_request(action, body, self.headers)
226        self.protocol.onMessage(req, False)
227        resp = json.loads(send_mock.call_args[0][0].decode())
228        self.assertEqual(200, resp['headers']['status'])
229        self.assertEqual([], resp['body']['messages'])
230
231        # List messages, include_claimed, but don't echo
232
233        body = {"queue_name": "skittle",
234                "include_claimed": True,
235                "echo": False}
236
237        req = test_utils.create_request(action, body, self.headers)
238        self.protocol.onMessage(req, False)
239        resp = json.loads(send_mock.call_args[0][0].decode())
240        self.assertEqual(200, resp['headers']['status'])
241        self.assertEqual(resp['body']['messages'], [])
242
243        # List messages with a different client-id and echo=false.
244        # Should return some messages
245
246        body = {"queue_name": "skittle",
247                "echo": False}
248
249        headers = {
250            'Client-ID': uuidutils.generate_uuid(),
251            'X-Project-ID': self.project_id
252        }
253
254        req = test_utils.create_request(action, body, headers)
255        self.protocol.onMessage(req, False)
256        resp = json.loads(send_mock.call_args[0][0].decode())
257        self.assertEqual(200, resp['headers']['status'])
258
259        # Include claimed messages this time, and echo
260
261        body = {"queue_name": "skittle",
262                "include_claimed": True,
263                "echo": True}
264
265        req = test_utils.create_request(action, body, self.headers)
266        self.protocol.onMessage(req, False)
267        resp = json.loads(send_mock.call_args[0][0].decode())
268        self.assertEqual(200, resp['headers']['status'])
269        self.assertEqual(len(claimed_messages), len(resp['body']['messages']))
270
271        message_id_1 = resp['body']['messages'][0]['id']
272        message_id_2 = resp['body']['messages'][1]['id']
273
274        # Try to delete the message without submitting a claim_id
275        action = consts.MESSAGE_DELETE
276        body = {"queue_name": "skittle",
277                "message_id": message_id_1}
278
279        req = test_utils.create_request(action, body, self.headers)
280        self.protocol.onMessage(req, False)
281        resp = json.loads(send_mock.call_args[0][0].decode())
282        self.assertEqual(403,  resp['headers']['status'])
283
284        # Delete the message and its associated claim
285        body = {"queue_name": "skittle",
286                "message_id": message_id_1,
287                "claim_id": claim_id}
288
289        req = test_utils.create_request(action, body, self.headers)
290        self.protocol.onMessage(req, False)
291        resp = json.loads(send_mock.call_args[0][0].decode())
292        self.assertEqual(204, resp['headers']['status'])
293
294        # Try to get it from the wrong project
295        headers = {
296            'Client-ID': uuidutils.generate_uuid(),
297            'X-Project-ID': 'someproject'
298        }
299
300        action = consts.MESSAGE_GET
301        body = {"queue_name": "skittle",
302                "message_id": message_id_2}
303        req = test_utils.create_request(action, body, headers)
304        self.protocol.onMessage(req, False)
305        resp = json.loads(send_mock.call_args[0][0].decode())
306        self.assertEqual(404,  resp['headers']['status'])
307
308        # Get the message
309        action = consts.MESSAGE_GET
310        body = {"queue_name": "skittle",
311                "message_id": message_id_2}
312        req = test_utils.create_request(action, body, self.headers)
313        self.protocol.onMessage(req, False)
314        resp = json.loads(send_mock.call_args[0][0].decode())
315        self.assertEqual(200, resp['headers']['status'])
316
317        # Update the claim
318        creation = timeutils.utcnow()
319        action = consts.CLAIM_UPDATE
320        body = {"queue_name": "skittle",
321                "ttl": 60,
322                "grace": 60,
323                "claim_id": claim_id}
324        req = test_utils.create_request(action, body, self.headers)
325        self.protocol.onMessage(req, False)
326        resp = json.loads(send_mock.call_args[0][0].decode())
327        self.assertEqual(204, resp['headers']['status'])
328
329        # Get the claimed messages (again)
330        action = consts.CLAIM_GET
331        body = {"queue_name": "skittle",
332                "claim_id": claim_id}
333        req = test_utils.create_request(action, body, self.headers)
334        self.protocol.onMessage(req, False)
335        query = timeutils.utcnow()
336        resp = json.loads(send_mock.call_args[0][0].decode())
337        self.assertEqual(200, resp['headers']['status'])
338        self.assertEqual(60, resp['body']['ttl'])
339
340        message_id_3 = resp['body']['messages'][0]['id']
341
342        estimated_age = timeutils.delta_seconds(creation, query)
343        # The claim's age should be 0 at this moment. But in some unexpected
344        # case, such as slow test, the age maybe larger than 0. Just skip
345        # asserting if so.
346        if resp['body']['age'] == 0:
347            self.assertGreater(estimated_age, resp['body']['age'])
348
349        # Delete the claim
350        action = consts.CLAIM_DELETE
351        body = {"queue_name": "skittle",
352                "claim_id": claim_id}
353        req = test_utils.create_request(action, body, self.headers)
354        self.protocol.onMessage(req, False)
355        resp = json.loads(send_mock.call_args[0][0].decode())
356        self.assertEqual(204, resp['headers']['status'])
357
358        # Try to delete a message with an invalid claim ID
359        action = consts.MESSAGE_DELETE
360        body = {"queue_name": "skittle",
361                "message_id": message_id_3,
362                "claim_id": claim_id}
363
364        req = test_utils.create_request(action, body, self.headers)
365        self.protocol.onMessage(req, False)
366        resp = json.loads(send_mock.call_args[0][0].decode())
367        self.assertEqual(400, resp['headers']['status'])
368
369        # Make sure it wasn't deleted!
370        action = consts.MESSAGE_GET
371        body = {"queue_name": "skittle",
372                "message_id": message_id_2}
373        req = test_utils.create_request(action, body, self.headers)
374        self.protocol.onMessage(req, False)
375        resp = json.loads(send_mock.call_args[0][0].decode())
376        self.assertEqual(200, resp['headers']['status'])
377
378        # Try to get a claim that doesn't exist
379        action = consts.CLAIM_GET
380        body = {"queue_name": "skittle",
381                "claim_id": claim_id}
382        req = test_utils.create_request(action, body, self.headers)
383        self.protocol.onMessage(req, False)
384        resp = json.loads(send_mock.call_args[0][0].decode())
385        self.assertEqual(404,  resp['headers']['status'])
386
387        # Try to update a claim that doesn't exist
388        action = consts.CLAIM_UPDATE
389        body = {"queue_name": "skittle",
390                "ttl": 60,
391                "grace": 60,
392                "claim_id": claim_id}
393        req = test_utils.create_request(action, body, self.headers)
394        self.protocol.onMessage(req, False)
395        resp = json.loads(send_mock.call_args[0][0].decode())
396        self.assertEqual(404,  resp['headers']['status'])
397
398    def test_post_claim_nonexistent_queue(self):
399        action = consts.CLAIM_CREATE
400        body = {"queue_name": "nonexistent",
401                "ttl": 100,
402                "grace": 60}
403
404        send_mock = mock.Mock()
405        self.protocol.sendMessage = send_mock
406
407        req = test_utils.create_request(action, body, self.headers)
408        self.protocol.onMessage(req, False)
409        resp = json.loads(send_mock.call_args[0][0].decode())
410        self.assertEqual(204, resp['headers']['status'])
411
412    def test_get_claim_nonexistent_queue(self):
413        action = consts.CLAIM_GET
414        body = {"queue_name": "nonexistent",
415                "claim_id": "aaabbbba"}
416
417        send_mock = mock.Mock()
418        self.protocol.sendMessage = send_mock
419
420        req = test_utils.create_request(action, body, self.headers)
421        self.protocol.onMessage(req, False)
422        resp = json.loads(send_mock.call_args[0][0].decode())
423        self.assertEqual(404,  resp['headers']['status'])
424
425    def _get_a_claim(self):
426        action = consts.CLAIM_CREATE
427        body = {"queue_name": "skittle",
428                "ttl": 100,
429                "grace": 60}
430
431        send_mock = mock.Mock()
432        self.protocol.sendMessage = send_mock
433
434        req = test_utils.create_request(action, body, self.headers)
435        self.protocol.onMessage(req, False)
436        resp = json.loads(send_mock.call_args[0][0].decode())
437        self.assertEqual(201, resp['headers']['status'])
438
439        return resp
440
Full Screen

bybit.py

Source: bybit.py Github

copy
1from __future__ import annotations
2
3import asyncio
4import logging
5from typing import Awaitable, Optional, Union
6
7import aiohttp
8
9from ..store import DataStore, DataStoreManager
10from ..typedefs import Item
11from ..ws import ClientWebSocketResponse
12
13logger = logging.getLogger(__name__)
14
15
16class BybitInverseDataStore(DataStoreManager):
17    """
18    Bybit Inverse契約のデータストアマネージャー
19    """
20
21    def _init(self) -> None:
22        self.create("orderbook", datastore_class=OrderBookInverse)
23        self.create("trade", datastore_class=TradeInverse)
24        self.create("insurance", datastore_class=Insurance)
25        self.create("instrument", datastore_class=InstrumentInverse)
26        self.create("kline", datastore_class=KlineInverse)
27        self.create("liquidation", datastore_class=LiquidationInverse)
28        self.create("position", datastore_class=PositionInverse)
29        self.create("execution", datastore_class=ExecutionInverse)
30        self.create("order", datastore_class=OrderInverse)
31        self.create("stoporder", datastore_class=StopOrderInverse)
32        self.create("wallet", datastore_class=WalletInverse)
33        self.timestamp_e6: Optional[int] = None
34
35    async def initialize(self, *aws: Awaitable[aiohttp.ClientResponse]) -> None:
36        """
37        対応エンドポイント
38
39        - GET /v2/private/order (DataStore: order)
40        - GET /futures/private/order (DataStore: order)
41        - GET /v2/private/stop-order (DataStore: stoporder)
42        - GET /futures/private/stop-order (DataStore: stoporder)
43        - GET /v2/private/position/list (DataStore: position)
44        - GET /futures/private/position/list (DataStore: position)
45        - GET /v2/private/wallet/balance (DataStore: wallet)
46        - GET /v2/public/kline/list (DataStore: kline)
47        """
48        for f in asyncio.as_completed(aws):
49            resp = await f
50            data = await resp.json()
51            if data["ret_code"] != 0:
52                raise ValueError(
53                    "Response error at DataStore initialization\n"
54                    f"URL: {resp.url}\n"
55                    f"Data: {data}"
56                )
57            if resp.url.path in (
58                "/v2/private/order",
59                "/futures/private/order",
60            ):
61                self.order._onresponse(data["result"])
62            elif resp.url.path in (
63                "/v2/private/stop-order",
64                "/futures/private/stop-order",
65            ):
66                self.stoporder._onresponse(data["result"])
67            elif resp.url.path in (
68                "/v2/private/position/list",
69                "/futures/private/position/list",
70            ):
71                self.position._onresponse(data["result"])
72            elif resp.url.path == "/v2/public/kline/list":
73                self.kline._onresponse(data["result"])
74            elif resp.url.path == "/v2/private/wallet/balance":
75                self.wallet._onresponse(data["result"])
76
77    def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:
78        if "success" in msg:
79            if not msg["success"]:
80                logger.warning(msg)
81        if "topic" in msg:
82            topic: str = msg["topic"]
83            data = msg["data"]
84            if any(
85                [
86                    topic.startswith("orderBookL2_25"),
87                    topic.startswith("orderBook_200"),
88                ]
89            ):
90                self.orderbook._onmessage(topic, msg["type"], data)
91            elif topic.startswith("trade"):
92                self.trade._onmessage(data)
93            elif topic.startswith("insurance"):
94                self.insurance._onmessage(data)
95            elif topic.startswith("instrument_info"):
96                self.instrument._onmessage(topic, msg["type"], data)
97            if topic.startswith("klineV2"):
98                self.kline._onmessage(topic, data)
99            elif topic.startswith("liquidation"):
100                self.liquidation._onmessage(data)
101            elif topic == "position":
102                self.position._onmessage(data)
103            elif topic == "execution":
104                self.execution._onmessage(data)
105            elif topic == "order":
106                self.order._onmessage(data)
107            elif topic == "stop_order":
108                self.stoporder._onmessage(data)
109            elif topic == "wallet":
110                self.wallet._onmessage(data)
111        if "timestamp_e6" in msg:
112            self.timestamp_e6 = int(msg["timestamp_e6"])
113
114    @property
115    def orderbook(self) -> "OrderBookInverse":
116        return self.get("orderbook", OrderBookInverse)
117
118    @property
119    def trade(self) -> "TradeInverse":
120        return self.get("trade", TradeInverse)
121
122    @property
123    def insurance(self) -> "Insurance":
124        return self.get("insurance", Insurance)
125
126    @property
127    def instrument(self) -> "InstrumentInverse":
128        return self.get("instrument", InstrumentInverse)
129
130    @property
131    def kline(self) -> "KlineInverse":
132        return self.get("kline", KlineInverse)
133
134    @property
135    def liquidation(self) -> "LiquidationInverse":
136        return self.get("liquidation", LiquidationInverse)
137
138    @property
139    def position(self) -> "PositionInverse":
140        """
141        インバース契約(無期限/先物)用のポジション
142        """
143        return self.get("position", PositionInverse)
144
145    @property
146    def execution(self) -> "ExecutionInverse":
147        return self.get("execution", ExecutionInverse)
148
149    @property
150    def order(self) -> "OrderInverse":
151        """
152        アクティブオーダーのみ(約定・キャンセル済みは削除される)
153        """
154        return self.get("order", OrderInverse)
155
156    @property
157    def stoporder(self) -> "StopOrderInverse":
158        """
159        アクティブオーダーのみ(トリガー済みは削除される)
160        """
161        return self.get("stoporder", StopOrderInverse)
162
163    @property
164    def wallet(self) -> "WalletInverse":
165        return self.get("wallet", WalletInverse)
166
167
168class BybitUSDTDataStore(DataStoreManager):
169    """
170    Bybit USDT契約のデータストアマネージャー
171    """
172
173    def _init(self) -> None:
174        self.create("orderbook", datastore_class=OrderBookUSDT)
175        self.create("trade", datastore_class=TradeUSDT)
176        self.create("insurance", datastore_class=Insurance)
177        self.create("instrument", datastore_class=InstrumentUSDT)
178        self.create("kline", datastore_class=KlineUSDT)
179        self.create("liquidation", datastore_class=LiquidationUSDT)
180        self.create("position", datastore_class=PositionUSDT)
181        self.create("execution", datastore_class=ExecutionUSDT)
182        self.create("order", datastore_class=OrderUSDT)
183        self.create("stoporder", datastore_class=StopOrderUSDT)
184        self.create("wallet", datastore_class=WalletUSDT)
185        self.timestamp_e6: Optional[int] = None
186
187    async def initialize(self, *aws: Awaitable[aiohttp.ClientResponse]) -> None:
188        """
189        対応エンドポイント
190
191        - GET /private/linear/order/search (DataStore: order)
192        - GET /private/linear/stop-order/search (DataStore: stoporder)
193        - GET /private/linear/position/list (DataStore: position)
194        - GET /private/linear/position/list (DataStore: position)
195        - GET /public/linear/kline (DataStore: kline)
196        - GET /v2/private/wallet/balance (DataStore: wallet)
197        """
198        for f in asyncio.as_completed(aws):
199            resp = await f
200            data = await resp.json()
201            if data["ret_code"] != 0:
202                raise ValueError(
203                    "Response error at DataStore initialization\n"
204                    f"URL: {resp.url}\n"
205                    f"Data: {data}"
206                )
207            if resp.url.path == "/private/linear/order/search":
208                self.order._onresponse(data["result"])
209            elif resp.url.path == "/private/linear/stop-order/search":
210                self.stoporder._onresponse(data["result"])
211            elif resp.url.path == "/private/linear/position/list":
212                self.position._onresponse(data["result"])
213            elif resp.url.path == "/public/linear/kline":
214                self.kline._onresponse(data["result"])
215            elif resp.url.path == "/v2/private/wallet/balance":
216                self.wallet._onresponse(data["result"])
217
218    def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:
219        if "success" in msg:
220            if not msg["success"]:
221                logger.warning(msg)
222        if "topic" in msg:
223            topic: str = msg["topic"]
224            data = msg["data"]
225            if any(
226                [
227                    topic.startswith("orderBookL2_25"),
228                    topic.startswith("orderBook_200"),
229                ]
230            ):
231                self.orderbook._onmessage(topic, msg["type"], data)
232            elif topic.startswith("trade"):
233                self.trade._onmessage(data)
234            elif topic.startswith("instrument_info"):
235                self.instrument._onmessage(topic, msg["type"], data)
236            if topic.startswith("candle"):
237                self.kline._onmessage(topic, data)
238            elif topic.startswith("liquidation"):
239                self.liquidation._onmessage(data)
240            elif topic == "position":
241                self.position._onmessage(data)
242            elif topic == "execution":
243                self.execution._onmessage(data)
244            elif topic == "order":
245                self.order._onmessage(data)
246            elif topic == "stop_order":
247                self.stoporder._onmessage(data)
248            elif topic == "wallet":
249                self.wallet._onmessage(data)
250        if "timestamp_e6" in msg:
251            self.timestamp_e6 = int(msg["timestamp_e6"])
252
253    @property
254    def orderbook(self) -> "OrderBookUSDT":
255        return self.get("orderbook", OrderBookUSDT)
256
257    @property
258    def trade(self) -> "TradeUSDT":
259        return self.get("trade", TradeUSDT)
260
261    @property
262    def instrument(self) -> "InstrumentUSDT":
263        return self.get("instrument", InstrumentUSDT)
264
265    @property
266    def kline(self) -> "KlineUSDT":
267        return self.get("kline", KlineUSDT)
268
269    @property
270    def liquidation(self) -> "LiquidationUSDT":
271        return self.get("liquidation", LiquidationUSDT)
272
273    @property
274    def position(self) -> "PositionUSDT":
275        """
276        USDT契約用のポジション
277        """
278        return self.get("position", PositionUSDT)
279
280    @property
281    def execution(self) -> "ExecutionUSDT":
282        return self.get("execution", ExecutionUSDT)
283
284    @property
285    def order(self) -> "OrderUSDT":
286        """
287        アクティブオーダーのみ(約定・キャンセル済みは削除される)
288        """
289        return self.get("order", OrderUSDT)
290
291    @property
292    def stoporder(self) -> "StopOrderUSDT":
293        """
294        アクティブオーダーのみ(トリガー済みは削除される)
295        """
296        return self.get("stoporder", StopOrderUSDT)
297
298    @property
299    def wallet(self) -> "WalletUSDT":
300        return self.get("wallet", WalletUSDT)
301
302
303class OrderBookInverse(DataStore):
304    _KEYS = ["symbol", "id", "side"]
305
306    def sorted(self, query: Optional[Item] = None) -> dict[str, list[Item]]:
307        if query is None:
308            query = {}
309        result = {"Sell": [], "Buy": []}
310        for item in self:
311            if all(k in item and query[k] == item[k] for k in query):
312                result[item["side"]].append(item)
313        result["Sell"].sort(key=lambda x: x["id"])
314        result["Buy"].sort(key=lambda x: x["id"], reverse=True)
315        return result
316
317    def _onmessage(self, topic: str, type_: str, data: Union[list[Item], Item]) -> None:
318        if type_ == "snapshot":
319            symbol = topic.split(".")[-1]
320            # ex: "orderBookL2_25.BTCUSD", "orderBook_200.100ms.BTCUSD"
321            result = self.find({"symbol": symbol})
322            self._delete(result)
323            self._insert(data)
324        elif type_ == "delta":
325            self._delete(data["delete"])
326            self._update(data["update"])
327            self._insert(data["insert"])
328
329
330class OrderBookUSDT(OrderBookInverse):
331    def _onmessage(self, topic: str, type_: str, data: Union[list[Item], Item]) -> None:
332        if type_ == "snapshot":
333            symbol = topic.split(".")[-1]
334            # ex: "orderBookL2_25.BTCUSDT", "orderBook_200.100ms.BTCUSDT"
335            result = self.find({"symbol": symbol})
336            self._delete(result)
337            self._insert(data["order_book"])
338        elif type_ == "delta":
339            self._delete(data["delete"])
340            self._update(data["update"])
341            self._insert(data["insert"])
342
343
344class TradeInverse(DataStore):
345    _KEYS = ["trade_id"]
346    _MAXLEN = 99999
347
348    def _onmessage(self, data: list[Item]) -> None:
349        self._insert(data)
350
351
352class TradeUSDT(TradeInverse):
353    ...
354
355
356class Insurance(DataStore):
357    _KEYS = ["currency"]
358
359    def _onmessage(self, data: list[Item]) -> None:
360        self._update(data)
361
362
363class InstrumentInverse(DataStore):
364    _KEYS = ["symbol"]
365
366    def _onmessage(self, topic: str, type_: str, data: Item) -> None:
367        if type_ == "snapshot":
368            symbol = topic.split(".")[-1]  # ex: "instrument_info.100ms.BTCUSD"
369            result = self.find({"symbol": symbol})
370            self._delete(result)
371            self._insert([data])
372        elif type_ == "delta":
373            self._update(data["update"])
374
375
376class InstrumentUSDT(InstrumentInverse):
377    ...
378
379
380class KlineInverse(DataStore):
381    _KEYS = ["start", "symbol", "interval"]
382
383    def _onmessage(self, topic: str, data: list[Item]) -> None:
384        topic_split = topic.split(".")  # ex:"klineV2.1.BTCUSD"
385        for item in data:
386            item["symbol"] = topic_split[-1]
387            item["interval"] = topic_split[-2]
388        self._update(data)
389
390    def _onresponse(self, data: list[Item]) -> None:
391        for item in data:
392            item["start"] = item.pop("open_time")
393        self._update(data)
394
395
396class KlineUSDT(KlineInverse):
397    ...
398
399
400class LiquidationInverse(DataStore):
401    _MAXLEN = 99999
402
403    def _onmessage(self, item: Item) -> None:
404        self._insert([item])
405
406
407class LiquidationUSDT(LiquidationInverse):
408    ...
409
410
411class PositionInverse(DataStore):
412    _KEYS = ["symbol", "position_idx"]
413
414    def one(self, symbol: str) -> Optional[Item]:
415        return self.get({"symbol": symbol, "position_idx": 0})
416
417    def both(self, symbol: str) -> dict[str, Optional[Item]]:
418        return {
419            "Sell": self.get({"symbol": symbol, "position_idx": 2}),
420            "Buy": self.get({"symbol": symbol, "position_idx": 1}),
421        }
422
423    def _onresponse(self, data: Union[Item, list[Item]]) -> None:
424        if isinstance(data, dict):
425            self._update([data])  # ex: {"symbol": "BTCUSD", ...}
426        elif isinstance(data, list):
427            for item in data:
428                if "is_valid" in item:
429                    if item["is_valid"]:
430                        self._update([item["data"]])
431                        # ex:
432                        # [
433                        #     {
434                        #         "is_valid": True,
435                        #         "data": {"symbol": "BTCUSDM21", ...}
436                        #     },
437                        #     ...
438                        # ]
439                else:
440                    self._update([item])
441                    # ex: [{"symbol": "BTCUSDT", ...}, ...]
442
443    def _onmessage(self, data: list[Item]) -> None:
444        self._update(data)
445
446
447class PositionUSDT(PositionInverse):
448    def _onmessage(self, data: list[Item]) -> None:
449        for item in data:
450            item["position_idx"] = int(item["position_idx"])
451            self._update([item])
452
453
454class ExecutionInverse(DataStore):
455    _KEYS = ["exec_id"]
456
457    def _onmessage(self, data: list[Item]) -> None:
458        self._update(data)
459
460
461class ExecutionUSDT(ExecutionInverse):
462    ...
463
464
465class OrderInverse(DataStore):
466    _KEYS = ["order_id"]
467
468    def _onresponse(self, data: list[Item]) -> None:
469        if isinstance(data, list):
470            self._update(data)
471        elif isinstance(data, dict):
472            self._update([data])
473
474    def _onmessage(self, data: list[Item]) -> None:
475        for item in data:
476            if item["order_status"] in ("Created", "New", "PartiallyFilled"):
477                self._update([item])
478            else:
479                self._delete([item])
480
481
482class OrderUSDT(OrderInverse):
483    ...
484
485
486class StopOrderInverse(DataStore):
487    _KEYS = ["order_id"]
488
489    def _onresponse(self, data: list[Item]) -> None:
490        if isinstance(data, list):
491            self._update(data)
492        elif isinstance(data, dict):
493            self._update([data])
494
495    def _onmessage(self, data: list[Item]) -> None:
496        for item in data:
497            if item["order_status"] in ("Active", "Untriggered"):
498                self._update([item])
499            else:
500                self._delete([item])
501
502
503class StopOrderUSDT(StopOrderInverse):
504    _KEYS = ["stop_order_id"]
505
506
507class WalletInverse(DataStore):
508    _KEYS = ["coin"]
509
510    def _onresponse(self, data: dict[str, Item]) -> None:
511        data.pop("USDT", None)
512        for coin in data:
513            self._update(
514                [
515                    {
516                        "coin": coin,
517                        "available_balance": data[coin]["available_balance"],
518                        "wallet_balance": data[coin]["wallet_balance"],
519                    }
520                ]
521            )
522
523    def _onmessage(self, data: list[Item]) -> None:
524        self._update(data)
525
526
527class WalletUSDT(WalletInverse):
528    def _onresponse(self, data: dict[str, Item]) -> None:
529        if "USDT" in data:
530            self._update(
531                [
532                    {
533                        "coin": "USDT",
534                        "wallet_balance": data["USDT"]["wallet_balance"],
535                        "available_balance": data["USDT"]["available_balance"],
536                    }
537                ]
538            )
539
540    def _onmessage(self, data: list[Item]) -> None:
541        for item in data:
542            self._update([{"coin": "USDT", **item}])
543
Full Screen

Accelerate Your Automation Test Cycles With LambdaTest

Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.

Try LambdaTest

Run Python Tests on LambdaTest Cloud Grid

Execute automation tests with Playwright Python on a cloud-based Grid of 3000+ real browsers and operating systems for both web and mobile applications.

Test now for Free
LambdaTestX

We use cookies to give you the best experience. Cookies help to provide a more personalized experience and relevant advertising for you, and web analytics for us. Learn More in our Cookies policy, Privacy & Terms of service

Allow Cookie
Sarah

I hope you find the best code examples for your project.

If you want to accelerate automated browser testing, try LambdaTest. Your first 100 automation testing minutes are FREE.

Sarah Elson (Product & Growth Lead)