Best Python code snippet using avocado_python
test_coinflex_exchange.py
Source:test_coinflex_exchange.py  
...107            await asyncio.sleep(1.0)108        return future.result()109    def run_parallel(self, *tasks):110        return self.ev_loop.run_until_complete(self.run_parallel_async(*tasks))111    async def _async_run_with_timeout(self, coro, seconds):112        async with timeout(seconds):113            return await coro114    def _run_with_timeout(self, coro, seconds=20):115        return self.ev_loop.run_until_complete(self._async_run_with_timeout(coro, seconds))116    def _place_order(self, is_buy, amount, order_type, price, ex_order_id) -> str:117        if is_buy:118            cl_order_id = self.connector.buy(self.trading_pair, amount, order_type, price)119        else:120            cl_order_id = self.connector.sell(self.trading_pair, amount, order_type, price)121        return cl_order_id122    def _cancel_order(self, cl_order_id, connector=None):123        if connector is None:124            connector = self.connector125        return connector.cancel(self.trading_pair, cl_order_id)126    def test_estimate_fee(self):127        maker_fee = self.connector.estimate_fee_pct(True)128        self.assertAlmostEqual(maker_fee, Decimal("0.000"))129        taker_fee = self.connector.estimate_fee_pct(False)130        self.assertAlmostEqual(taker_fee, Decimal("0.0008"))131    def test_update_last_prices(self):132        # This is basic test to see if order_book last_trade_price is initiated and updated.133        for order_book in self.connector.order_books.values():134            for _ in range(5):135                self.ev_loop.run_until_complete(asyncio.sleep(1))136                self.assertFalse(math.isnan(order_book.last_trade_price))137    def test_cancel_all(self):138        bid_price = self.connector.get_price(self.trading_pair, True)139        ask_price = self.connector.get_price(self.trading_pair, False)140        bid_price = self.connector.quantize_order_price(self.trading_pair, bid_price * Decimal("0.9"))141        ask_price = 
self.connector.quantize_order_price(self.trading_pair, ask_price * Decimal("1.1"))142        amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.001"))143        buy_id = self._place_order(True, amount, OrderType.LIMIT, bid_price, 1)144        sell_id = self._place_order(False, amount, OrderType.LIMIT, ask_price, 2)145        self.ev_loop.run_until_complete(asyncio.sleep(1))146        asyncio.ensure_future(self.connector.cancel_all(60))147        self._run_with_timeout(self.event_logger.wait_for(OrderCancelledEvent))148        self.ev_loop.run_until_complete(asyncio.sleep(1))149        cancel_events = [t for t in self.event_logger.event_log if isinstance(t, OrderCancelledEvent)]150        self.assertEqual({buy_id, sell_id}, {o.order_id for o in cancel_events})151    def test_buy_and_sell(self):152        print(self.connector.get_price(self.trading_pair, True))153        price = self.connector.get_mid_price(self.trading_pair) * Decimal("1.05")154        price = self.connector.quantize_order_price(self.trading_pair, price)155        amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.001"))156        quote_bal = self.connector.get_available_balance(self.quote_token)157        order_id = self._place_order(True, amount, OrderType.LIMIT, price, 1)158        order_completed_event = self._run_with_timeout(self.event_logger.wait_for(BuyOrderCompletedEvent))159        self.ev_loop.run_until_complete(asyncio.sleep(5))160        trade_events = [t for t in self.event_logger.event_log if isinstance(t, OrderFilledEvent)]161        base_amount_traded = sum(t.amount for t in trade_events)162        quote_amount_traded = sum(t.amount * t.price for t in trade_events)163        self.assertTrue([evt.order_type == OrderType.LIMIT for evt in trade_events])164        self.assertEqual(order_id, order_completed_event.order_id)165        self.assertEqual(amount, order_completed_event.base_asset_amount)166        self.assertEqual("BTC", 
order_completed_event.base_asset)167        self.assertEqual("USD", order_completed_event.quote_asset)168        self.assertAlmostEqual(base_amount_traded, order_completed_event.base_asset_amount)169        self.assertAlmostEqual(quote_amount_traded, order_completed_event.quote_asset_amount)170        self.assertGreater(order_completed_event.fee_amount, Decimal(0))171        self.assertTrue(any([isinstance(event, BuyOrderCreatedEvent) and str(event.order_id) == str(order_id)172                             for event in self.event_logger.event_log]))173        # check available quote balance gets updated, we need to wait a bit for the balance message to arrive174        expected_quote_bal = quote_bal - quote_amount_traded175        expected_quote_bal_self = quote_bal - order_completed_event.fee_amount176        self.ev_loop.run_until_complete(self.connector._update_balances())177        try:178            self.assertAlmostEqual(expected_quote_bal, self.connector.get_available_balance(self.quote_token), 0)179        except Exception:180            self.assertAlmostEqual(expected_quote_bal_self, self.connector.get_available_balance(self.quote_token), 0)181        # Reset the logs182        self.event_logger.clear()183        # Refresh the base balance184        base_bal = self.connector.get_available_balance(self.base_token)185        # Try to sell back the same amount to the exchange, and watch for completion event.186        price = self.connector.get_price(self.trading_pair, True) * Decimal("0.95")187        price = self.connector.quantize_order_price(self.trading_pair, price)188        amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.001"))189        order_id = self._place_order(False, amount, OrderType.LIMIT, price, 2)190        order_completed_event = self._run_with_timeout(self.event_logger.wait_for(SellOrderCompletedEvent))191        self.ev_loop.run_until_complete(asyncio.sleep(5))192        trade_events = [t for t in 
self.event_logger.event_log if isinstance(t, OrderFilledEvent)]193        base_amount_traded = sum(t.amount for t in trade_events)194        quote_amount_traded = sum(t.amount * t.price for t in trade_events)195        self.assertTrue([evt.order_type == OrderType.LIMIT for evt in trade_events])196        self.assertEqual(order_id, order_completed_event.order_id)197        self.assertEqual(amount, order_completed_event.base_asset_amount)198        self.assertEqual("BTC", order_completed_event.base_asset)199        self.assertEqual("USD", order_completed_event.quote_asset)200        self.assertAlmostEqual(base_amount_traded, order_completed_event.base_asset_amount)201        self.assertAlmostEqual(quote_amount_traded, order_completed_event.quote_asset_amount)202        self.assertGreater(order_completed_event.fee_amount, Decimal(0))203        self.assertTrue(any([isinstance(event, SellOrderCreatedEvent) and event.order_id == order_id204                             for event in self.event_logger.event_log]))205        # check available base balance gets updated, we need to wait a bit for the balance message to arrive206        maker_fee = self.connector.estimate_fee_pct(True)207        taker_fee = self.connector.estimate_fee_pct(False)208        expected_base_bal = base_bal - base_amount_traded209        expected_base_bal_with_fee_m = base_bal - (base_amount_traded * (Decimal("1") + maker_fee))210        expected_base_bal_with_fee_t = base_bal - (base_amount_traded * (Decimal("1") + taker_fee))211        self.ev_loop.run_until_complete(asyncio.sleep(6))212        self.ev_loop.run_until_complete(self.connector._update_balances())213        self.ev_loop.run_until_complete(asyncio.sleep(6))214        try:215            self.assertAlmostEqual(expected_base_bal_with_fee_t, self.connector.get_available_balance(self.base_token), 5)216        except Exception:217            try:218                self.assertAlmostEqual(expected_base_bal_with_fee_m, 
self.connector.get_available_balance(self.base_token), 5)219            except Exception:220                self.assertAlmostEqual(expected_base_bal, self.connector.get_available_balance(self.base_token), 5)221    def test_limit_makers_unfilled(self):222        price = self.connector.get_price(self.trading_pair, True) * Decimal("0.8")223        price = self.connector.quantize_order_price(self.trading_pair, price)224        amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.001"))225        self.ev_loop.run_until_complete(asyncio.sleep(1))226        self.ev_loop.run_until_complete(self.connector._update_balances())227        self.ev_loop.run_until_complete(asyncio.sleep(2))228        quote_bal = self.connector.get_available_balance(self.quote_token)229        cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1)230        order_created_event = self._run_with_timeout(self.event_logger.wait_for(BuyOrderCreatedEvent))231        self.assertEqual(cl_order_id, order_created_event.order_id)232        # check available quote balance gets updated, we need to wait a bit for the balance message to arrive233        maker_fee = self.connector.estimate_fee_pct(True)234        taker_fee = self.connector.estimate_fee_pct(False)235        quote_amount = ((price * amount))236        expected_quote_bal = quote_bal - quote_amount237        expected_quote_bal_with_fee_m = quote_bal - ((price * amount) * (Decimal("1") + maker_fee))238        expected_quote_bal_with_fee_t = quote_bal - ((price * amount) * (Decimal("1") + taker_fee))239        self.ev_loop.run_until_complete(asyncio.sleep(1))240        self.ev_loop.run_until_complete(self.connector._update_balances())241        self.ev_loop.run_until_complete(asyncio.sleep(2))242        try:243            self.assertAlmostEqual(expected_quote_bal, self.connector.get_available_balance(self.quote_token), 5)244        except Exception:245            try:246                
self.assertAlmostEqual(expected_quote_bal_with_fee_m, self.connector.get_available_balance(self.quote_token), 5)247            except Exception:248                self.assertAlmostEqual(expected_quote_bal_with_fee_t, self.connector.get_available_balance(self.quote_token), 5)249        self._cancel_order(cl_order_id)250        event = self._run_with_timeout(self.event_logger.wait_for(OrderCancelledEvent))251        self.assertEqual(cl_order_id, event.order_id)252        price = self.connector.get_price(self.trading_pair, True) * Decimal("1.2")253        price = self.connector.quantize_order_price(self.trading_pair, price)254        amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.001"))255        cl_order_id = self._place_order(False, amount, OrderType.LIMIT_MAKER, price, 2)256        order_created_event = self._run_with_timeout(self.event_logger.wait_for(SellOrderCreatedEvent))257        self.assertEqual(cl_order_id, order_created_event.order_id)258        self._cancel_order(cl_order_id)259        event = self._run_with_timeout(self.event_logger.wait_for(OrderCancelledEvent))260        self.assertEqual(cl_order_id, event.order_id)261    def test_order_quantized_values(self):262        bid_price: Decimal = self.connector.get_price(self.trading_pair, True)263        ask_price: Decimal = self.connector.get_price(self.trading_pair, False)264        mid_price: Decimal = (bid_price + ask_price) / 2265        # Make sure there's enough balance to make the limit orders.266        self.assertGreater(self.connector.get_balance("BTC"), Decimal("0.001"))267        self.assertGreater(self.connector.get_balance("USD"), Decimal("40"))268        # Intentionally set some prices with too many decimal places s.t. they269        # need to be quantized. Also, place them far away from the mid-price s.t. 
they won't270        # get filled during the test.271        bid_price = self.connector.quantize_order_price(self.trading_pair, mid_price * Decimal("0.9333192292111341"))272        ask_price = self.connector.quantize_order_price(self.trading_pair, mid_price * Decimal("1.1492431474884933"))273        amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.001"))274        # Test bid order275        cl_order_id_1 = self._place_order(True, amount, OrderType.LIMIT, bid_price, 1)276        # Wait for the order created event and examine the order made277        self._run_with_timeout(self.event_logger.wait_for(BuyOrderCreatedEvent))278        # Test ask order279        cl_order_id_2 = self._place_order(False, amount, OrderType.LIMIT, ask_price, 1)280        # Wait for the order created event and examine and order made281        self._run_with_timeout(self.event_logger.wait_for(SellOrderCreatedEvent))282        self._cancel_order(cl_order_id_1)283        self._run_with_timeout(self.event_logger.wait_for(OrderCancelledEvent))284        self._cancel_order(cl_order_id_2)285        self._run_with_timeout(self.event_logger.wait_for(OrderCancelledEvent))286    def test_filled_orders_recorded(self):287        config_path: str = "test_config"288        strategy_name: str = "test_strategy"289        sql = SQLConnectionManager(SQLConnectionType.TRADE_FILLS, db_path=self.db_path)290        order_id = None291        recorder = MarketsRecorder(sql, [self.connector], config_path, strategy_name)292        recorder.start()293        try:294            # Try to buy some token from the exchange, and watch for completion event.295            price = self.connector.get_price(self.trading_pair, True) * Decimal("1.1")296            price = self.connector.quantize_order_price(self.trading_pair, price)297            amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.001"))298            order_id = self._place_order(True, amount, OrderType.LIMIT, 
price, 1)299            self._run_with_timeout(self.event_logger.wait_for(BuyOrderCompletedEvent))300            self.ev_loop.run_until_complete(asyncio.sleep(1))301            # Reset the logs302            self.event_logger.clear()303            # Try to sell back the same amount to the exchange, and watch for completion event.304            price = self.connector.get_price(self.trading_pair, True) * Decimal("0.9")305            price = self.connector.quantize_order_price(self.trading_pair, price)306            amount = self.connector.quantize_order_amount(self.trading_pair, Decimal("0.001"))307            order_id = self._place_order(False, amount, OrderType.LIMIT, price, 2)308            self._run_with_timeout(self.event_logger.wait_for(SellOrderCompletedEvent))309            self.ev_loop.run_until_complete(asyncio.sleep(1))310            # Query the persisted trade logs311            trade_fills: List[TradeFill] = recorder.get_trades_for_config(config_path)312            self.assertGreaterEqual(len(trade_fills), 2)313            buy_fills: List[TradeFill] = [t for t in trade_fills if t.trade_type == "BUY"]314            sell_fills: List[TradeFill] = [t for t in trade_fills if t.trade_type == "SELL"]315            self.assertGreaterEqual(len(buy_fills), 1)316            self.assertGreaterEqual(len(sell_fills), 1)317            order_id = None318        finally:319            if order_id is not None:320                self.connector.cancel(self.trading_pair, order_id)321                self.run_parallel(self.event_logger.wait_for(OrderCancelledEvent))322            recorder.stop()323            os.unlink(self.db_path)324    def test_orders_saving_and_restoration(self):325        config_path = "test_config"326        strategy_name = "test_strategy"327        sql = SQLConnectionManager(SQLConnectionType.TRADE_FILLS, db_path=self.db_path)328        order_id = None329        recorder = MarketsRecorder(sql, [self.connector], config_path, strategy_name)330        
recorder.start()331        try:332            self.connector._order_tracker._in_flight_orders.clear()333            self.assertEqual(0, len(self.connector.tracking_states))334            # Try to put limit buy order for 0.001 BTC, and watch for order creation event.335            current_bid_price: Decimal = self.connector.get_price(self.trading_pair, True)336            price: Decimal = current_bid_price * Decimal("0.9")337            price = self.connector.quantize_order_price(self.trading_pair, price)338            amount: Decimal = Decimal("0.001")339            amount = self.connector.quantize_order_amount(self.trading_pair, amount)340            cl_order_id = self._place_order(True, amount, OrderType.LIMIT_MAKER, price, 1)341            order_created_event = self._run_with_timeout(self.event_logger.wait_for(BuyOrderCreatedEvent))342            self.assertEqual(cl_order_id, order_created_event.order_id)343            # Verify tracking states344            self.assertEqual(1, len(self.connector.tracking_states))345            self.assertEqual(cl_order_id, list(self.connector.tracking_states.keys())[0])346            # Verify orders from recorder347            recorded_orders: List[Order] = recorder.get_orders_for_config_and_market(config_path, self.connector)348            self.assertEqual(1, len(recorded_orders))349            self.assertEqual(cl_order_id, recorded_orders[0].id)350            # Verify saved market states351            with recorder._sql_manager.get_new_session() as session:352                saved_market_states: MarketState = recorder.get_market_states(config_path, self.connector, session=session)353            self.assertIsNotNone(saved_market_states)354            self.assertIsInstance(saved_market_states.saved_state, dict)355            self.assertGreater(len(saved_market_states.saved_state), 0)356            # Close out the current market and start another market.357            self.connector.stop(self._clock)358            
self.ev_loop.run_until_complete(asyncio.sleep(5))359            self.clock.remove_iterator(self.connector)360            for event_tag in self.events:361                self.connector.remove_listener(event_tag, self.event_logger)362            # Clear the event loop363            self.event_logger.clear()364            new_connector = CoinflexExchange(365                domain=DOMAIN,366                coinflex_api_key=API_KEY,367                coinflex_api_secret=API_SECRET,368                trading_pairs=[self.trading_pair],369                trading_required=True370            )371            for event_tag in self.events:372                new_connector.add_listener(event_tag, self.event_logger)373            recorder.stop()374            recorder = MarketsRecorder(sql, [new_connector], config_path, strategy_name)375            recorder.start()376            with recorder._sql_manager.get_new_session() as session:377                saved_market_states = recorder.get_market_states(config_path, new_connector, session=session)378            self.clock.add_iterator(new_connector)379            self._run_with_timeout(self.wait_til_ready(new_connector), seconds=60)380            self.assertEqual(0, len(new_connector.limit_orders))381            self.assertEqual(0, len(new_connector.tracking_states))382            new_connector.restore_tracking_states(saved_market_states.saved_state)383            self.assertEqual(1, len(new_connector.limit_orders))384            self.assertEqual(1, len(new_connector.tracking_states))385            # Cancel the order and verify that the change is saved.386            self._cancel_order(cl_order_id, new_connector)387            self._run_with_timeout(self.event_logger.wait_for(OrderCancelledEvent))388            self.ev_loop.run_until_complete(asyncio.sleep(2))389            with recorder._sql_manager.get_new_session() as session:390                recorder.save_market_states(config_path, new_connector, session=session)391             
   self.ev_loop.run_until_complete(asyncio.sleep(2))392                order_id = None393                self.assertEqual(0, len(new_connector.limit_orders))394                self.assertEqual(0, len(new_connector.tracking_states))395                new_saved_market_states = recorder.get_market_states(config_path, new_connector, session=session)396            self.assertEqual(0, len(new_saved_market_states.saved_state))397        finally:398            if order_id is not None:399                self.connector.cancel(self.trading_pair, cl_order_id)400                self.run_parallel(self.event_logger.wait_for(OrderCancelledEvent))401            recorder.stop()...socket_drcom.py
Source:socket_drcom.py  
...28    def setkilltimeout(self, timeout):29        self.killtimeout = int(timeout)30    def getkilltimeout(self):31        return self.killtimeout32    def _run_with_timeout(self, func, a, b):33        alarm(self.killtimeout)34        rst = func(*a, **b)35        alarm(0)36        return rst37    def accept(self, *a, **b):38        return self._run_with_timeout(super().accept, a, b)39    def bind(self, *a, **b):40        return self._run_with_timeout(super().bind, a, b)41    def connect(self, *a, **b):42        return self._run_with_timeout(super().connect, a, b)43    def sendall(self, *a, **b):44        return self._run_with_timeout(super().sendall, a, b)45    def sendto(self, *a, **b):46        return self._run_with_timeout(super().sendto, a, b)47    def recvfrom(self, *a, **b):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
