How to use _init_instruments method in ATX

Best Python code snippet using ATX Github


Full Screen

...52 self._instrument_assets = None53 async def exchanges(self):54 return [EXCHANGE_SYMBOL]55 async def assets(self):56 await self._init_instruments()57 return sorted(self._instrument_assets or [])58 async def instruments(self):59 await self._init_instruments()60 return sorted(self._instruments.keys())61 async def markets(self):62 return [Market(EXCHANGE_SYMBOL, symbol) for symbol in await self.instruments()]63 async def trades(64 self,65 market: str,66 since: Optional[Timestamp] = None,67 timeout: Optional[float] = None,68 until_now: bool = True,69 ):70 # TODO: support more complete query API instead of just since71 start_time = time.time()72 while True:73 series = await self.trades_series(market, since)74 if len(series) > 0:75 since = series.partition.period.end76 yield series77 if until_now and len(series) == 0:78 break79 elapsed_time = time.time() - start_time80 if timeout and elapsed_time > timeout - DEFAULT_WAIT_TIME:81 break82 async def trades_series(self, market: str, since: Optional[Timestamp] = None):83 exchange_symbol, instrument = market.split(":", 1)84 if exchange_symbol != EXCHANGE_SYMBOL:85 raise ValueError(f"{SOURCE_SYMBOL}: market not supported: {market}")86 await self._init_instruments()87 pair = self._instruments[instrument]88 result = await self.request_trades(pair["name"], since)89 raw_trades = result[pair["name"]]90 price_scale = pair["pair_decimals"]91 amount_scale = pair["lot_decimals"]92 raw_price, raw_amount, raw_time, raw_side, raw_order, raw_misc = (93 zip(*raw_trades) if raw_trades else [[], [], [], [], [], []]94 )95 price = [int(Decimal(n).normalize().scaleb(price_scale)) for n in raw_price]96 amount = [int(Decimal(n).normalize().scaleb(amount_scale)) for n in raw_amount]97 time = [fix_time(n) for n in raw_time]98 side = [TAKER_SIDES.get(n) for n in raw_side]99 order = [ORDER_TYPES.get(n) for n in raw_order]100 extra_json = []101 for index, s, o in zip(range(len(raw_trades)), side, order):102 entry = {}103 if s == None:104 entry["side"] = raw_side[index]105 if o == None:106 entry["order"] = raw_order[index]107 if raw_misc[index]:108 entry["misc"] = raw_misc[index]109 if len(entry.keys()) > 0:110 extra_json.append(json.dumps({SOURCE_SYMBOL: entry}))111 else:112 extra_json.append(None)113 pagination_next = Timestamp(int(result["last"]), tz="UTC")114 partition = Partition(115 source=SOURCE_SYMBOL,116 market=Market(EXCHANGE_SYMBOL, instrument),117 period=TimeInterval(since or time[0], pagination_next),118 )119 table = pa.Table.from_arrays(120 [121 pa.array(time, type=pa.timestamp("ns", tz="UTC")),122 pa.array(price, type=pa.uint64()),123 pa.array(amount, type=pa.uint64()),124 pa.array(side, type=pa.dictionary(pa.int8(), pa.string())),125 pa.array(order, type=pa.dictionary(pa.int8(), pa.string())),126 pa.array(extra_json, type=pa.string()),127 ],128 names=["time", "price", "amount", "side", "order", "extra_json"],129 )130 return TableSeries(table, partition)131 async def _init_instruments(self):132 if self._instruments:133 return134 asset_pairs = await self.request("AssetPairs")135 instruments = {}136 instrument_assets = set()137 for name, data in asset_pairs.items():138 if "wsname" not in data:139 continue140 data["name"] = name141 base, quote = map(fix_asset, data["wsname"].split("/"))142 data["symbol"] = "/".join([base, quote])143 instruments[data["symbol"]] = data144 instrument_assets.add(base)145 instrument_assets.add(quote)...

Full Screen

Full Screen Github


Full Screen

...33 self._init_display()34 self.screen_rotation = 1 # TODO: auto judge35 if not bundle_id:36 print 'WARNING []: bundle_id is not set' #, use "" instead.'37 # self._init_instruments('')38 else:39 self._init_instruments(bundle_id)40 def _init_display(self):41 model =['HardwareModel']42 with open(os.path.join(__dir__, 'ios-models.yml'), 'rb') as f:43 items = yaml.load( for item in items:45 if model == item.get('model'):46 (width, height) = map(int, item.get('pixel').split('x'))47 self._scale = item.get('scale')48 self._display = Display(width*self._scale, height*self._scale)49 break50 if self._display is None:51 raise RuntimeError("TODO: not support your phone for now, You need contact the author.")52 53 def _init_instruments(self, bundle_id):54 self._bootstrap = os.path.join(__dir__, '')55 self._bundle_id = bundle_id56 self._env.update({'UDID': self.udid, 'BUNDLE_ID': self._bundle_id})57 # 1. remove pipe58 # subprocess.check_output([self._bootstrap, 'reset'], env=self._env)59 # 2. start instruments60 self._proc = subprocess.Popen([self._bootstrap, 'instruments'], env=self._env, stdout=subprocess.PIPE)61 self.sleep(5.0)62 self._wait_instruments()63 def _wait_instruments(self):64 ret = self._run('1')65 if ret != 1:66 log.error('Instruments stdout:\n' + raise RuntimeError('Instruments start failed, expect 1 but got %s' % (ret,))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?