Best Python code snippet using localstack_python
serialize.py
Source:serialize.py  
...102            'body': b''103        }104        return serialized105    # Some extra utility methods subclasses can use.106    def _timestamp_iso8601(self, value):107        if value.microsecond > 0:108            timestamp_format = ISO8601_MICRO109        else:110            timestamp_format = ISO8601111        return value.strftime(timestamp_format)112    def _timestamp_unixtimestamp(self, value):113        return int(calendar.timegm(value.timetuple()))114    def _timestamp_rfc822(self, value):115        return formatdate(value, usegmt=True)116    def _convert_timestamp_to_str(self, value):117        datetime_obj = parse_to_aware_datetime(value)118        converter = getattr(119            self, '_timestamp_%s' % self.TIMESTAMP_FORMAT.lower())120        final_value = converter(datetime_obj)...contractEvents_v1_0_0_0.py
Source:contractEvents_v1_0_0_0.py  
1from past.builtins import execfile2from . import settings as settings3settings = settings.settings()4execfile(settings.loader('utilities'))5frameworkInfo = settings.getFrameworkInfo()6CONNECTION_STRING = frameworkInfo['ConnectionString']['connectionString']7ETHERSCAN_APIKEY = frameworkInfo['APIKeys']['etherscan']8BSCSCAN_APIKEY = frameworkInfo['APIKeys']['bscscan']9BITQUERY_APIKEY = frameworkInfo['APIKeys']['bitquery']10CONTRACTS_FOLDER = frameworkInfo['Folders']['contracts']11DATA_FOLDER = frameworkInfo['Folders']['data']12from past.builtins import execfile13libraryPath = '{0}/libraries_v{1}.py'.format(frameworkInfo['Folders']['classes'], frameworkInfo['Metadata']['version'])14execfile(libraryPath)15class contractEvents:16    17    def ___init__(self, name):18        self.name = name19    20    21    22    #################################################################################################################23    24    # @dev: This function allow us to save events name in an array from ABI25    # Input: ABI of a smart contract26    # Output: array with events name27    28    def getEventsFromABI(self, _ABI):29        30        import re31        32        events = []33        for match_type in re.finditer(r'\"\,\"type":\"event\"', _ABI):34            for match_name in re.finditer(r'\"name\"\:\"', _ABI[0:match_type.start()]):35                True36                #no-op37            events.append(_ABI[match_name.end():match_type.start()])38        return events39    40    def getEventsFromSmartContract(self, _contract):41        42        try:43            events = list(filter(lambda k: '_' not in k, dir(_contract.events))) 44        except:45            events = []46            47        return events48    49    def downloadAllLogsEvents(self, _path, _limit = 90000): #main useful function to be run50        # CSV has 3 columns: Network, Address, NameAddress51        import pandas as pd52        addressListRaw = pd.read_csv(_path)53        networks = addressListRaw['Network'].unique()54        addressList = {}55        for network in networks:56            addressList[network] = []57        for indexAddress in range(0, len(addressListRaw)):58            for network in networks:59                if addressListRaw['Network'][indexAddress] == network:60                    addressList[network].append(addressListRaw['Address'][indexAddress])61        bitQueryAllowedNetworks = ['ethereum', 'bsc']62        terraAllowedNetwoks = ['columbus-4', 'tequila-0004', 'Mombay-0008', 'Localterra']63        for network in networks:64            if network in bitQueryAllowedNetworks: self.getLogsEventsBitQuery(addressList[network], _limit, network)65            if network in terraAllowedNetwoks: True #terraChain.getLogsEvents(addressList[network], _limit, network) #to-do 66    67    def getAddressMetaData(self, _address):68        result = None69        try:70            df = pd.read_csv('{}/smartContract.csv'.format(DATA_FOLDER))71        except:72            df = pd.DataFrame(columns = ['Address'])73            print('File not found.')74        finally:75            dfAddress = pd.DataFrame(columns = ['Address'], data = [_address])76            result = pd.merge(df, dfAddress, on='Address', how='inner')77        return result78    def getLogsEventsBitQuery(self, _addressList, _limit, _network = 'ethereum'):79        #execfile(loadVersioning.loader('blockchain'))80        #print(loadVersioning.loader('blockchain'))81        from . import blockchain_v1_0_0_0 as blockchain82        blockchain = blockchain.blockchain()83        from classes import bitQuery_v1_0_0_0 as bitQuery84        query = bitQuery.query()85        bq = bitQuery.bitQuery()86        from datetime import datetime87        import pandas as pd88        logsFound = 089        dt = datetime.now().strftime("%Y%m%d_%H%M%S")90        if _network in ['ethereum', 'bsc']:91            for address in _addressList:92                addressMetaData = self.getAddressMetaData(address)93                (smartContract, ABI) = blockchain.getContract(address, _network)94                if(smartContract != None): 95                    blockchain.saveContractData(smartContract)96                    for event in self.getEventsFromSmartContract(smartContract):97                        if event != 'abi':98                            q = query.getEvents(address, event, _limit, _network)99                            print('{} - {} downloading events log...'.format(address, event))100                            result = bq.runQuery(q)101                            # Data transformation102                            listResult = result['data']['ethereum']['smartContractEvents']103                            isFirst = True104                            df = []105                            for indexList in range(0, len(listResult)):106                                blockNumber = listResult[indexList]['block']['height']107                                timestamp_iso8601 = listResult[indexList]['block']['timestamp']['iso8601']108                                timestamp_unixtime = listResult[indexList]['block']['timestamp']['unixtime']109                                arguments = listResult[indexList]['arguments']110                                tmp_df = pd.DataFrame()111                                tmp_df['blockNumber'] = [blockNumber]112                                tmp_df['timestamp_iso8601'] = [timestamp_iso8601]113                                tmp_df['timestamp_unixtime'] = [timestamp_unixtime] 114                                tmp_df['network'] = [addressMetaData['Network']]115                                tmp_df['nameAddress'] = [addressMetaData['NameAddress']]116                                for indexArgs in range(0, len(arguments)):117                                    tmp_df[arguments[indexArgs]['argument']] = [arguments[indexArgs]['value']]118                                if(isFirst):119                                    df = tmp_df120                                    isFirst = False121                                else:122                                    df = pd.concat([df, tmp_df])123                            if len(df) > 0:124                                print('{} - {} correctly saved'.format(address, event))125                                df.to_csv('data/contracts/{}/event_{}_{}.csv'.format(address.lower(),event, dt))126                                logsFound = logsFound + len(df)127                            else:128                                print('{} - {} no logs found'.format(address, event))                    129        else: 130            print ('Network not valid')131#################################################################################################################132    133    # @dev: This function allow to merge all events for each contract present inside data/contracts/{addressContract}134    # based on a blocknumber timeline135    # Input: nothing136    # Output: Dataframe with merged logs based on blocknumber timeline137    138    def mergeAllLogsData(self, _backupPath = None, _overloadData = False):139        140        import numpy as np141        dfList = {}142        143        contractList = [f for f in listdir(CONTRACTS_FOLDER) if isdir(join(CONTRACTS_FOLDER, f))]144        145        eventPrefix = 'event_'146        if _backupPath == None:147            # New Dataframe148            blockNumberDf = pd.DataFrame()149            columnList = []150        else:151            # Load existing dataframer with relative columns152            print('Importing backup...')153            bck = pd.read_csv(_backupPath, low_memory=False)154            blockNumberDf = bck['blockNumber']155            columnList = bck.keys()156            print('Imported {} rows and {} columns'.format(len(blockNumberDf), len(columnList)))157        158        print('Importing smart contract event data...')159        for contractFolder in contractList:160            if(contractFolder != '.ipynb_checkpoints'):161                cFolder = CONTRACTS_FOLDER+'/'+contractFolder162                eventsDataList = np.sort([f for f in listdir(cFolder) if isfile(join(cFolder, f))])[::-1]163                for eventsData in eventsDataList:164                    if(eventsData[0:len(eventPrefix)] == eventPrefix):165                        keyName = contractFolder.replace(" ", "_")                +"_"+eventsData[len(eventPrefix):len(eventsData)-20] 166                        isPresent = False167                        for columnBck in columnList:168                            if len(columnBck.split('_')) > 1: 169                                if columnBck.split('_')[0]+'_'+columnBck.split('_')[1] == keyName:170                                    isPresent = True171                        if isPresent == False or _overloadData:172                            tmpDf = pd.read_csv(cFolder+'/'+eventsData, low_memory=False)173                            keyName = keyName.upper()174                            if len(tmpDf.keys()) > 0:175                                tmpDf = tmpDf.drop(columns=[tmpDf.keys()[0]])176                                dfList[keyName] = tmpDf.add_prefix('{}_'.format(keyName))177                                dfList[keyName]['blockNumber'] = tmpDf['blockNumber']178                                dfList[keyName].set_index('blockNumber')179                                blockNumberDf = pd.concat([blockNumberDf, tmpDf['blockNumber']])180                            else:181                                print("Keys not found at {}".format(keyName))182                        else:183                            print('{} already taken from Backup'.format(keyName))184        185        if _backupPath is not None: 186            blockNumberList = bck #set backup to start dataframe187        else:188            blockNumberList = pd.DataFrame(blockNumberDf[0].unique(), columns=['blockNumber']) # start dataframe189        blockNumberList.set_index('blockNumber')190        191        print('Starting merge...')192        numberOfKeys = len(dfList.keys())193        startIndex = 1194        195        # Merging196        197        for key in dfList.keys():198            try:199                #print('Merging event {}...'.format(key))200                blockNumberList = pd.merge(blockNumberList, dfList[key], on='blockNumber', how='left')201                dfList[key] = None #save space memory202                print("{}/{} - {} event was merged".format(startIndex, numberOfKeys, key))203            except:204                print('Error on sql query. Key: {}'.format(key))205            startIndex = startIndex + 1206        207        dt = datetime.now().strftime("%Y%m%d_%H%M%S")208        209        # Backup pre-optimization210        211        print('Backup before optimization...')212        blockNumberList.to_csv('{0}/bck_smartContractsLogsGrouped_{1}.csv'.format(CONTRACTS_FOLDER, dt))213        # Optimize merged logs:214        blockNumberList = self.optimizeMergedLogs(blockNumberList)215    216        # After All - Save data217        218        blockNumberList.to_csv('{0}/smartContractsLogsGrouped_{1}.csv'.format(CONTRACTS_FOLDER, dt))219        print('File saved on {0}/smartContractsLogsGrouped_{1}.csv'.format(CONTRACTS_FOLDER, dt))220       221    #################################################################################################################222    223    # @dev: 224    # Input: Dataframe of merged logs225    # Output: Reduced dataframe where timestamp_iso8601, timestamp_unixtime are not duplicated on columns226    # but merged in a unique column227    def optimizeMergedLogs(self, _df):228        229        import datetime230        date_time_str = '1970-01-01 00:00:00.000000'231        date_time_obj = datetime.datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S.%f')232        indexList_timestamp_iso8601 = []233        indexList_timestamp_unixtime = []234        columnsToDrop = []235        index = 1236        for column in _df.keys():237            if '_timestamp_iso8601' in column:238                _df[column] = pd.to_datetime(_df[column], utc=True)239                _df[column] = _df[column].fillna(date_time_obj)240                indexList_timestamp_iso8601.append(column)241            if '_timestamp_unixtime' in column:242                indexList_timestamp_unixtime.append(column)243            if "_blockNumber" in column: columnsToDrop.append(column)244            if "Unnamed:" in column: columnsToDrop.append(column)245            print('Analyzed {} of {} columns'.format(index, len(_df.keys())))246            index = index + 1247        print('Optimizing timestamp_iso8601...')248        _df['timestamp_iso8601'] = _df[indexList_timestamp_iso8601].astype(str).max(axis=1)249        print('Optimizing timestamp_unixtime...')250        _df['timestamp_unixtime'] = _df[indexList_timestamp_unixtime].max(axis=1)251        print('Dropping redundante columns...')252        _df = _df.drop(columns = indexList_timestamp_iso8601)253        _df = _df.drop(columns = indexList_timestamp_unixtime)254        _df = _df.drop(columns = columnsToDrop)255        print('Dataframe optimized with success.')256        257        return _df258            259        260    ...contractEvents_v1_0_0_0-checkpoint.py
Source:contractEvents_v1_0_0_0-checkpoint.py  
1from past.builtins import execfile2from . import settings as settings3settings = settings.settings()4execfile(settings.loader('utilities'))5frameworkInfo = settings.getFrameworkInfo()6CONNECTION_STRING = frameworkInfo['ConnectionString']['connectionString']7ETHERSCAN_APIKEY = frameworkInfo['APIKeys']['etherscan']8BSCSCAN_APIKEY = frameworkInfo['APIKeys']['bscscan']9BITQUERY_APIKEY = frameworkInfo['APIKeys']['bitquery']10CONTRACTS_FOLDER = frameworkInfo['Folders']['contracts']11DATA_FOLDER = frameworkInfo['Folders']['data']12from past.builtins import execfile13libraryPath = '{0}/libraries_v{1}.py'.format(frameworkInfo['Folders']['classes'], frameworkInfo['Metadata']['version'])14execfile(libraryPath)15class contractEvents:16    17    def ___init__(self, name):18        self.name = name19    20    21    22    #################################################################################################################23    24    # @dev: This function allow us to save events name in an array from ABI25    # Input: ABI of a smart contract26    # Output: array with events name27    28    def getEventsFromABI(self, _ABI):29        30        import re31        32        events = []33        for match_type in re.finditer(r'\"\,\"type":\"event\"', _ABI):34            for match_name in re.finditer(r'\"name\"\:\"', _ABI[0:match_type.start()]):35                True36                #no-op37            events.append(_ABI[match_name.end():match_type.start()])38        return events39    40    def getEventsFromSmartContract(self, _contract):41        42        try:43            events = list(filter(lambda k: '_' not in k, dir(_contract.events))) 44        except:45            events = []46            47        return events48    49    def downloadAllLogsEvents(self, _path, _limit = 90000): #main useful function to be run50        # CSV has 3 columns: Network, Address, NameAddress51        import pandas as pd52        addressListRaw = pd.read_csv(_path)53        networks = addressListRaw['Network'].unique()54        addressList = {}55        for network in networks:56            addressList[network] = []57        for indexAddress in range(0, len(addressListRaw)):58            for network in networks:59                if addressListRaw['Network'][indexAddress] == network:60                    addressList[network].append(addressListRaw['Address'][indexAddress])61        bitQueryAllowedNetworks = ['ethereum', 'bsc']62        terraAllowedNetwoks = ['columbus-4', 'tequila-0004', 'Mombay-0008', 'Localterra']63        for network in networks:64            if network in bitQueryAllowedNetworks: self.getLogsEventsBitQuery(addressList[network], _limit, network)65            if network in terraAllowedNetwoks: True #terraChain.getLogsEvents(addressList[network], _limit, network) #to-do 66    67    def getAddressMetaData(self, _address):68        result = None69        try:70            df = pd.read_csv('{}/smartContract.csv'.format(DATA_FOLDER))71        except:72            df = pd.DataFrame(columns = ['Address'])73            print('File not found.')74        finally:75            dfAddress = pd.DataFrame(columns = ['Address'], data = [_address])76            result = pd.merge(df, dfAddress, on='Address', how='inner')77        return result78    def getLogsEventsBitQuery(self, _addressList, _limit, _network = 'ethereum'):79        #execfile(loadVersioning.loader('blockchain'))80        #print(loadVersioning.loader('blockchain'))81        from . import blockchain_v1_0_0_0 as blockchain82        blockchain = blockchain.blockchain()83        from classes import bitQuery_v1_0_0_0 as bitQuery84        query = bitQuery.query()85        bq = bitQuery.bitQuery()86        from datetime import datetime87        import pandas as pd88        logsFound = 089        dt = datetime.now().strftime("%Y%m%d_%H%M%S")90        if _network in ['ethereum', 'bsc']:91            for address in _addressList:92                addressMetaData = self.getAddressMetaData(address)93                (smartContract, ABI) = blockchain.getContract(address, _network)94                if(smartContract != None): 95                    blockchain.saveContractData(smartContract)96                    for event in self.getEventsFromSmartContract(smartContract):97                        if event != 'abi':98                            q = query.getEvents(address, event, _limit, _network)99                            print('{} - {} downloading events log...'.format(address, event))100                            result = bq.runQuery(q)101                            # Data transformation102                            listResult = result['data']['ethereum']['smartContractEvents']103                            isFirst = True104                            df = []105                            for indexList in range(0, len(listResult)):106                                blockNumber = listResult[indexList]['block']['height']107                                timestamp_iso8601 = listResult[indexList]['block']['timestamp']['iso8601']108                                timestamp_unixtime = listResult[indexList]['block']['timestamp']['unixtime']109                                arguments = listResult[indexList]['arguments']110                                tmp_df = pd.DataFrame()111                                tmp_df['blockNumber'] = [blockNumber]112                                tmp_df['timestamp_iso8601'] = [timestamp_iso8601]113                                tmp_df['timestamp_unixtime'] = [timestamp_unixtime] 114                                tmp_df['network'] = [addressMetaData['Network']]115                                tmp_df['nameAddress'] = [addressMetaData['NameAddress']]116                                for indexArgs in range(0, len(arguments)):117                                    tmp_df[arguments[indexArgs]['argument']] = [arguments[indexArgs]['value']]118                                if(isFirst):119                                    df = tmp_df120                                    isFirst = False121                                else:122                                    df = pd.concat([df, tmp_df])123                            if len(df) > 0:124                                print('{} - {} correctly saved'.format(address, event))125                                df.to_csv('data/contracts/{}/event_{}_{}.csv'.format(address.lower(),event, dt))126                                logsFound = logsFound + len(df)127                            else:128                                print('{} - {} no logs found'.format(address, event))                    129        else: 130            print ('Network not valid')131#################################################################################################################132    133    # @dev: This function allow to merge all events for each contract present inside data/contracts/{addressContract}134    # based on a blocknumber timeline135    # Input: nothing136    # Output: Dataframe with merged logs based on blocknumber timeline137    138    def mergeAllLogsData(self, _backupPath = None, _overloadData = False):139        140        import numpy as np141        dfList = {}142        143        contractList = [f for f in listdir(CONTRACTS_FOLDER) if isdir(join(CONTRACTS_FOLDER, f))]144        145        eventPrefix = 'event_'146        if _backupPath == None:147            # New Dataframe148            blockNumberDf = pd.DataFrame()149            columnList = []150        else:151            # Load existing dataframer with relative columns152            print('Importing backup...')153            bck = pd.read_csv(_backupPath, low_memory=False)154            blockNumberDf = bck['blockNumber']155            columnList = bck.keys()156            print('Imported {} rows and {} columns'.format(len(blockNumberDf), len(columnList)))157        158        print('Importing smart contract event data...')159        for contractFolder in contractList:160            if(contractFolder != '.ipynb_checkpoints'):161                cFolder = CONTRACTS_FOLDER+'/'+contractFolder162                eventsDataList = np.sort([f for f in listdir(cFolder) if isfile(join(cFolder, f))])[::-1]163                for eventsData in eventsDataList:164                    if(eventsData[0:len(eventPrefix)] == eventPrefix):165                        keyName = contractFolder.replace(" ", "_")                +"_"+eventsData[len(eventPrefix):len(eventsData)-20] 166                        isPresent = False167                        for columnBck in columnList:168                            if len(columnBck.split('_')) > 1: 169                                if columnBck.split('_')[0]+'_'+columnBck.split('_')[1] == keyName:170                                    isPresent = True171                        if isPresent == False or _overloadData:172                            tmpDf = pd.read_csv(cFolder+'/'+eventsData, low_memory=False)173                            keyName = keyName.upper()174                            if len(tmpDf.keys()) > 0:175                                tmpDf = tmpDf.drop(columns=[tmpDf.keys()[0]])176                                dfList[keyName] = tmpDf.add_prefix('{}_'.format(keyName))177                                dfList[keyName]['blockNumber'] = tmpDf['blockNumber']178                                dfList[keyName].set_index('blockNumber')179                                blockNumberDf = pd.concat([blockNumberDf, tmpDf['blockNumber']])180                            else:181                                print("Keys not found at {}".format(keyName))182                        else:183                            print('{} already taken from Backup'.format(keyName))184        185        if _backupPath is not None: 186            blockNumberList = bck #set backup to start dataframe187        else:188            blockNumberList = pd.DataFrame(blockNumberDf[0].unique(), columns=['blockNumber']) # start dataframe189        blockNumberList.set_index('blockNumber')190        191        print('Starting merge...')192        numberOfKeys = len(dfList.keys())193        startIndex = 1194        195        # Merging196        197        for key in dfList.keys():198            try:199                #print('Merging event {}...'.format(key))200                blockNumberList = pd.merge(blockNumberList, dfList[key], on='blockNumber', how='left')201                dfList[key] = None #save space memory202                print("{}/{} - {} event was merged".format(startIndex, numberOfKeys, key))203            except:204                print('Error on sql query. Key: {}'.format(key))205            startIndex = startIndex + 1206        207        dt = datetime.now().strftime("%Y%m%d_%H%M%S")208        209        # Backup pre-optimization210        211        print('Backup before optimization...')212        blockNumberList.to_csv('{0}/bck_smartContractsLogsGrouped_{1}.csv'.format(CONTRACTS_FOLDER, dt))213        # Optimize merged logs:214        blockNumberList = self.optimizeMergedLogs(blockNumberList)215    216        # After All - Save data217        218        blockNumberList.to_csv('{0}/smartContractsLogsGrouped_{1}.csv'.format(CONTRACTS_FOLDER, dt))219        print('File saved on {0}/smartContractsLogsGrouped_{1}.csv'.format(CONTRACTS_FOLDER, dt))220       221    #################################################################################################################222    223    # @dev: 224    # Input: Dataframe of merged logs225    # Output: Reduced dataframe where timestamp_iso8601, timestamp_unixtime are not duplicated on columns226    # but merged in a unique column227    def optimizeMergedLogs(self, _df):228        229        import datetime230        date_time_str = '1970-01-01 00:00:00.000000'231        date_time_obj = datetime.datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S.%f')232        indexList_timestamp_iso8601 = []233        indexList_timestamp_unixtime = []234        columnsToDrop = []235        index = 1236        for column in _df.keys():237            if '_timestamp_iso8601' in column:238                _df[column] = pd.to_datetime(_df[column], utc=True)239                _df[column] = _df[column].fillna(date_time_obj)240                indexList_timestamp_iso8601.append(column)241            if '_timestamp_unixtime' in column:242                indexList_timestamp_unixtime.append(column)243            if "_blockNumber" in column: columnsToDrop.append(column)244            if "Unnamed:" in column: columnsToDrop.append(column)245            print('Analyzed {} of {} columns'.format(index, len(_df.keys())))246            index = index + 1247        print('Optimizing timestamp_iso8601...')248        _df['timestamp_iso8601'] = _df[indexList_timestamp_iso8601].astype(str).max(axis=1)249        print('Optimizing timestamp_unixtime...')250        _df['timestamp_unixtime'] = _df[indexList_timestamp_unixtime].max(axis=1)251        print('Dropping redundante columns...')252        _df = _df.drop(columns = indexList_timestamp_iso8601)253        _df = _df.drop(columns = indexList_timestamp_unixtime)254        _df = _df.drop(columns = columnsToDrop)255        print('Dataframe optimized with success.')256        257        return _df258            259        260    ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
