import json
import urllib.parse
from threading import Event
from typing import Any, Dict, List, Optional, Union
from requests import Response, Session
from websocket import WebSocket
class DataStore:
    """Container of per-topic stores fed by REST responses and WebSocket messages.

    ``onresponse`` routes successful REST responses to the matching store by
    request path, ``onmessage`` routes WebSocket topic messages by topic name,
    and ``wait`` blocks the calling thread until a topic message was handled.
    """

    # Request-path prefixes handled by each store.  Prefix matching is used
    # because ``path_url`` may be followed by a query string.
    _ORDER_PATHS = (
        '/v2/private/order',
        '/private/linear/order/search',
        '/futures/private/order',
    )
    _STOPORDER_PATHS = (
        '/v2/private/stop-order',
        '/private/linear/stop-order/search',
        '/futures/private/stop-order',
    )
    _INVERSE_POSITION_PATHS = (
        '/v2/private/position/list',
        '/futures/private/position/list',
    )
    _LINEAR_POSITION_PATH = '/private/linear/position/list'
    _WALLET_PATH = '/v2/private/wallet/balance'

    def __init__(self) -> None:
        self.orderbook = OrderBook()
        self.trade = Trade()
        self.insurance = Insurance()
        self.instrument = Instrument()
        self.kline = Kline()
        self.position = Position()
        self.execution = Execution()
        self.order = Order()
        self.stoporder = StopOrder()
        self.wallet = Wallet()
        # Events of the threads currently blocked in wait().
        self._events: List[Event] = []

    def onresponse(self, resp: Response, session: Session) -> None:
        """Feed a REST response into the store that matches its request path.

        Responses whose ``ret_code`` is not 0 and responses for unknown paths
        are ignored.  Order and stop-order results are only stored when the
        ``result`` field is a list.

        Args:
            resp: The HTTP response; its body must be JSON.
            session: The session that produced the response (unused).
        """
        content: Dict[str, Any] = resp.json()
        if content.get('ret_code') != 0:
            return
        path: str = resp.request.path_url
        if path.startswith(self._ORDER_PATHS):
            if isinstance(content['result'], list):
                self.order._onresponse(content['result'])
        # Previously this branch repeated the *order* paths for the linear and
        # futures endpoints, so their stop-order responses were never stored.
        elif path.startswith(self._STOPORDER_PATHS):
            if isinstance(content['result'], list):
                self.stoporder._onresponse(content['result'])
        elif path.startswith(self._INVERSE_POSITION_PATHS):
            self.position.inverse._onresponse(content['result'])
        elif path.startswith(self._LINEAR_POSITION_PATH):
            self.position.linear._onresponse(content['result'])
        elif path.startswith(self._WALLET_PATH):
            self.wallet._onresponse(content['result'])

    def onmessage(self, msg: str, ws: WebSocket) -> None:
        """Feed a raw WebSocket message into the store that matches its topic.

        Messages without a ``topic`` field are ignored.  After a topic message
        has been dispatched, every thread blocked in ``wait`` is released.

        Args:
            msg: The JSON-encoded message text.
            ws: The WebSocket the message arrived on (unused).
        """
        content: Dict[str, Any] = json.loads(msg)
        if 'topic' not in content:
            return
        topic: str = content['topic']
        data: Union[List[Item], Item] = content['data']
        type_: Optional[str] = content.get('type')
        # The topic tests are mutually exclusive, so a single chain suffices.
        if topic.startswith(('orderBookL2_25', 'orderBook_200')):
            self.orderbook._onmessage(type_, data)
        elif topic.startswith('trade'):
            self.trade._onmessage(data)
        elif topic.startswith('insurance'):
            self.insurance._onmessage(data)
        elif topic.startswith('instrument_info'):
            self.instrument._onmessage(type_, data)
        elif topic.startswith(('klineV2', 'candle')):
            self.kline._onmessage(topic, data)
        elif topic == 'position':
            # Position messages also feed the wallet store.
            self.position._onmessage(data)
            self.wallet._onposition(data)
        elif topic == 'execution':
            self.execution._onmessage(data)
        elif topic == 'order':
            self.order._onmessage(data)
        elif topic == 'stop_order':
            self.stoporder._onmessage(data)
        elif topic == 'wallet':
            self.wallet._onmessage(data)
        # Wake every waiter, then forget them: each Event is single-use.
        for event in self._events:
            event.set()
        self._events.clear()

    def wait(self) -> None:
        """Block the calling thread until the next topic message is handled."""
        event = Event()
        self._events.append(event)
        event.wait()
class DefaultDataStore(DataStore):
    """Plain ``DataStore`` subclass that adds no behaviour of its own."""
# Type alias for one record (a string-keyed mapping) held by the stores below.
Item = Dict[str, Any]
class _KeyValueStore:
    """In-memory collection of items indexed by a key built from ``_KEYS``.

    Subclasses define ``_KEYS`` (the item fields that form the unique key) and
    ``_MAXLEN`` (maximum number of items to keep, or ``None`` for no limit).
    Items lacking any key field are silently skipped by ``_update``/``_pop``.
    """

    _KEYS: List[str]
    _MAXLEN: Optional[int]

    def __init__(self) -> None:
        # Maps the url-encoded key string to the stored item.
        self._data: Dict[str, Item] = {}
        # Events of the threads currently blocked in wait().
        self._events: List[Event] = []

    @staticmethod
    def _matches(item: Item, conditions: Dict[str, Any]) -> bool:
        """Return True when *item* holds every field of *conditions* with an equal value."""
        return not any(
            name not in item or expected != item[name]
            for name, expected in conditions.items()
        )

    def _notify(self) -> None:
        """Release every thread blocked in ``wait`` and forget its event."""
        for waiter in self._events:
            waiter.set()
        self._events.clear()

    def get(self, **kwargs) -> Optional[Item]:
        """Return one item selected by the given field values, or ``None``.

        When *kwargs* contains every key field, the item is looked up directly
        by key.  Otherwise the first stored item whose fields equal all of
        *kwargs* is returned; with no arguments that is the first stored item.
        """
        try:
            return self._data.get(self._dumps(kwargs))
        except KeyError:
            # Not all key fields were given: fall back to a linear search.
            pass
        for candidate in self._data.values():
            if self._matches(candidate, kwargs):
                return candidate
        return None

    def getlist(self, **kwargs) -> List[Item]:
        """Return every stored item whose fields equal all of *kwargs*.

        With no arguments, all stored items are returned in insertion order.
        """
        return [
            candidate
            for candidate in self._data.values()
            if self._matches(candidate, kwargs)
        ]

    def __len__(self):
        return len(self._data)

    def _dumps(self, item: Item) -> str:
        """Build the key string of *item*; raises ``KeyError`` if a key field is missing."""
        return urllib.parse.urlencode({name: item[name] for name in self._KEYS})

    def _update(self, items: List[Item]) -> None:
        """Insert new items and merge items whose key is already stored.

        Afterwards the oldest entries are dropped until at most ``_MAXLEN``
        remain, and waiting threads are released.
        """
        for entry in items:
            try:
                key = self._dumps(entry)
                stored = self._data.get(key)
                if stored is None:
                    self._data[key] = entry
                else:
                    stored.update(entry)
            except KeyError:
                pass
        if self._MAXLEN is not None:
            excess = len(self._data) - self._MAXLEN
            if excess > 0:
                # dicts keep insertion order, so the head holds the oldest keys.
                for stale in list(self._data)[:excess]:
                    del self._data[stale]
        self._notify()

    def _pop(self, items: List[Item]) -> None:
        """Remove the stored entries sharing a key with *items*, then release waiters."""
        for entry in items:
            try:
                self._data.pop(self._dumps(entry), None)
            except KeyError:
                pass
        self._notify()

    def wait(self) -> None:
        """Block the calling thread until the next ``_update`` or ``_pop``."""
        event = Event()
        self._events.append(event)
        event.wait()
class OrderBook(_KeyValueStore):
    """Order book levels keyed by symbol, id and side."""

    _KEYS = ['symbol', 'id', 'side']
    _MAXLEN = None

    def getbest(self, symbol: str) -> Dict[str, Optional[Item]]:
        """Return the lowest-priced Sell and highest-priced Buy level of *symbol*.

        A side with no stored level maps to ``None``.
        """
        levels = {'Sell': {}, 'Buy': {}}
        for entry in self._data.values():
            if entry['symbol'] != symbol:
                continue
            # Index by numeric price so min()/max() compare numbers.
            levels[entry['side']][float(entry['price'])] = entry
        asks = levels['Sell']
        bids = levels['Buy']
        best_ask = asks[min(asks)] if asks else None
        best_bid = bids[max(bids)] if bids else None
        return {'Sell': best_ask, 'Buy': best_bid}

    def getsorted(self, symbol: str) -> Dict[str, List[Item]]:
        """Return the levels of *symbol*: Sell by ascending price, Buy by descending price."""
        levels = {'Sell': [], 'Buy': []}
        for entry in self._data.values():
            if entry['symbol'] == symbol:
                levels[entry['side']].append(entry)

        def price_of(entry):
            return float(entry['price'])

        asks = sorted(levels['Sell'], key=price_of)
        bids = sorted(levels['Buy'], key=price_of, reverse=True)
        return {'Sell': asks, 'Buy': bids}

    def _onmessage(self, type_: str, data: Union[List[Item], Item]) -> None:
        """Apply an order book message; *type_* is ``'snapshot'`` or ``'delta'``."""
        if type_ == 'delta':
            self._pop(data['delete'])
            self._update(data['update'])
            self._update(data['insert'])
        elif type_ == 'snapshot':
            # A dict snapshot carries its levels under 'order_book'.
            rows = data['order_book'] if isinstance(data, dict) else data
            self._update(rows)
class Trade(_KeyValueStore):
    """Trades keyed by ``trade_id``; at most 10000 entries are kept."""

    _MAXLEN = 10000
    _KEYS = ['trade_id']

    def _onmessage(self, data: List[Item]) -> None:
        """Store the trades carried by a trade message."""
        trades = data
        self._update(trades)
class Insurance(_KeyValueStore):
    """Insurance records keyed by ``currency``."""

    _MAXLEN = None
    _KEYS = ['currency']

    def _onmessage(self, data: List[Item]) -> None:
        """Store the records carried by an insurance message."""
        records = data
        self._update(records)
class Instrument(_KeyValueStore):
    """Instrument info keyed by ``symbol``."""

    _KEYS = ['symbol']
    _MAXLEN = None

    def _onmessage(self, type_: str, data: Item) -> None:
        """Apply an instrument message; *type_* is ``'snapshot'`` or ``'delta'``."""
        if type_ == 'delta':
            self._update(data['update'])
        elif type_ == 'snapshot':
            # A snapshot is a single item, so wrap it for _update.
            self._update([data])
class Kline(_KeyValueStore):
    """Candles keyed by ``symbol`` and ``start``; at most 5000 entries are kept."""

    _KEYS = ['symbol', 'start']
    _MAXLEN = 5000

    def _onmessage(self, topic: str, data: List[Item]) -> None:
        """Tag each candle with the symbol taken from *topic* and store it."""
        # The symbol is the third dot-separated field, ex: 'klineV2.1.BTCUSD'
        fields = topic.split('.')
        symbol = fields[2]
        for candle in data:
            candle['symbol'] = symbol
        self._update(data)
class Position:
    """Pair of position stores: ``inverse`` and ``linear``."""

    def __init__(self):
        self.inverse = PositionInverse()
        self.linear = PositionLinear()

    def _onmessage(self, data: List[Item]) -> None:
        """Forward a position message to the linear or inverse store.

        The store is chosen from the first item's symbol: symbols ending in
        ``USDT`` go to ``linear``, everything else to ``inverse``.  An empty
        message is ignored.
        """
        if len(data) == 0:
            return
        first_symbol: str = data[0]['symbol']
        target = self.linear if first_symbol.endswith('USDT') else self.inverse
        target._onmessage(data)
class PositionInverse(_KeyValueStore):
    """Inverse-contract positions keyed by ``symbol`` and ``position_idx``."""

    _KEYS = ['symbol', 'position_idx']
    _MAXLEN = None

    def getone(self, symbol: str) -> Optional[Item]:
        """Return the position of *symbol* stored with ``position_idx`` 0, if any."""
        return self.get(symbol=symbol, position_idx=0)

    def getboth(self, symbol: str) -> Dict[str, Optional[Item]]:
        """Return the positions of *symbol* with ``position_idx`` 2 (Sell) and 1 (Buy)."""
        sell = self.get(symbol=symbol, position_idx=2)
        buy = self.get(symbol=symbol, position_idx=1)
        return {'Sell': sell, 'Buy': buy}

    def _onresponse(self, data: Union[Item, List[Item]]) -> None:
        """Store positions from a REST result.

        Accepts a single position dict, a list of positions, or a list of
        wrappers holding the position under ``'data'``.  Other types are ignored.
        """
        if isinstance(data, dict):
            rows = [data]
        elif isinstance(data, list):
            # Whether entries are wrapped is decided from the first one only.
            wrapped = len(data) and 'data' in data[0]
            rows = [entry['data'] for entry in data] if wrapped else data
        else:
            return
        self._update(rows)

    def _onmessage(self, data: List[Item]) -> None:
        """Store the positions carried by a position message."""
        positions = data
        self._update(positions)
class PositionLinear(_KeyValueStore):
    """Linear-contract positions keyed by ``symbol`` and ``side``."""

    _KEYS = ['symbol', 'side']
    _MAXLEN = None

    def getboth(self, symbol: str) -> Dict[str, Optional[Item]]:
        """Return the Sell and Buy positions of *symbol* (``None`` when absent)."""
        sell = self.get(symbol=symbol, side='Sell')
        buy = self.get(symbol=symbol, side='Buy')
        return {'Sell': sell, 'Buy': buy}

    def _onresponse(self, data: List[Item]) -> None:
        """Store positions from a REST result.

        Entries may be the positions themselves or wrappers holding the
        position under ``'data'``; the first entry decides which.
        """
        wrapped = len(data) and 'data' in data[0]
        rows = [entry['data'] for entry in data] if wrapped else data
        self._update(rows)

    def _onmessage(self, data: List[Item]) -> None:
        """Store the positions carried by a position message."""
        positions = data
        self._update(positions)
class Execution(_KeyValueStore):
    """Executions keyed by ``exec_id``; at most 5000 entries are kept."""

    _MAXLEN = 5000
    _KEYS = ['exec_id']

    def _onmessage(self, data: List[Item]) -> None:
        """Store the executions carried by an execution message."""
        executions = data
        self._update(executions)
class Order(_KeyValueStore):
    """Orders keyed by ``order_id``."""

    _KEYS = ['order_id']
    _MAXLEN = None

    def _onresponse(self, data: List[Item]) -> None:
        """Store the orders of a REST result."""
        orders = data
        self._update(orders)

    def _onmessage(self, data: List[Item]) -> None:
        """Apply an order message: keep open orders, drop all others.

        An order is kept while its ``order_status`` is ``Created``, ``New`` or
        ``PartiallyFilled``; any other status removes it from the store.
        """
        open_statuses = ('Created', 'New', 'PartiallyFilled', )
        for order in data:
            apply = self._update if order['order_status'] in open_statuses else self._pop
            apply([order])
class StopOrder(_KeyValueStore):
    """Stop orders keyed by ``stop_order_id``."""

    _KEYS = ['stop_order_id']
    _MAXLEN = None

    def _onresponse(self, data: List[Item]) -> None:
        """Store the stop orders of a REST result."""
        stop_orders = data
        self._update(stop_orders)

    def _onmessage(self, data: List[Item]) -> None:
        """Apply a stop-order message: keep pending stop orders, drop all others.

        ``order_id``/``order_status`` fields are first renamed (in place) to
        ``stop_order_id``/``stop_order_status``.  A stop order is kept while
        its status is ``Active`` or ``Untriggered``.
        """
        renames = (
            ('order_id', 'stop_order_id'),
            ('order_status', 'stop_order_status'),
        )
        pending_statuses = ('Active', 'Untriggered', )
        for stop_order in data:
            for old_name, new_name in renames:
                if old_name in stop_order:
                    stop_order[new_name] = stop_order.pop(old_name)
            if stop_order['stop_order_status'] in pending_statuses:
                self._update([stop_order])
            else:
                self._pop([stop_order])
class Wallet(_KeyValueStore):
    """Wallet balances keyed by ``coin``.

    Every stored item holds ``coin``, ``wallet_balance`` and
    ``available_balance``.
    """

    _KEYS = ['coin']
    _MAXLEN = None

    def _onresponse(self, data: Dict[str, Item]) -> None:
        """Store balances from a REST result mapping coin name to balance info."""
        for coin, item in data.items():
            self._update([{
                'coin': coin,
                'wallet_balance': item['wallet_balance'],
                'available_balance': item['available_balance'],
            }])

    def _onposition(self, data: List[Item]) -> None:
        """Update balances from a position message.

        Only messages whose first item carries ``position_idx`` are used.  The
        coin is derived from the symbol by stripping the quote suffix.
        """
        if len(data) and 'position_idx' in data[0]:
            for item in data:
                symbol: str = item['symbol']
                if symbol.endswith('USD'):
                    coin = symbol[:-3]  # ex:'BTCUSD'
                else:
                    coin = symbol[:-6]  # ex:'BTCUSDM21'
                self._update([{
                    'coin': coin,
                    'wallet_balance': item['wallet_balance'],
                    'available_balance': item['available_balance'],
                }])

    def _onmessage(self, data: List[Item]) -> None:
        """Update the USDT balance from a wallet message.

        The message items carry no coin field, so each one is stored under the
        fixed coin ``'USDT'``.
        """
        for item in data:
            # Store the normalised record.  Passing the raw item here used to
            # drop the update, because the raw item has no 'coin' key.
            self._update([{
                'coin': 'USDT',
                'wallet_balance': item['wallet_balance'],
                'available_balance': item['available_balance'],
            }])
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import json
import ddt
import mock
from oslo_utils import timeutils
from oslo_utils import uuidutils
from zaqar.common import consts
from zaqar.tests.unit.transport.websocket import base
from zaqar.tests.unit.transport.websocket import utils as test_utils
@ddt.ddt
class ClaimsBaseTest(base.V1_1Base):
    """Claim API tests driven through the websocket protocol object.

    Each test builds a request with ``test_utils.create_request``, feeds it to
    ``self.protocol.onMessage`` and inspects the JSON payload captured from
    the mocked ``sendMessage``.
    """

    config_file = "websocket_mongodb.conf"

    # Fixture: create the "skittle" queue and post nine messages into it.
    def setUp(self):
        super(ClaimsBaseTest, self).setUp()
        self.protocol = self.transport.factory()
        self.defaults = self.api.get_defaults()

        self.project_id = '7e55e1a7e'
        self.headers = {
            'Client-ID': uuidutils.generate_uuid(),
            'X-Project-ID': self.project_id
        }

        action = consts.QUEUE_CREATE
        body = {"queue_name": "skittle"}
        req = test_utils.create_request(action, body, self.headers)

        with mock.patch.object(self.protocol, 'sendMessage') as msg_mock:
            self.protocol.onMessage(req, False)
            # The reply is the first positional argument of the last call.
            resp = json.loads(msg_mock.call_args[0][0].decode())
            self.assertIn(resp['headers']['status'], [201, 204])

        action = consts.MESSAGE_POST
        body = {"queue_name": "skittle",
                "messages": [
                    {'body': 239, 'ttl': 300},
                    {'body': {'key_1': 'value_1'}, 'ttl': 300},
                    {'body': [1, 3], 'ttl': 300},
                    {'body': 439, 'ttl': 300},
                    {'body': {'key_2': 'value_2'}, 'ttl': 300},
                    {'body': ['a', 'b'], 'ttl': 300},
                    {'body': 639, 'ttl': 300},
                    {'body': {'key_3': 'value_3'}, 'ttl': 300},
                    {'body': ["aa", "bb"], 'ttl': 300}]
                }

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)

        self.protocol.onMessage(req, False)

        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(201, resp['headers']['status'])

    # Fixture teardown: delete the "skittle" queue and expect a 204 reply.
    def tearDown(self):
        super(ClaimsBaseTest, self).tearDown()
        action = consts.QUEUE_DELETE
        body = {'queue_name': 'skittle'}

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)

        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(204, resp['headers']['status'])

    # Each malformed body must be answered with 400, for claim creation and
    # for claim update alike.
    @ddt.data('[', '[]', '.', '"fail"')
    def test_bad_claim(self, doc):
        action = consts.CLAIM_CREATE
        body = doc

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(400, resp['headers']['status'])

        action = consts.CLAIM_UPDATE
        body = doc
        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(400, resp['headers']['status'])

    # A claim requesting limit=21 must be answered with 400.
    def test_exceeded_claim(self):
        action = consts.CLAIM_CREATE
        body = {"queue_name": "skittle",
                "ttl": 100,
                "grace": 60,
                "limit": 21}

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(400, resp['headers']['status'])

    # Each (ttl, grace) pair below must be answered with 400 on claim creation.
    @ddt.data((-1, -1), (59, 60), (60, 59), (60, 43201), (43201, 60))
    def test_unacceptable_ttl_or_grace(self, ttl_grace):
        ttl, grace = ttl_grace
        action = consts.CLAIM_CREATE
        body = {"queue_name": "skittle",
                "ttl": ttl,
                "grace": grace}

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(400, resp['headers']['status'])

    # Updating an existing claim with any of these ttl values must give 400.
    @ddt.data(-1, 59, 43201)
    def test_unacceptable_new_ttl(self, ttl):
        claim = self._get_a_claim()

        action = consts.CLAIM_UPDATE
        body = {"queue_name": "skittle",
                "claim_id": claim['body']['claim_id'],
                "ttl": ttl}

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(400, resp['headers']['status'])

    # A claim created without ttl/grace must report the configured default
    # claim ttl when fetched back.
    def test_default_ttl_and_grace(self):
        action = consts.CLAIM_CREATE
        body = {"queue_name": "skittle"}

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(201, resp['headers']['status'])

        action = consts.CLAIM_GET
        body = {"queue_name": "skittle",
                "claim_id": resp['body']['claim_id']}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())

        self.assertEqual(200, resp['headers']['status'])
        self.assertEqual(self.defaults.claim_ttl, resp['body']['ttl'])

    # End-to-end scenario: create a claim, list/get/delete messages under it,
    # update it, delete it, then check that the claim is gone.  The steps
    # share state, so their order matters.
    def test_lifecycle(self):
        # First, claim some messages
        action = consts.CLAIM_CREATE
        body = {"queue_name": "skittle",
                "ttl": 100,
                "grace": 60}

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(201, resp['headers']['status'])
        claimed_messages = resp['body']['messages']
        claim_id = resp['body']['claim_id']

        # No more messages to claim
        body = {"queue_name": "skittle",
                "ttl": 100,
                "grace": 60}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(204, resp['headers']['status'])

        # Listing messages, by default, won't include claimed, will echo
        action = consts.MESSAGE_LIST
        body = {"queue_name": "skittle",
                "echo": True}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(200, resp['headers']['status'])
        self.assertEqual([], resp['body']['messages'])

        # Listing messages, by default, won't include claimed, won't echo
        body = {"queue_name": "skittle",
                "echo": False}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(200, resp['headers']['status'])
        self.assertEqual([], resp['body']['messages'])

        # List messages, include_claimed, but don't echo
        body = {"queue_name": "skittle",
                "include_claimed": True,
                "echo": False}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(200, resp['headers']['status'])
        self.assertEqual(resp['body']['messages'], [])

        # List messages with a different client-id and echo=false.
        # Should return some messages
        body = {"queue_name": "skittle",
                "echo": False}

        headers = {
            'Client-ID': uuidutils.generate_uuid(),
            'X-Project-ID': self.project_id
        }

        req = test_utils.create_request(action, body, headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(200, resp['headers']['status'])

        # Include claimed messages this time, and echo
        body = {"queue_name": "skittle",
                "include_claimed": True,
                "echo": True}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(200, resp['headers']['status'])
        self.assertEqual(len(claimed_messages), len(resp['body']['messages']))

        message_id_1 = resp['body']['messages'][0]['id']
        message_id_2 = resp['body']['messages'][1]['id']

        # Try to delete the message without submitting a claim_id
        action = consts.MESSAGE_DELETE
        body = {"queue_name": "skittle",
                "message_id": message_id_1}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(403, resp['headers']['status'])

        # Delete the message and its associated claim
        body = {"queue_name": "skittle",
                "message_id": message_id_1,
                "claim_id": claim_id}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(204, resp['headers']['status'])

        # Try to get it from the wrong project
        headers = {
            'Client-ID': uuidutils.generate_uuid(),
            'X-Project-ID': 'someproject'
        }

        action = consts.MESSAGE_GET
        body = {"queue_name": "skittle",
                "message_id": message_id_2}

        req = test_utils.create_request(action, body, headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(404, resp['headers']['status'])

        # Get the message
        action = consts.MESSAGE_GET
        body = {"queue_name": "skittle",
                "message_id": message_id_2}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(200, resp['headers']['status'])

        # Update the claim
        # (timestamp taken just before the update, used for the age check)
        creation = timeutils.utcnow()
        action = consts.CLAIM_UPDATE
        body = {"queue_name": "skittle",
                "ttl": 60,
                "grace": 60,
                "claim_id": claim_id}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(204, resp['headers']['status'])

        # Get the claimed messages (again)
        action = consts.CLAIM_GET
        body = {"queue_name": "skittle",
                "claim_id": claim_id}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        query = timeutils.utcnow()
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(200, resp['headers']['status'])
        self.assertEqual(60, resp['body']['ttl'])

        message_id_3 = resp['body']['messages'][0]['id']

        estimated_age = timeutils.delta_seconds(creation, query)

        # The claim's age should be 0 at this moment. But in some unexpected
        # cases, such as a slow test run, the age may be larger than 0. Skip
        # the assertion in that case.
        if resp['body']['age'] == 0:
            self.assertGreater(estimated_age, resp['body']['age'])

        # Delete the claim
        action = consts.CLAIM_DELETE
        body = {"queue_name": "skittle",
                "claim_id": claim_id}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(204, resp['headers']['status'])

        # Try to delete a message with an invalid claim ID
        action = consts.MESSAGE_DELETE
        body = {"queue_name": "skittle",
                "message_id": message_id_3,
                "claim_id": claim_id}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(400, resp['headers']['status'])

        # Make sure it wasn't deleted!
        action = consts.MESSAGE_GET
        body = {"queue_name": "skittle",
                "message_id": message_id_2}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(200, resp['headers']['status'])

        # Try to get a claim that doesn't exist
        action = consts.CLAIM_GET
        body = {"queue_name": "skittle",
                "claim_id": claim_id}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(404, resp['headers']['status'])

        # Try to update a claim that doesn't exist
        action = consts.CLAIM_UPDATE
        body = {"queue_name": "skittle",
                "ttl": 60,
                "grace": 60,
                "claim_id": claim_id}

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(404, resp['headers']['status'])

    # Claiming from a queue named "nonexistent" is expected to give 204.
    def test_post_claim_nonexistent_queue(self):
        action = consts.CLAIM_CREATE
        body = {"queue_name": "nonexistent",
                "ttl": 100,
                "grace": 60}

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(204, resp['headers']['status'])

    # Fetching a claim from a queue named "nonexistent" is expected to give 404.
    def test_get_claim_nonexistent_queue(self):
        action = consts.CLAIM_GET
        body = {"queue_name": "nonexistent",
                "claim_id": "aaabbbba"}

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(404, resp['headers']['status'])

    # Helper: create a claim on "skittle" (ttl=100, grace=60), assert the 201
    # status and return the decoded response.
    def _get_a_claim(self):
        action = consts.CLAIM_CREATE
        body = {"queue_name": "skittle",
                "ttl": 100,
                "grace": 60}

        send_mock = mock.Mock()
        self.protocol.sendMessage = send_mock

        req = test_utils.create_request(action, body, self.headers)
        self.protocol.onMessage(req, False)
        resp = json.loads(send_mock.call_args[0][0].decode())
        self.assertEqual(201, resp['headers']['status'])

        return resp
from __future__ import annotations
import asyncio
import logging
from typing import Awaitable, Optional, Union
import aiohttp
from ..store import DataStore, DataStoreManager
from ..typedefs import Item
from ..ws import ClientWebSocketResponse
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class BybitInverseDataStore(DataStoreManager):
    """
    DataStore manager for Bybit inverse contracts
    """

    def _init(self) -> None:
        """Create one DataStore per data kind and reset the last timestamp."""
        for name, store_class in (
            ("orderbook", OrderBookInverse),
            ("trade", TradeInverse),
            ("insurance", Insurance),
            ("instrument", InstrumentInverse),
            ("kline", KlineInverse),
            ("liquidation", LiquidationInverse),
            ("position", PositionInverse),
            ("execution", ExecutionInverse),
            ("order", OrderInverse),
            ("stoporder", StopOrderInverse),
            ("wallet", WalletInverse),
        ):
            self.create(name, datastore_class=store_class)
        # ``timestamp_e6`` of the most recent message that carried one.
        self.timestamp_e6: Optional[int] = None

    async def initialize(self, *aws: Awaitable[aiohttp.ClientResponse]) -> None:
        """
        Fill the stores from REST responses, handled in completion order.

        Supported endpoints
        - GET /v2/private/order (DataStore: order)
        - GET /futures/private/order (DataStore: order)
        - GET /v2/private/stop-order (DataStore: stoporder)
        - GET /futures/private/stop-order (DataStore: stoporder)
        - GET /v2/private/position/list (DataStore: position)
        - GET /futures/private/position/list (DataStore: position)
        - GET /v2/private/wallet/balance (DataStore: wallet)
        - GET /v2/public/kline/list (DataStore: kline)

        Raises ValueError when a response has a non-zero ``ret_code``.
        Responses for other paths are ignored.
        """
        # Request path -> name of the store property that consumes the result.
        routes = {
            "/v2/private/order": "order",
            "/futures/private/order": "order",
            "/v2/private/stop-order": "stoporder",
            "/futures/private/stop-order": "stoporder",
            "/v2/private/position/list": "position",
            "/futures/private/position/list": "position",
            "/v2/public/kline/list": "kline",
            "/v2/private/wallet/balance": "wallet",
        }
        for pending in asyncio.as_completed(aws):
            resp = await pending
            data = await resp.json()
            if data["ret_code"] != 0:
                raise ValueError(
                    "Response error at DataStore initialization\n"
                    f"URL: {resp.url}\n"
                    f"Data: {data}"
                )
            store_name = routes.get(resp.url.path)
            if store_name is not None:
                getattr(self, store_name)._onresponse(data["result"])

    def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:
        """Dispatch a decoded WebSocket message to the store matching its topic.

        Messages reporting ``success`` as false are logged as warnings, and
        ``timestamp_e6`` is recorded whenever the message carries it.
        """
        if "success" in msg and not msg["success"]:
            logger.warning(msg)
        if "topic" in msg:
            topic: str = msg["topic"]
            data = msg["data"]
            # The topic tests are mutually exclusive, so one chain suffices.
            if topic.startswith(("orderBookL2_25", "orderBook_200")):
                self.orderbook._onmessage(topic, msg["type"], data)
            elif topic.startswith("trade"):
                self.trade._onmessage(data)
            elif topic.startswith("insurance"):
                self.insurance._onmessage(data)
            elif topic.startswith("instrument_info"):
                self.instrument._onmessage(topic, msg["type"], data)
            elif topic.startswith("klineV2"):
                self.kline._onmessage(topic, data)
            elif topic.startswith("liquidation"):
                self.liquidation._onmessage(data)
            elif topic == "position":
                self.position._onmessage(data)
            elif topic == "execution":
                self.execution._onmessage(data)
            elif topic == "order":
                self.order._onmessage(data)
            elif topic == "stop_order":
                self.stoporder._onmessage(data)
            elif topic == "wallet":
                self.wallet._onmessage(data)
        if "timestamp_e6" in msg:
            self.timestamp_e6 = int(msg["timestamp_e6"])

    @property
    def orderbook(self) -> "OrderBookInverse":
        return self.get("orderbook", OrderBookInverse)

    @property
    def trade(self) -> "TradeInverse":
        return self.get("trade", TradeInverse)

    @property
    def insurance(self) -> "Insurance":
        return self.get("insurance", Insurance)

    @property
    def instrument(self) -> "InstrumentInverse":
        return self.get("instrument", InstrumentInverse)

    @property
    def kline(self) -> "KlineInverse":
        return self.get("kline", KlineInverse)

    @property
    def liquidation(self) -> "LiquidationInverse":
        return self.get("liquidation", LiquidationInverse)

    @property
    def position(self) -> "PositionInverse":
        """
        Positions for inverse contracts (perpetual / futures)
        """
        return self.get("position", PositionInverse)

    @property
    def execution(self) -> "ExecutionInverse":
        return self.get("execution", ExecutionInverse)

    @property
    def order(self) -> "OrderInverse":
        """
        Active orders only (filled and cancelled orders are removed)
        """
        return self.get("order", OrderInverse)

    @property
    def stoporder(self) -> "StopOrderInverse":
        """
        Active orders only (triggered orders are removed)
        """
        return self.get("stoporder", StopOrderInverse)

    @property
    def wallet(self) -> "WalletInverse":
        return self.get("wallet", WalletInverse)
class BybitUSDTDataStore(DataStoreManager):
    """
    DataStore manager for Bybit USDT contracts
    """

    def _init(self) -> None:
        """Create one DataStore per data kind and reset the last timestamp."""
        for name, store_class in (
            ("orderbook", OrderBookUSDT),
            ("trade", TradeUSDT),
            ("insurance", Insurance),
            ("instrument", InstrumentUSDT),
            ("kline", KlineUSDT),
            ("liquidation", LiquidationUSDT),
            ("position", PositionUSDT),
            ("execution", ExecutionUSDT),
            ("order", OrderUSDT),
            ("stoporder", StopOrderUSDT),
            ("wallet", WalletUSDT),
        ):
            self.create(name, datastore_class=store_class)
        # ``timestamp_e6`` of the most recent message that carried one.
        self.timestamp_e6: Optional[int] = None

    async def initialize(self, *aws: Awaitable[aiohttp.ClientResponse]) -> None:
        """
        Fill the stores from REST responses, handled in completion order.

        Supported endpoints
        - GET /private/linear/order/search (DataStore: order)
        - GET /private/linear/stop-order/search (DataStore: stoporder)
        - GET /private/linear/position/list (DataStore: position)
        - GET /public/linear/kline (DataStore: kline)
        - GET /v2/private/wallet/balance (DataStore: wallet)

        Raises ValueError when a response has a non-zero ``ret_code``.
        Responses for other paths are ignored.
        """
        # Request path -> name of the store property that consumes the result.
        routes = {
            "/private/linear/order/search": "order",
            "/private/linear/stop-order/search": "stoporder",
            "/private/linear/position/list": "position",
            "/public/linear/kline": "kline",
            "/v2/private/wallet/balance": "wallet",
        }
        for pending in asyncio.as_completed(aws):
            resp = await pending
            data = await resp.json()
            if data["ret_code"] != 0:
                raise ValueError(
                    "Response error at DataStore initialization\n"
                    f"URL: {resp.url}\n"
                    f"Data: {data}"
                )
            store_name = routes.get(resp.url.path)
            if store_name is not None:
                getattr(self, store_name)._onresponse(data["result"])

    def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:
        """Dispatch a decoded WebSocket message to the store matching its topic.

        Messages reporting ``success`` as false are logged as warnings, and
        ``timestamp_e6`` is recorded whenever the message carries it.
        """
        if "success" in msg and not msg["success"]:
            logger.warning(msg)
        if "topic" in msg:
            topic: str = msg["topic"]
            data = msg["data"]
            # The topic tests are mutually exclusive, so one chain suffices.
            if topic.startswith(("orderBookL2_25", "orderBook_200")):
                self.orderbook._onmessage(topic, msg["type"], data)
            elif topic.startswith("trade"):
                self.trade._onmessage(data)
            elif topic.startswith("instrument_info"):
                self.instrument._onmessage(topic, msg["type"], data)
            elif topic.startswith("candle"):
                self.kline._onmessage(topic, data)
            elif topic.startswith("liquidation"):
                self.liquidation._onmessage(data)
            elif topic == "position":
                self.position._onmessage(data)
            elif topic == "execution":
                self.execution._onmessage(data)
            elif topic == "order":
                self.order._onmessage(data)
            elif topic == "stop_order":
                self.stoporder._onmessage(data)
            elif topic == "wallet":
                self.wallet._onmessage(data)
        if "timestamp_e6" in msg:
            self.timestamp_e6 = int(msg["timestamp_e6"])

    @property
    def orderbook(self) -> "OrderBookUSDT":
        return self.get("orderbook", OrderBookUSDT)

    @property
    def trade(self) -> "TradeUSDT":
        return self.get("trade", TradeUSDT)

    @property
    def instrument(self) -> "InstrumentUSDT":
        return self.get("instrument", InstrumentUSDT)

    @property
    def kline(self) -> "KlineUSDT":
        return self.get("kline", KlineUSDT)

    @property
    def liquidation(self) -> "LiquidationUSDT":
        return self.get("liquidation", LiquidationUSDT)

    @property
    def position(self) -> "PositionUSDT":
        """
        Positions for USDT contracts
        """
        return self.get("position", PositionUSDT)

    @property
    def execution(self) -> "ExecutionUSDT":
        return self.get("execution", ExecutionUSDT)

    @property
    def order(self) -> "OrderUSDT":
        """
        Active orders only (filled and cancelled orders are removed)
        """
        return self.get("order", OrderUSDT)

    @property
    def stoporder(self) -> "StopOrderUSDT":
        """
        Active orders only (triggered orders are removed)
        """
        return self.get("stoporder", StopOrderUSDT)

    @property
    def wallet(self) -> "WalletUSDT":
        return self.get("wallet", WalletUSDT)
class OrderBookInverse(DataStore):
    """Order book levels keyed by symbol, id and side."""

    _KEYS = ["symbol", "id", "side"]

    def sorted(self, query: Optional[Item] = None) -> dict[str, list[Item]]:
        """Return the levels matching *query*, grouped by side and ordered by ``id``.

        Sell levels are in ascending ``id`` order, Buy levels in descending
        order.  With no *query*, every stored level is included.
        """
        conditions = {} if query is None else query
        book = {"Sell": [], "Buy": []}
        for level in self:
            if all(k in level and conditions[k] == level[k] for k in conditions):
                book[level["side"]].append(level)

        def by_id(level):
            return level["id"]

        book["Sell"].sort(key=by_id)
        book["Buy"].sort(key=by_id, reverse=True)
        return book

    def _onmessage(self, topic: str, type_: str, data: Union[list[Item], Item]) -> None:
        """Apply an order book message; *type_* is ``"snapshot"`` or ``"delta"``."""
        if type_ == "delta":
            self._delete(data["delete"])
            self._update(data["update"])
            self._insert(data["insert"])
        elif type_ == "snapshot":
            # The symbol is the last dot-separated field of the topic,
            # ex: "orderBookL2_25.BTCUSD", "orderBook_200.100ms.BTCUSD"
            symbol = topic.split(".")[-1]
            # Replace the stored levels of that symbol with the snapshot.
            stale = self.find({"symbol": symbol})
            self._delete(stale)
            self._insert(data)
class OrderBookUSDT(OrderBookInverse):
    """Order book store whose snapshot items sit under the ``"order_book"`` key."""

    def _onmessage(
        self, topic: str, type_: str, data: Union[list[Item], Item]
    ) -> None:
        """Apply an order book message to the store.

        Args:
            topic: Topic string; its last dot-separated segment is used as the
                symbol, ex: "orderBookL2_25.BTCUSDT",
                "orderBook_200.100ms.BTCUSDT".
            type_: ``"snapshot"`` or ``"delta"``; other values are ignored.
            data: For a snapshot, a mapping whose ``"order_book"`` entry holds
                the items to insert. For a delta, a mapping with ``"delete"``,
                ``"update"`` and ``"insert"`` entries.
        """
        if type_ == "delta":
            # Applied in this order: delete, then update, then insert.
            self._delete(data["delete"])
            self._update(data["update"])
            self._insert(data["insert"])
            return
        if type_ != "snapshot":
            return
        symbol = topic.rpartition(".")[2]
        # Drop whatever is currently stored for this symbol before inserting.
        stale = self.find({"symbol": symbol})
        self._delete(stale)
        self._insert(data["order_book"])
class TradeInverse(DataStore):
    """Trade store whose items are keyed by ``trade_id``."""

    _KEYS = ["trade_id"]
    # NOTE(review): presumably the retention limit honoured by DataStore - confirm.
    _MAXLEN = 99999

    def _onmessage(self, data: list[Item]) -> None:
        """Insert all items of a trade message into the store."""
        trades = data
        self._insert(trades)
class TradeUSDT(TradeInverse):
    """Trade store with no changes relative to :class:`TradeInverse`."""
class Insurance(DataStore):
    """Insurance store whose items are keyed by ``currency``."""

    _KEYS = ["currency"]

    def _onmessage(self, data: list[Item]) -> None:
        """Pass all items of an insurance message to ``_update``."""
        records = data
        self._update(records)
class InstrumentInverse(DataStore):
    """Instrument store whose items are keyed by ``symbol``."""

    _KEYS = ["symbol"]

    def _onmessage(self, topic: str, type_: str, data: Item) -> None:
        """Apply an instrument message to the store.

        Args:
            topic: Topic string; its last dot-separated segment is used as the
                symbol, ex: "instrument_info.100ms.BTCUSD".
            type_: ``"snapshot"`` or ``"delta"``; other values are ignored.
            data: For a snapshot, the single item to insert. For a delta, a
                mapping whose ``"update"`` entry is passed to ``_update``.
        """
        if type_ == "delta":
            self._update(data["update"])
            return
        if type_ != "snapshot":
            return
        symbol = topic.rpartition(".")[2]
        # Drop whatever is currently stored for this symbol before inserting.
        stale = self.find({"symbol": symbol})
        self._delete(stale)
        self._insert([data])
class InstrumentUSDT(InstrumentInverse):
    """Instrument store with no changes relative to :class:`InstrumentInverse`."""
class KlineInverse(DataStore):
    """Kline store whose items are keyed by ``start``, ``symbol`` and ``interval``."""

    _KEYS = ["start", "symbol", "interval"]

    def _onmessage(self, topic: str, data: list[Item]) -> None:
        """Tag each item with symbol and interval from the topic, then update.

        Args:
            topic: Dot-separated topic string, ex: "klineV2.1.BTCUSD". The last
                segment is stored as ``symbol`` and the one before it as
                ``interval``.
            data: Items of the message; they are modified in place.
        """
        segments = topic.split(".")
        for candle in data:
            candle["symbol"] = segments[-1]
            candle["interval"] = segments[-2]
        self._update(data)

    def _onresponse(self, data: list[Item]) -> None:
        """Rename ``open_time`` to ``start`` on each item, then update.

        Args:
            data: Items of the response; they are modified in place.
        """
        for candle in data:
            opened_at = candle.pop("open_time")
            candle["start"] = opened_at
        self._update(data)
class KlineUSDT(KlineInverse):
    """Kline store with no changes relative to :class:`KlineInverse`."""
class LiquidationInverse(DataStore):
    """Liquidation store; declares no ``_KEYS`` of its own."""

    # NOTE(review): presumably the retention limit honoured by DataStore - confirm.
    _MAXLEN = 99999

    def _onmessage(self, item: Item) -> None:
        """Insert the single liquidation item carried by a message."""
        batch = [item]
        self._insert(batch)
class LiquidationUSDT(LiquidationInverse):
    """Liquidation store with no changes relative to :class:`LiquidationInverse`."""
class PositionInverse(DataStore):
    """Position store whose items are keyed by ``symbol`` and ``position_idx``."""

    _KEYS = ["symbol", "position_idx"]

    def one(self, symbol: str) -> Optional[Item]:
        """Return the item stored for ``symbol`` with ``position_idx`` 0."""
        key = {"symbol": symbol, "position_idx": 0}
        return self.get(key)

    def both(self, symbol: str) -> dict[str, Optional[Item]]:
        """Return the ``position_idx`` 2 and 1 items as ``"Sell"`` and ``"Buy"``."""
        sell_side = self.get({"symbol": symbol, "position_idx": 2})
        buy_side = self.get({"symbol": symbol, "position_idx": 1})
        return {"Sell": sell_side, "Buy": buy_side}

    def _onresponse(self, data: Union[Item, list[Item]]) -> None:
        """Store positions from a REST response in any of three shapes.

        Args:
            data: A single position dict, a list of position dicts, or a list
                of ``{"is_valid": ..., "data": {...}}`` wrappers. Wrappers
                with a falsy ``is_valid`` are skipped. Any other type is
                ignored.
        """
        if isinstance(data, dict):
            # ex: {"symbol": "BTCUSD", ...}
            self._update([data])
            return
        if not isinstance(data, list):
            return
        for entry in data:
            if "is_valid" not in entry:
                # ex: [{"symbol": "BTCUSDT", ...}, ...]
                self._update([entry])
            elif entry["is_valid"]:
                # ex:
                # [
                #     {
                #         "is_valid": True,
                #         "data": {"symbol": "BTCUSDM21", ...}
                #     },
                #     ...
                # ]
                self._update([entry["data"]])

    def _onmessage(self, data: list[Item]) -> None:
        """Pass all items of a position message to ``_update``."""
        positions = data
        self._update(positions)
class PositionUSDT(PositionInverse):
    """Position store that casts ``position_idx`` to ``int`` on incoming messages."""

    def _onmessage(self, data: list[Item]) -> None:
        """Convert each item's ``position_idx`` to ``int`` and update it.

        Items are modified in place and passed to ``_update`` one at a time.
        """
        for position in data:
            idx = int(position["position_idx"])
            position["position_idx"] = idx
            self._update([position])
class ExecutionInverse(DataStore):
    """Execution store whose items are keyed by ``exec_id``."""

    _KEYS = ["exec_id"]

    def _onmessage(self, data: list[Item]) -> None:
        """Pass all items of an execution message to ``_update``."""
        executions = data
        self._update(executions)
class ExecutionUSDT(ExecutionInverse):
    """Execution store with no changes relative to :class:`ExecutionInverse`."""
class OrderInverse(DataStore):
    """Order store whose items are keyed by ``order_id``."""

    _KEYS = ["order_id"]

    def _onresponse(self, data: list[Item]) -> None:
        """Store orders from a REST response.

        Args:
            data: A list of order dicts, or a single order dict (wrapped in a
                list before updating). Any other type is ignored.
        """
        if isinstance(data, list):
            rows = data
        elif isinstance(data, dict):
            rows = [data]
        else:
            return
        self._update(rows)

    def _onmessage(self, data: list[Item]) -> None:
        """Update orders whose status is still open and delete the rest.

        An item is updated when its ``order_status`` is ``"Created"``,
        ``"New"`` or ``"PartiallyFilled"``; any other status deletes it.
        """
        open_statuses = ("Created", "New", "PartiallyFilled")
        for order in data:
            still_open = order["order_status"] in open_statuses
            apply = self._update if still_open else self._delete
            apply([order])
class OrderUSDT(OrderInverse):
    """Order store with no changes relative to :class:`OrderInverse`."""
class StopOrderInverse(DataStore):
    """Stop order store whose items are keyed by ``order_id``."""

    _KEYS = ["order_id"]

    def _onresponse(self, data: list[Item]) -> None:
        """Store stop orders from a REST response.

        Args:
            data: A list of stop order dicts, or a single stop order dict
                (wrapped in a list before updating). Any other type is ignored.
        """
        if isinstance(data, list):
            rows = data
        elif isinstance(data, dict):
            rows = [data]
        else:
            return
        self._update(rows)

    def _onmessage(self, data: list[Item]) -> None:
        """Update stop orders that are still pending and delete the rest.

        An item is updated when its ``order_status`` is ``"Active"`` or
        ``"Untriggered"``; any other status deletes it.
        """
        pending_statuses = ("Active", "Untriggered")
        for stop_order in data:
            still_pending = stop_order["order_status"] in pending_statuses
            apply = self._update if still_pending else self._delete
            apply([stop_order])
class StopOrderUSDT(StopOrderInverse):
    """Stop order store keyed by ``stop_order_id`` instead of ``order_id``."""

    _KEYS = [
        "stop_order_id",
    ]
class WalletInverse(DataStore):
    """Wallet store whose items are keyed by ``coin``; the USDT entry is skipped."""

    _KEYS = ["coin"]

    def _onresponse(self, data: dict[str, Item]) -> None:
        """Store the balance of every coin except USDT from a REST response.

        The caller's mapping is left untouched. It used to have its ``"USDT"``
        entry popped here, which hid that entry from any later consumer of the
        same payload.

        Args:
            data: Mapping of coin name to a balance dict that contains
                ``available_balance`` and ``wallet_balance``.
        """
        for coin, balance in data.items():
            if coin == "USDT":
                # Skipped here; WalletUSDT._onresponse reads this entry.
                continue
            self._update(
                [
                    {
                        "coin": coin,
                        "available_balance": balance["available_balance"],
                        "wallet_balance": balance["wallet_balance"],
                    }
                ]
            )

    def _onmessage(self, data: list[Item]) -> None:
        """Pass all items of a wallet message to ``_update``."""
        self._update(data)
class WalletUSDT(WalletInverse):
    """Wallet store that tracks only the ``"USDT"`` entry."""

    def _onresponse(self, data: dict[str, Item]) -> None:
        """Store the USDT balance from a REST response, if present.

        Args:
            data: Mapping of coin name to a balance dict. Only the ``"USDT"``
                entry is read; nothing happens when it is absent.
        """
        if "USDT" not in data:
            return
        record = {"coin": "USDT"}
        record["wallet_balance"] = data["USDT"]["wallet_balance"]
        record["available_balance"] = data["USDT"]["available_balance"]
        self._update([record])

    def _onmessage(self, data: list[Item]) -> None:
        """Update each wallet item, defaulting its ``coin`` field to ``"USDT"``.

        A ``coin`` value already present on an item takes precedence.
        """
        for entry in data:
            record = {"coin": "USDT"}
            record.update(entry)
            self._update([record])
Accelerate Your Automation Test Cycles With LambdaTest
Leverage LambdaTest’s cloud-based platform to execute your automation tests in parallel and trim down your test execution time significantly. Your first 100 automation testing minutes are on us.