How to use is_full method in avocado

Best Python code snippet using avocado_python

websocket.py

Source:websocket.py Github

copy

Full Screen

1#!/usr/bin/env python32# -*- coding: utf-8 -*-34import json5import socket67from time import time8from sys import maxsize9from typing import List10from websockets import client11from Cryptodome.Random.random import randint1213from ontology.account.account import Account14from ontology.smart_contract.neo_vm import NeoVm15from ontology.core.transaction import Transaction16from ontology.exception.error_code import ErrorCode17from ontology.exception.exception import SDKException18from ontology.utils.transaction import ensure_bytearray_contract_address19from ontology.smart_contract.neo_contract.abi.abi_function import AbiFunction20from ontology.smart_contract.neo_contract.abi.build_params import BuildParams21from ontology.smart_contract.neo_contract.invoke_function import InvokeFunction222324class WebsocketClient(object):25 def __init__(self, url: str = ''):26 self.__url = url27 self.__id = 028 self.__ws_client = None2930 def __generate_ws_id(self):31 if self.__id == 0:32 self.__id = randint(0, maxsize)33 return self.__id3435 def set_address(self, url: str):36 self.__url = url3738 def get_address(self):39 return self.__url4041 def connect_to_localhost(self):42 self.set_address('http://localhost:20335')4344 def connect_to_test_net(self, index: int = 0):45 if index == 0:46 index = randint(1, 5)47 restful_address = f'ws://polaris{index}.ont.io:20335'48 self.set_address(restful_address)4950 def connect_to_main_net(self, index: int = 0):51 if index == 0:52 index = randint(1, 3)53 restful_address = f'ws://dappnode{index}.ont.io:20335'54 self.set_address(restful_address)5556 async def connect(self):57 try:58 self.__ws_client = await client.connect(self.__url)59 except ConnectionAbortedError as e:60 raise SDKException(ErrorCode.other_error(e.args[1])) from None61 except socket.gaierror as e:62 raise SDKException(ErrorCode.other_error(e.args[1])) from None6364 async def close_connect(self):65 if isinstance(self.__ws_client, client.WebSocketClientProtocol) and not self.__ws_client.closed:66 await self.__ws_client.close()6768 async def __send_recv(self, msg: dict, is_full: bool):69 if self.__ws_client is None or self.__ws_client.closed:70 await self.connect()71 await self.__ws_client.send(json.dumps(msg))72 response = await self.__ws_client.recv()73 response = json.loads(response)74 if is_full:75 return response76 if response['Error'] != 0:77 raise SDKException(ErrorCode.other_error(response.get('Result', '')))78 return response.get('Result', dict())7980 async def send_heartbeat(self, is_full: bool = False):81 if self.__id == 0:82 self.__id = self.__generate_ws_id()83 msg = dict(Action='heartbeat', Version='V1.0.0', Id=self.__id)84 return await self.__send_recv(msg, is_full)8586 async def get_connection_count(self, is_full: bool = False) -> int:87 if self.__id == 0:88 self.__id = self.__generate_ws_id()89 msg = dict(Action='getconnectioncount', Id=self.__id, Version='1.0.0')90 return await self.__send_recv(msg, is_full)9192 async def get_session_count(self, is_full: bool = False):93 if self.__id == 0:94 self.__id = self.__generate_ws_id()95 msg = dict(Action='getsessioncount', Id=self.__id, Version='1.0.0')96 return await self.__send_recv(msg, is_full)9798 async def get_balance(self, b58_address: str, is_full: bool = False):99 if self.__id == 0:100 self.__id = self.__generate_ws_id()101 msg = dict(Action='getbalance', Id=self.__id, Version='1.0.0', Addr=b58_address)102 response = await self.__send_recv(msg, is_full=True)103 response['Result'] = dict((k, int(v)) for k, v in response['Result'].items())104 if is_full:105 return response106 return response['Result']107108 async def get_merkle_proof(self, tx_hash: str, is_full: bool = False):109 if self.__id == 0:110 self.__id = self.__generate_ws_id()111 msg = dict(Action='getmerkleproof', Id=self.__id, Version='1.0.0', Hash=tx_hash, Raw=0)112 return await self.__send_recv(msg, is_full)113114 async def get_storage(self, hex_contract_address: str, key: str, is_full: bool = False):115 if self.__id == 0:116 self.__id = self.__generate_ws_id()117 msg = dict(Action='getstorage', Id=self.__id, Version='1.0.0', Hash=hex_contract_address, Key=key)118 return await self.__send_recv(msg, is_full)119120 async def get_smart_contract(self, hex_contract_address: str, is_full: bool = False):121 if self.__id == 0:122 self.__id = self.__generate_ws_id()123 msg = dict(Action='getcontract', Id=self.__id, Version='1.0.0', Hash=hex_contract_address, Raw=0)124 response = await self.__send_recv(msg, is_full=True)125 if is_full:126 return response127 return response['Result']128129 async def get_smart_contract_event_by_tx_hash(self, tx_hash: str, is_full: bool = False) -> dict:130 if self.__id == 0:131 self.__id = self.__generate_ws_id()132 msg = dict(Action='getsmartcodeeventbyhash', Id=self.__id, Version='1.0.0', Hash=tx_hash, Raw=0)133 return await self.__send_recv(msg, is_full)134135 async def get_smart_contract_event_by_height(self, height: int, is_full: bool = False):136 if self.__id == 0:137 self.__id = self.__generate_ws_id()138 msg = dict(Action='getsmartcodeeventbyheight', Id=self.__id, Version='1.0.0', Height=height)139 return await self.__send_recv(msg, is_full)140141 async def get_block_height(self, is_full: bool = False) -> dict:142 if self.__id == 0:143 self.__id = self.__generate_ws_id()144 msg = dict(Action='getblockheight', Id=self.__id, Version='1.0.0')145 return await self.__send_recv(msg, is_full)146147 async def get_block_height_by_tx_hash(self, tx_hash: str, is_full: bool = False):148 if self.__id == 0:149 self.__id = self.__generate_ws_id()150 msg = dict(Action='getblockheightbytxhash', Id=self.__id, Version='1.0.0', Hash=tx_hash)151 return await self.__send_recv(msg, is_full)152153 async def get_block_hash_by_height(self, height: int, is_full: bool = False):154 if self.__id == 0:155 self.__id = self.__generate_ws_id()156 msg = dict(Action='getblockhash', Id=self.__id, Version='1.0.0', Height=height)157 return await self.__send_recv(msg, is_full)158159 async def get_block_by_height(self, height: int, is_full: bool = False) -> dict:160 if self.__id == 0:161 self.__id = self.__generate_ws_id()162 msg = dict(Action='getblockbyheight', Version='1.0.0', Id=self.__id, Raw=0, Height=height)163 return await self.__send_recv(msg, is_full)164165 async def get_block_by_hash(self, block_hash: str, is_full: bool = False) -> dict:166 if self.__id == 0:167 self.__id = self.__generate_ws_id()168 msg = dict(Action='getblockbyhash', Version='1.0.0', Id=self.__id, Hash=block_hash)169 return await self.__send_recv(msg, is_full)170171 async def subscribe(self, contract_address_list: List[str] or str, is_event: bool = False,172 is_json_block: bool = False,173 is_raw_block: bool = False, is_tx_hash: bool = False, is_full: bool = False) -> dict:174 if self.__id == 0:175 self.__id = self.__generate_ws_id()176 if isinstance(contract_address_list, str):177 contract_address_list = [contract_address_list]178 msg = dict(Action='subscribe', Version='1.0.0', Id=self.__id, ConstractsFilter=contract_address_list,179 SubscribeEvent=is_event, SubscribeJsonBlock=is_json_block, SubscribeRawBlock=is_raw_block,180 SubscribeBlockTxHashs=is_tx_hash)181 return await self.__send_recv(msg, is_full)182183 async def recv_subscribe_info(self, is_full: bool = False):184 response = await self.__ws_client.recv()185 response = json.loads(response)186 if is_full:187 return response188 if response['Error'] != 0:189 raise SDKException(ErrorCode.other_error(response.get('Result', '')))190 return response.get('Result', dict())191192 async def send_raw_transaction(self, tx: Transaction, is_full: bool = False):193 tx_data = tx.serialize(is_hex=True)194 msg = dict(Action='sendrawtransaction', Version='1.0.0', Id=self.__id, PreExec='0', Data=tx_data)195 return await self.__send_recv(msg, is_full)196197 async def send_raw_transaction_pre_exec(self, tx: Transaction, is_full: bool = False):198 tx_data = tx.serialize(is_hex=True)199 msg = dict(Action='sendrawtransaction', Version='1.0.0', Id=self.__id, PreExec='1', Data=tx_data)200 return await self.__send_recv(msg, is_full)201202 async def send_neo_vm_transaction_pre_exec(self, contract_address: str or bytes or bytearray,203 signer: Account or None, func: AbiFunction or InvokeFunction,204 is_full: bool = False):205 if isinstance(func, AbiFunction):206 params = BuildParams.serialize_abi_function(func)207 elif isinstance(func, InvokeFunction):208 params = func.create_invoke_code()209 else:210 raise SDKException(ErrorCode.other_error('the type of func is error.'))211 contract_address = ensure_bytearray_contract_address(contract_address)212 tx = NeoVm.make_invoke_transaction(contract_address, params, b'', 0, 0)213 if signer is not None:214 tx.sign_transaction(signer)215 return await self.send_raw_transaction_pre_exec(tx, is_full)216217 async def send_neo_vm_transaction(self, contract_address: str or bytes or bytearray, signer: Account or None,218 payer: Account or None, gas_limit: int, gas_price: int,219 func: AbiFunction or InvokeFunction, is_full: bool = False):220 if isinstance(func, AbiFunction):221 params = BuildParams.serialize_abi_function(func)222 elif isinstance(func, InvokeFunction):223 params = func.create_invoke_code()224 else:225 raise SDKException(ErrorCode.other_error('the type of func is error.'))226 contract_address = ensure_bytearray_contract_address(contract_address)227 params.append(0x67)228 for i in contract_address:229 params.append(i)230 if payer is None:231 raise SDKException(ErrorCode.param_err('payer account is None.'))232 tx = Transaction(0, 0xd1, int(time()), gas_price, gas_limit, payer.get_address_bytes(), params, bytearray(), [])233 tx.sign_transaction(payer)234 if isinstance(signer, Account) and signer.get_address_base58() != payer.get_address_base58():235 tx.add_sign_transaction(signer) ...

Full Screen

Full Screen

aiorestful.py

Source:aiorestful.py Github

copy

Full Screen

1#2# SPDX-License-Identifier: LGPL-3.0-or-later3# Copyright 2019 DNA Dev team4#5"""6Copyright (C) 2018-2019 The ontology Authors7This file is part of The ontology library.8The ontology is free software: you can redistribute it and/or modify9it under the terms of the GNU Lesser General Public License as published by10the Free Software Foundation, either version 3 of the License, or11(at your option) any later version.12The ontology is distributed in the hope that it will be useful,13but WITHOUT ANY WARRANTY; without even the implied warranty of14MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the15GNU Lesser General Public License for more details.16You should have received a copy of the GNU Lesser General Public License17along with The ontology. If not, see <http://www.gnu.org/licenses/>.18"""19import json20import asyncio21from typing import List22from aiohttp import client_exceptions23from aiohttp.client import ClientSession24from dna.core.transaction import Transaction25from dna.exception.error_code import ErrorCode26from dna.exception.exception import SDKException27from dna.network.restful import Restful, RestfulMethod28class AioRestful(Restful):29 def __init__(self, url: str = '', session: ClientSession = None):30 super().__init__(url)31 self.__session = session32 @property33 def session(self):34 return self.__session35 @session.setter36 def session(self, session: ClientSession):37 if not isinstance(session, ClientSession):38 raise SDKException(ErrorCode.param_error)39 self.__session = session40 async def __post(self, url: str, data: str):41 try:42 if self.__session is None:43 async with ClientSession() as session:44 async with session.post(url, data=data, timeout=10) as response:45 res = json.loads(await response.content.read(-1))46 else:47 async with self.__session.post(url, data=data, timeout=10) as response:48 res = json.loads(await response.content.read(-1))49 if res['Error'] != 0:50 if res['Result'] != '':51 raise SDKException(ErrorCode.other_error(res['Result']))52 else:53 raise SDKException(ErrorCode.other_error(res['Desc']))54 return res55 except (asyncio.TimeoutError, client_exceptions.ClientConnectorError):56 raise SDKException(ErrorCode.connect_timeout(self._url)) from None57 async def __get(self, url):58 try:59 if self.__session is None:60 async with ClientSession() as session:61 async with session.get(url, timeout=10) as response:62 res = json.loads(await response.content.read(-1))63 else:64 async with self.__session.get(url, timeout=10) as response:65 res = json.loads(await response.content.read(-1))66 if res['Error'] != 0:67 if res['Result'] != '':68 raise SDKException(ErrorCode.other_error(res['Result']))69 else:70 raise SDKException(ErrorCode.other_error(res['Desc']))71 return res72 except (asyncio.TimeoutError, client_exceptions.ClientConnectorError):73 raise SDKException(ErrorCode.connect_timeout(self._url)) from None74 async def get_version(self, is_full: bool = False):75 url = RestfulMethod.get_version(self._url)76 response = await self.__get(url)77 if is_full:78 return response79 return response['Result']80 async def get_connection_count(self, is_full: bool = False) -> int:81 url = RestfulMethod.get_connection_count(self._url)82 response = await self.__get(url)83 if is_full:84 return response85 return response['Result']86 async def get_gas_price(self, is_full: bool = False) -> int or dict:87 url = RestfulMethod.get_gas_price(self._url)88 response = await self.__get(url)89 if is_full:90 return response91 return response['Result']['gasprice']92 async def get_network_id(self, is_full: bool = False) -> int or dict:93 url = RestfulMethod.get_network_id(self._url)94 response = await self.__get(url)95 if is_full:96 return response97 return response['Result']98 async def get_block_height(self, is_full: bool = False) -> int or dict:99 url = RestfulMethod.get_block_height(self._url)100 response = await self.__get(url)101 if is_full:102 return response103 return response['Result']104 async def get_block_height_by_tx_hash(self, tx_hash: str, is_full: bool = False):105 url = RestfulMethod.get_block_height_by_tx_hash(self._url, tx_hash)106 response = await self.__get(url)107 if response.get('Result', '') == '':108 raise SDKException(ErrorCode.invalid_tx_hash(tx_hash))109 if is_full:110 return response111 return response['Result']112 async def get_block_count_by_tx_hash(self, tx_hash: str, is_full: bool = False):113 response = await self.get_block_height_by_tx_hash(tx_hash, is_full=True)114 response['Result'] += 1115 if is_full:116 return response117 return response['Result']118 async def get_block_count(self, is_full: bool = False) -> int or dict:119 response = await self.get_block_height(is_full=True)120 response['Result'] += 1121 if is_full:122 return response123 return response['Result']124 async def get_block_by_hash(self, block_hash: str,125 is_full: bool = False) -> int or dict:126 url = RestfulMethod.get_block_by_hash(self._url, block_hash)127 response = await self.__get(url)128 if is_full:129 return response130 return response['Result']131 async def get_block_by_height(self, height: int, is_full: bool = False):132 url = RestfulMethod.get_block_by_height(self._url, height)133 response = await self.__get(url)134 if is_full:135 return response136 return response['Result']137 async def get_balance(self, b58_address: str, is_full: bool = False):138 url = RestfulMethod.get_account_balance(self._url, b58_address)139 response = await self.__get(url)140 response['Result'] = dict((k.upper(), int(v)) for k, v in response.get('Result', dict()).items())141 if is_full:142 return response143 return response['Result']144 async def get_allowance(self, asset: str, b58_from_address: str, b58_to_address: str,145 is_full: bool = False):146 url = RestfulMethod.get_allowance(self._url, asset, b58_from_address, b58_to_address)147 response = await self.__get(url)148 if is_full:149 return response150 return response['Result']151 async def get_contract(self, contract_address: str, is_full: bool = False):152 url = RestfulMethod.get_contract(self._url, contract_address)153 response = await self.__get(url)154 if is_full:155 return response156 return response['Result']157 async def get_contract_event_by_height(self, height: int, is_full: bool = False) -> \158 List[159 dict]:160 url = RestfulMethod.get_contract_event_by_height(self._url, height)161 response = await self.__get(url)162 if is_full:163 return response164 result = response['Result']165 if result == '':166 result = list()167 return result168 async def get_contract_event_by_count(self, count: int, is_full: bool = False) -> \169 List[170 dict]:171 return await self.get_contract_event_by_height(count - 1, is_full)172 async def get_contract_event_by_tx_hash(self, tx_hash: str, is_full: bool = False):173 url = RestfulMethod.get_contract_event_by_tx_hash(self._url, tx_hash)174 response = await self.__get(url)175 if is_full:176 return response177 return response['Result']178 async def get_storage(self, hex_contract_address: str, hex_key: str,179 is_full: bool = False) -> str or dict:180 url = RestfulMethod.get_storage(self._url, hex_contract_address, hex_key)181 response = await self.__get(url)182 if is_full:183 return response184 return response['Result']185 async def get_transaction_by_tx_hash(self, tx_hash: str, is_full: bool = False):186 url = RestfulMethod.get_transaction(self._url, tx_hash)187 response = await self.__get(url)188 if is_full:189 return response190 return response['Result']191 async def send_raw_transaction(self, tx: Transaction, is_full: bool = False):192 hex_tx_data = tx.serialize(is_hex=True)193 data = f'{{"Action":"sendrawtransaction", "Version":"1.0.0","Data":"{hex_tx_data}"}}'194 url = RestfulMethod.send_transaction(self._url)195 response = await self.__post(url, data)196 if is_full:197 return response198 return response['Result']199 async def send_raw_transaction_pre_exec(self, tx: Transaction,200 is_full: bool = False):201 hex_tx_data = tx.serialize(is_hex=True)202 data = f'{{"Action":"sendrawtransaction", "Version":"1.0.0","Data":"{hex_tx_data}"}}'203 url = RestfulMethod.send_transaction_pre_exec(self._url)204 response = await self.__post(url, data)205 if is_full:206 return response207 return response['Result']208 async def get_merkle_proof(self, tx_hash: str, is_full: bool = False):209 url = RestfulMethod.get_merkle_proof(self._url, tx_hash)210 response = await self.__get(url)211 if is_full:212 return response213 return response['Result']214 async def get_memory_pool_tx_count(self, is_full: bool = False):215 url = RestfulMethod.get_mem_pool_tx_count(self._url)216 response = await self.__get(url)217 if is_full:218 return response219 return response['Result']220 async def get_memory_pool_tx_state(self, tx_hash: str, is_full: bool = False) -> \221 List[dict] or dict:222 url = RestfulMethod.get_mem_pool_tx_state(self._url, tx_hash)223 response = await self.__get(url)224 if response.get('Result', '') == '':225 raise SDKException(ErrorCode.invalid_tx_hash(tx_hash))226 if is_full:227 return response...

Full Screen

Full Screen

test_timing.py

Source:test_timing.py Github

copy

Full Screen

1"""Unit tests for timing."""2import time3import datetime4import unittest5from azfilebak.timing import Timing6from tests.loggedtestcase import LoggedTestCase7class TestTiming(LoggedTestCase):8 """Unit tests for class Timing."""9 @staticmethod10 def __recovery_sample_data_sorted():11 return [12 {'start_date':'20180101_010000', 'is_full':True},13 {'start_date':'20180101_010000', 'is_full':True},14 {'start_date':'20180101_010000', 'is_full':True},15 {'start_date':'20180101_011000', 'is_full':False},16 {'start_date':'20180101_012000', 'is_full':False},17 {'start_date':'20180101_013000', 'is_full':False},18 {'start_date':'20180101_014000', 'is_full':True},19 {'start_date':'20180101_014000', 'is_full':True},20 {'start_date':'20180101_014000', 'is_full':True},21 {'start_date':'20180101_015000', 'is_full':False},22 {'start_date':'20180101_020000', 'is_full':False},23 {'start_date':'20180101_021000', 'is_full':False},24 {'start_date':'20180101_021000', 'is_full':False},25 {'start_date':'20180101_022000', 'is_full':True},26 {'start_date':'20180101_022000', 'is_full':True},27 {'start_date':'20180101_022000', 'is_full':True},28 {'start_date':'20180101_023000', 'is_full':False},29 {'start_date':'20180101_024000', 'is_full':False},30 {'start_date':'20180101_025000', 'is_full':False},31 {'start_date':'20180101_030000', 'is_full':True},32 {'start_date':'20180101_030000', 'is_full':True},33 {'start_date':'20180101_030000', 'is_full':True},34 {'start_date':'20180101_031000', 'is_full':False},35 {'start_date':'20180101_032000', 'is_full':False},36 {'start_date':'20180101_032000', 'is_full':False},37 {'start_date':'20180101_033000', 'is_full':False}38 ]39 @staticmethod40 def __recovery_sample_data():41 return [42 {'start_date':'20180101_011000', 'is_full':False},43 {'start_date':'20180101_022000', 'is_full':True},44 {'start_date':'20180101_012000', 'is_full':False},45 {'start_date':'20180101_015000', 'is_full':False},46 {'start_date':'20180101_023000', 'is_full':False},47 {'start_date':'20180101_024000', 'is_full':False},48 {'start_date':'20180101_025000', 'is_full':False},49 {'start_date':'20180101_031000', 'is_full':False},50 {'start_date':'20180101_032000', 'is_full':False},51 {'start_date':'20180101_032000', 'is_full':False},52 {'start_date':'20180101_022000', 'is_full':True},53 {'start_date':'20180101_020000', 'is_full':False},54 {'start_date':'20180101_021000', 'is_full':False},55 {'start_date':'20180101_021000', 'is_full':False},56 {'start_date':'20180101_013000', 'is_full':False},57 {'start_date':'20180101_033000', 'is_full':False},58 {'start_date':'20180101_022000', 'is_full':True},59 {'start_date':'20180101_030000', 'is_full':True},60 {'start_date':'20180101_010000', 'is_full':True},61 {'start_date':'20180101_010000', 'is_full':True},62 {'start_date':'20180101_010000', 'is_full':True},63 {'start_date':'20180101_014000', 'is_full':True},64 {'start_date':'20180101_014000', 'is_full':True},65 {'start_date':'20180101_014000', 'is_full':True},66 {'start_date':'20180101_030000', 'is_full':True},67 {'start_date':'20180101_030000', 'is_full':True}68 ]69 def test_parse(self):70 """Test parse."""71 res = Timing.parse("20180605_215959")72 self.assertEqual(73 time.struct_time(74 (2018, 6, 5, 21, 59, 59, 1, 156, -1)),75 res)76 def test_time_diff(self):77 """Test time_diff."""78 self.assertEqual(79 Timing.time_diff("20180106_120000", "20180106_120010"),80 datetime.timedelta(0, 10))81 self.assertEqual(82 Timing.time_diff("20180106_110000", "20180106_120010"),83 datetime.timedelta(0, 3610))84 def test_sort(self):85 """Test sort."""86 self.assertEqual(87 Timing.sort(['20180110_120000', '20180105_120000', '20180101_120000']),88 ['20180101_120000', '20180105_120000', '20180110_120000'])89 self.assertEqual(90 Timing.sort(91 ['20180105_120000', '20180110_120000', '20180105_120000', '20180101_120000']),92 ['20180101_120000', '20180105_120000', '20180105_120000', '20180110_120000'])93 pick_start_date = lambda x: x["start_date"]94 self.assertEqual(95 Timing.sort(96 times=self.__recovery_sample_data(),97 selector=pick_start_date),98 [99 {'is_full': True, 'start_date': '20180101_010000'},100 {'is_full': True, 'start_date': '20180101_010000'},101 {'is_full': True, 'start_date': '20180101_010000'},102 {'is_full': False, 'start_date': '20180101_011000'},103 {'is_full': False, 'start_date': '20180101_012000'},104 {'is_full': False, 'start_date': '20180101_013000'},105 {'is_full': True, 'start_date': '20180101_014000'},106 {'is_full': True, 'start_date': '20180101_014000'},107 {'is_full': True, 'start_date': '20180101_014000'},108 {'is_full': False, 'start_date': '20180101_015000'},109 {'is_full': False, 'start_date': '20180101_020000'},110 {'is_full': False, 'start_date': '20180101_021000'},111 {'is_full': False, 'start_date': '20180101_021000'},112 {'is_full': True, 'start_date': '20180101_022000'},113 {'is_full': True, 'start_date': '20180101_022000'},114 {'is_full': True, 'start_date': '20180101_022000'},115 {'is_full': False, 'start_date': '20180101_023000'},116 {'is_full': False, 'start_date': '20180101_024000'},117 {'is_full': False, 'start_date': '20180101_025000'},118 {'is_full': True, 'start_date': '20180101_030000'},119 {'is_full': True, 'start_date': '20180101_030000'},120 {'is_full': True, 'start_date': '20180101_030000'},121 {'is_full': False, 'start_date': '20180101_031000'},122 {'is_full': False, 'start_date': '20180101_032000'},123 {'is_full': False, 'start_date': '20180101_032000'},124 {'is_full': False, 'start_date': '20180101_033000'}125 ])126 self.assertEquals(127 map(pick_start_date, Timing.sort(times=self.__recovery_sample_data(), selector=pick_start_date)),128 ['20180101_010000', '20180101_010000', '20180101_010000', '20180101_011000', '20180101_012000', '20180101_013000', '20180101_014000', '20180101_014000', '20180101_014000', '20180101_015000', '20180101_020000', '20180101_021000', '20180101_021000', '20180101_022000', '20180101_022000', '20180101_022000', '20180101_023000', '20180101_024000', '20180101_025000', '20180101_030000', '20180101_030000', '20180101_030000', '20180101_031000', '20180101_032000', '20180101_032000', '20180101_033000'])129if __name__ == '__main__':...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful