Best Python code snippet using uiautomator
blockchain.py
Source:blockchain.py  
1import asyncio2import time3from http.client import CannotSendRequest4from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException5from core_modules.blackbox_modules.blockchain import store_data_in_utxo,\6    retrieve_data_from_utxo7from core_modules.settings import NetWorkSettings8# TODO: type check all these rpc calls, so that we can rely on it better9class NotEnoughConfirmations(Exception):10    pass11class BlockChain:12    def __init__(self, user, password, ip, rpcport):13        self.__url = "http://%s:%s@%s:%s" % (user, password, ip, rpcport)14        self.__reconnect()15    def __reconnect(self):16        while True:17            try:18                newjsonrpc = AuthServiceProxy(self.__url)19                # we need this so that we know the blockchain has started20                newjsonrpc.getwalletinfo()21            except ConnectionRefusedError:22                time.sleep(0.1)23            except JSONRPCException as exc:24                if exc.code == -28:25                    time.sleep(0.1)26                else:27                    raise28            else:29                self.__jsonrpc = newjsonrpc30                break31    def get_masternode_order(self, blocknum):32        # TODO: should return MN list: [(sha256(pubkey), ip, port, pubkey), (sha256(pubkey), ip, port, pubkey)...]33        # sha256(pubkey) is the nodeid as int34        raise NotImplementedError("TODO")35    def __call_jsonrpc(self, name, *params):36        while True:37            f = getattr(self.__jsonrpc, name)38            try:39                if len(params) == 0:40                    ret = f()41                else:42                    ret = f(*params)43            except (BrokenPipeError, CannotSendRequest) as exc:44                print("RECONNECTING %s" % exc)45                self.__reconnect()46            else:47                break48        return ret49    def help(self):50        return self.__call_jsonrpc("help")51    def addnode(self, node, mode):52        return self.__call_jsonrpc("addnode", node, mode)53    def listunspent(self, minimum=1, maximum=9999999, addresses=[]):54        return self.__call_jsonrpc("listunspent", minimum, maximum, addresses)55    def getblockchaininfo(self):56        return self.__call_jsonrpc("getblockchaininfo")57    def getmempoolinfo(self):58        return self.__call_jsonrpc("getmempoolinfo")59    def gettxoutsetinfo(self):60        return self.__call_jsonrpc("gettxoutsetinfo")61    def getmininginfo(self):62        return self.__call_jsonrpc("getmininginfo")63    def getnetworkinfo(self):64        return self.__call_jsonrpc("getnetworkinfo")65    def getpeerinfo(self):66        return self.__call_jsonrpc("getpeerinfo")67    def getwalletinfo(self):68        return self.__call_jsonrpc("getwalletinfo")69    def getbalance(self):70        return self.__call_jsonrpc("getbalance")71    def getnewaddress(self):72        return self.__call_jsonrpc("getnewaddress")73    def createrawtransaction(self, transactions, addresses):74        return self.__call_jsonrpc("createrawtransaction", transactions, addresses)75    def decoderawtransaction(self, transaction_hex):76        return self.__call_jsonrpc("decoderawtransaction", transaction_hex)77    def signrawtransaction(self, transaction_hex):78        return self.__call_jsonrpc("signrawtransaction", transaction_hex)79    def fundrawtransaction(self):80        return self.__call_jsonrpc("fundrawtransaction")81    def sendtoaddress(self, address, amount, private_comment="", public_comment=""):82        return self.__call_jsonrpc("sendtoaddress", address, amount, private_comment, public_comment)83    def sendrawtransaction(self, transaction_hex):84        return self.__call_jsonrpc("sendrawtransaction", transaction_hex)85    def getblock(self, blocknum):86        return self.__call_jsonrpc("getblock", blocknum)87    def getblockcount(self):88        return int(self.__call_jsonrpc("getblockcount"))89    def getaccountaddress(self, address):90        return self.__call_jsonrpc("getaccountaddress", address)91    def mnsync(self, param):92        return self.__call_jsonrpc("mnsync", param)93    def gettransaction(self, txid):94        return self.__call_jsonrpc("gettransaction", txid)95    def listsinceblock(self):96        return self.__call_jsonrpc("listsinceblock")97    def getbestblockhash(self):98        return self.__call_jsonrpc("getbestblockhash")99    def getwalletinfo(self):100        return self.__call_jsonrpc("getwalletinfo")101    def getrawtransaction(self, txid, verbose):102        return self.__call_jsonrpc("getrawtransaction", txid, verbose)103    def getrawmempool(self, verbose):104        return self.__call_jsonrpc("getrawmempool", verbose)105    def generate(self, n):106        return self.__call_jsonrpc("generate", int(n))107    def search_chain(self, confirmations=NetWorkSettings.REQUIRED_CONFIRMATIONS):108        blockcount = self.getblockcount() - 1109        for blocknum in range(1, blockcount + 1):110            for txid in self.get_txids_for_block(blocknum, confirmations=confirmations):111                yield txid112    def get_txids_for_block(self, blocknum, confirmations):113        try:114            block = self.getblock(str(blocknum))115        except JSONRPCException as exc:116            if exc.code == -8:117                # Block height out of range118                raise NotEnoughConfirmations119            else:120                raise121        if block["confirmations"] < confirmations:122            raise NotEnoughConfirmations123        else:124            for txid in block["tx"]:125                yield txid126    def store_data_in_utxo(self, input_data):127        while True:128            try:129                txid = store_data_in_utxo(self.__jsonrpc, input_data)130            except BrokenPipeError:131                self.__reconnect()132            else:133                break134        return txid135    def retrieve_data_from_utxo(self, blockchain_transaction_id):136        while True:137            try:138                return retrieve_data_from_utxo(self.__jsonrpc, blockchain_transaction_id)139            except BrokenPipeError:...node_message.py
Source:node_message.py  
1class NodeMessage:2    def __init__(self, method, params, jsonrpc, id_):3        self.__method = method4        self.__params = params5        self.__jsonrpc = jsonrpc6        self.__id = id_7    def as_json(self) -> dict:8        json_representation = {9            'jsonrpc': self.__jsonrpc,10            'id': self.__id,11            'method': self.__method,12        }13        if self.__params is not None:14            json_representation['params'] = self.__params...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
