How to use handle_rpc method in autotest

Best Python code snippet using autotest_python

marmarachain_rpc.py

Source:marmarachain_rpc.py Github

copy

Full Screen

...257 marmarad_pid = remote_connection.server_execute_command('pidof komodod', ssh_client)258 return marmarad_pid259 except Exception as error:260 logging.error(error)261def handle_rpc(method, params):262 if is_local:263 if method == cp.convertpassphrase or method == cp.importprivkey:264 logging.info('------sending command----- \n ' + method)265 else:266 logging.info('------sending command----- \n ' + method + str(params))267 try:268 result = local_connection.curl_response(method, params)269 return result270 except Exception as error:271 logging.error(error)272 return None, error, 1273 else:274 cmd = set_remote(method)275 cmd = marmara_path + cmd276 for item in params:277 if method == 'convertpassphrase':278 cmd = cmd + ' "' + item + '"'279 elif method == 'getaddressesbyaccount':280 cmd = cmd + ' ""'281 elif method == 'getaddresstxids' or method == 'marmaraissue' or method == 'marmaratransfer' \282 or method == 'marmarareceive' or method == 'getaddressbalance':283 cmd = cmd + ' ' + json.dumps(item)284 elif method == 'setgenerate':285 cmd = cmd + ' ' + str(item).lower()286 else:287 cmd = cmd + ' ' + str(item)288 cmd = cmd.replace('{', "'{").replace('}', "}'")289 if method == cp.convertpassphrase or method == cp.importprivkey:290 logging.info('------sending command----- \n ' + method)291 else:292 logging.info('------sending command----- \n ' + cmd)293 if not ssh_client:294 set_sshclient(remote_connection.server_ssh_connect())295 try:296 result = remote_connection.server_execute_command(cmd, ssh_client)297 return result298 except Exception as error:299 logging.error(error)300 set_sshclient(remote_connection.server_ssh_connect())301 return None, error, 1302class RpcHandler(QtCore.QObject):303 command_out = pyqtSignal(tuple)304 output = pyqtSignal(str)305 walletlist_out = pyqtSignal(list)306 finished = pyqtSignal()307 def __init__(self):308 super(RpcHandler, self).__init__()309 self.method = ""310 self.params = []311 self.command = ""312 @pyqtSlot(str)313 def 
set_command(self, value):314 self.command = value315 @pyqtSlot(str)316 def set_method(self, method):317 self.method = method318 @pyqtSlot(list)319 def set_params(self, params):320 self.params = params321 @pyqtSlot()322 def do_execute_rpc(self):323 result_out = handle_rpc(self.method, self.params)324 self.command_out.emit(result_out)325 time.sleep(0.1)326 self.finished.emit()327 @pyqtSlot()328 def check_marmara_path(self):329 self.output.emit('get marmarad path')330 path_key = ""331 if is_local:332 path_key = 'local'333 if not is_local:334 path_key = remote_connection.server_hostname335 marmarad_path = configuration.ApplicationConfig().get_value('PATHS', path_key)336 if marmarad_path: # if these is path in configuration337 self.output.emit('marmarad_path= ' + marmarad_path)338 if platform.system() == 'Windows':339 ls_cmd = 'PowerShell ls ' + marmarad_path.replace(' ', '` ') + ' -name'340 else:341 ls_cmd = 'ls ' + marmarad_path342 self.output.emit('verifiying path')343 verify_path = do_search_path(ls_cmd)344 if not verify_path[0] == ['']:345 verify_path_out = str(verify_path[0]).replace('.exe', '')346 if 'komodod' in verify_path_out and 'komodo-cli' in verify_path_out:347 self.output.emit('marmarad found.')348 set_marmara_path(marmarad_path)349 self.finished.emit()350 else:351 self.get_marmarad_path(path_key) # search path for marmarad352 elif verify_path[1]:353 self.get_marmarad_path(path_key) # search path for marmarad354 else:355 self.get_marmarad_path(path_key)356 @pyqtSlot()357 def get_marmarad_path(self, path_key):358 self.output.emit('no path config')359 search_result = search_marmarad_path()360 if search_result:361 self.output.emit('marmarad found.')362 configuration.ApplicationConfig().set_key_value('PATHS', path_key, search_result)363 if marmara_path != search_result:364 set_marmara_path(search_result)365 self.finished.emit()366 else:367 self.output.emit('need to install mcl')368 self.finished.emit()369 @pyqtSlot()370 def is_chain_ready(self):371 i = 0372 
while True:373 getinfo = handle_rpc(cp.getinfo, [])374 if getinfo[0]:375 logging.info('----getinfo result ----- \n' + str(getinfo[0]))376 self.command_out.emit(getinfo)377 time.sleep(0.1)378 addresses = self._get_addresses()379 time.sleep(0.1)380 if type(addresses) == list:381 self.walletlist_out.emit(addresses)382 activated_address_list = handle_rpc(cp.marmaralistactivatedaddresses, [])383 time.sleep(0.1)384 self.command_out.emit(activated_address_list)385 getgenerate = handle_rpc(cp.getgenerate, [])386 time.sleep(0.1)387 self.command_out.emit(getgenerate)388 self.finished.emit()389 # print('chain ready')390 else:391 # self.output.emit(addresses)392 logging.warning('could not get addresses')393 self.finished.emit()394 break395 if getinfo[1]:396 self.command_out.emit(getinfo)397 if getinfo[2] == 1:398 i = i + 1399 if i >= 10:400 logging.error('could not start marmarachain')401 getinfo = None, 'could not start marmarachain'402 self.command_out.emit(getinfo)403 self.finished.emit()404 break405 # print('chain is not ready')406 time.sleep(2)407 @pyqtSlot()408 def stopping_chain(self):409 result_out = handle_rpc(cp.stop, [])410 if result_out[0]:411 self.command_out.emit(result_out)412 while True:413 pid = mcl_chain_status()414 if len(pid[0]) == 0:415 self.finished.emit()416 out = 0, None, 0417 self.command_out.emit(out)418 logging.info('chain stopped')419 break420 time.sleep(1)421 elif result_out[1]:422 self.command_out.emit(result_out)423 self.finished.emit()424 def _get_addresses(self):425 addresses = handle_rpc(cp.getaddressesbyaccount, [''])426 if addresses[0]:427 addresses_js = json.loads(addresses[0])428 wallet_list = []429 amount = 0430 pubkey = ""431 for address in addresses_js:432 validation = handle_rpc(cp.validateaddress, [address])433 if validation[0]:434 if json.loads(validation[0])['ismine']:435 pubkey = json.loads(validation[0])['pubkey']436 elif validation[1]:437 logging.info(validation[1])438 address_js = {'addresses': [address]}439 amounts = 
handle_rpc(cp.getaddressbalance, [address_js])440 if amounts[0]:441 amount = json.loads(amounts[0])['balance']442 amount = str(int(amount) / 100000000)443 elif amounts[1]:444 logging.info('getting error in getbalance :' + str(amounts[1]))445 address_list = [amount, address, pubkey]446 wallet_list.append(address_list)447 return wallet_list448 elif addresses[1]:449 logging.error(addresses[1])450 return False451 @pyqtSlot()452 def get_addresses(self):453 addresses = self._get_addresses()454 if type(addresses) == list:455 self.walletlist_out.emit(addresses)456 else:457 self.walletlist_out.emit([])458 logging.info('could not get addresses')459 self.finished.emit()460 time.sleep(0.2)461 self.finished.emit()462 @pyqtSlot()463 def refresh_sidepanel(self):464 getinfo = handle_rpc(cp.getinfo, [])465 if getinfo[0]:466 self.command_out.emit(getinfo)467 time.sleep(0.1)468 activated_address_list = handle_rpc(cp.marmaralistactivatedaddresses, [])469 self.command_out.emit(activated_address_list)470 self.finished.emit()471 elif getinfo[1]:472 self.command_out.emit(getinfo)473 self.finished.emit()474 @pyqtSlot()475 def setgenerate(self):476 setgenerate = handle_rpc(self.method, self.params)477 if setgenerate[2] == 0 or setgenerate[2] == 200:478 getgenerate = handle_rpc(cp.getgenerate, [])479 time.sleep(0.1)480 self.command_out.emit(getgenerate)481 self.finished.emit()482 else:483 self.finished.emit()484 self.command_out.emit(setgenerate)485 @pyqtSlot()486 def get_balances(self):487 getbalance = handle_rpc(cp.getbalance, [])488 time.sleep(0.1)489 listaddressgroupings = handle_rpc(cp.listaddressgroupings, [])490 time.sleep(0.1)491 activated_address_list = handle_rpc(cp.marmaralistactivatedaddresses, [])492 time.sleep(0.1)493 if (getbalance[2] == 200 and listaddressgroupings[2] == 200 and activated_address_list[2] == 200) \494 or (getbalance[2] == 0 and listaddressgroupings[2] == 0 and activated_address_list[2] == 0):495 result = str(getbalance[0]).replace('\n', ''), 
json.loads(str(listaddressgroupings[0])), \496 json.loads(str(activated_address_list[0])), 0497 self.command_out.emit(result)498 self.finished.emit()499 else:500 result = getbalance[1], listaddressgroupings[1], activated_address_list[1], 1501 self.command_out.emit(result)502 self.finished.emit()503 @pyqtSlot()504 def txids_detail(self):505 get_txids = handle_rpc(self.method, self.params)506 if get_txids[2] == 200 or get_txids[2] == 0:507 if get_txids[0] or get_txids[0] == []:508 details_list = []509 if len(get_txids[0]) != 0:510 for txid in json.loads(get_txids[0]):511 txid_detail = handle_rpc(cp.gettransaction, [txid])512 if txid_detail[2] == 200 or txid_detail[2] == 0:513 if txid_detail[0]:514 detail = json.loads(txid_detail[0])515 amount = detail.get('amount')516 block_time = detail.get('blocktime')517 txid_list = [txid, amount, block_time]518 details_list.append(txid_list)519 else:520 print(txid_detail[1])521 self.finished.emit()522 break523 result = details_list, 0524 self.command_out.emit(result)525 self.finished.emit()526 else:527 result = details_list, 0528 self.command_out.emit(result)529 self.finished.emit()530 else:531 result_err = get_txids[1], 1532 self.command_out.emit(result_err)533 self.finished.emit()534 @pyqtSlot()535 def extract_bootstrap(self):536 cmd_list = []537 if os.path.exists(self.method + '/blocks'):538 if platform.system() == 'Linux':539 cmd_list.append('rm -r ' + self.method + '/blocks')540 if platform.system() == 'Win64' or platform.system() == 'Windows':541 cmd_list.append('rmdir /s /q ' + self.method + '\blocks')542 if os.path.exists(self.method + '/chainstate'):543 if platform.system() == 'Linux':544 cmd_list.append('rm -r ' + self.method + '/chainstate')545 if platform.system() == 'Win64' or platform.system() == 'Windows':546 cmd_list.append('rmdir /s /q ' + self.method + '\chainstate')547 cmd_list.append(self.command)548 self.do_execute_extract(cmd_list)549 def do_execute_extract(self, cmd_list):550 pwd_home = 
str(pathlib.Path.home())551 for cmd in cmd_list:552 proc = subprocess.Popen(cmd, cwd=pwd_home, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)553 retvalue = proc.poll()554 while True:555 stdout = proc.stdout.readline().replace(b'\n', b'').replace(b'\r', b'').decode()556 self.output.emit(str(stdout))557 logging.info(str(stdout))558 if not stdout:559 break560 self.output.emit(str(retvalue))561 self.finished.emit()562 def get_loop_detail(self, txid, holder=False):563 loop_detail = handle_rpc(cp.marmaracreditloop, [txid])564 if loop_detail[2] == 200 or loop_detail[2] == 0:565 if loop_detail[0]:566 loop_amount = json.loads(loop_detail[0]).get('amount')567 loop_matures = json.loads(loop_detail[0]).get('matures')568 pubkey = json.loads(loop_detail[0]).get('batonpk')569 creditloop = json.loads(loop_detail[0]).get('creditloop')570 loop_address = json.loads(loop_detail[0]).get('LockedInLoopCCaddr')571 loop_create_block = ""572 for item in creditloop:573 if item.get('funcid') == 'B':574 loop_create_block = item.get('height')575 issuer_pk = item.get('issuerpk')576 if holder:577 pubkey = issuer_pk578 break579 return [txid, loop_amount, pubkey, loop_matures, loop_create_block, loop_address]580 else:581 logging.error(loop_detail[1])582 return False583 @pyqtSlot()584 def active_loops_details(self):585 marmarainfo = handle_rpc(self.method, self.params)586 if marmarainfo[2] == 200 or marmarainfo[2] == 0:587 if marmarainfo[0]:588 marmarainfo_result = json.loads(marmarainfo[0])589 issuances_issuer = marmarainfo_result.get('issuances')590 issuer_details_list = []591 for issuance in issuances_issuer:592 issuance_details = self.get_loop_detail(issuance)593 if issuance_details:594 issuer_details_list.append(issuance_details)595 else:596 logging.error('some error in getting loopdetail')597 self.finished.emit()598 break599 result = issuer_details_list, marmarainfo_result, 0600 self.command_out.emit(result)601 self.finished.emit()602 else:603 result_err = None, marmarainfo[1], 
1604 self.command_out.emit(result_err)605 self.finished.emit()606 @pyqtSlot()607 def holder_loop_detail(self):608 marmaraholderloops = handle_rpc(self.method, self.params)609 if marmaraholderloops[2] == 200 or marmaraholderloops[2] == 0:610 if marmaraholderloops[0]:611 holer_result = json.loads(marmaraholderloops[0])612 issuances_holder = holer_result.get('issuances')613 holder_details_list = []614 for issuance in issuances_holder:615 issuance_details = self.get_loop_detail(issuance, holder=True)616 if issuance_details:617 holder_details_list.append(issuance_details)618 else:619 logging.error('some error in getting loopdetail')620 self.finished.emit()621 break622 result = holder_details_list, holer_result, 0623 self.command_out.emit(result)624 self.finished.emit()625 else:626 result_err = None, marmaraholderloops[1], 1627 self.command_out.emit(result_err)628 self.finished.emit()629 @pyqtSlot()630 def check_fork_api(self):631 getblock = handle_rpc(self.method, self.params)632 if getblock[2] == 200 or getblock[2] == 0:633 if getblock[0]:634 result_json = json.loads(getblock[0])635 block = result_json.get('height')636 hash = result_json.get('hash')637 previous_hash = result_json.get('previousblockhash')638 # next_hash = result_json.get('nextblockhash')639 result_list = [block, hash, previous_hash]640 blocks_detail_api = api_request.get_blocks_details(block, hash)641 result_out = result_list, blocks_detail_api, 0642 self.command_out.emit(result_out)643 self.finished.emit()644 else:645 result_err = None, getblock[1], 1646 self.command_out.emit(result_err)647 self.finished.emit()648 @pyqtSlot()649 def calc_wallet_earnings(self):650 method_list = [self.method, self.method, cp.listaddressgroupings, cp.marmaralistactivatedaddresses]651 param_list = [[str(self.params[0])], [str(self.params[1])], [], []]652 normaladdresseslist = []653 activatedaddresslist = []654 days = []655 begin_day = None656 end_day = None657 index = 0658 for method in method_list:659 result = 
handle_rpc(method, param_list[index])660 if result[2] == 200 or result[2] == 0:661 if index == 0:662 begin_day = datetime.fromtimestamp(json.loads(result[0]).get('time')).date()663 if index == 1:664 end_day = datetime.fromtimestamp(json.loads(result[0]).get('time')).date()665 if index == 2:666 normaladdresses = json.loads(result[0])667 if len(normaladdresses) > 0:668 normaladdresses = normaladdresses[0]669 else:670 normaladdresses = normaladdresses671 for item in normaladdresses:672 normaladdresseslist.append(item[0])673 if index == 3:674 for item in json.loads(result[0]).get("WalletActivatedAddresses"):675 activatedaddresslist.append(item.get('activatedaddress'))676 else:677 result_err = None, result[1], 1678 self.command_out.emit(result_err)679 self.finished.emit()680 if index < 3:681 index = index + 1682 days.append(begin_day)683 day_delta = (end_day - begin_day).days684 count = 0685 while day_delta != count:686 days.append(days[count] + timedelta(days=1))687 count = count + 1688 normaladdresstxidslist = []689 activatedaddresstxidslist = []690 self.output.emit('normal txids')691 for naddress in normaladdresseslist:692 normaladdresstxids = handle_rpc(cp.getaddresstxids, [{'addresses': [naddress], 'start': self.params[0],693 'end': self.params[1]}])[0]694 if normaladdresstxids:695 for txid in json.loads(normaladdresstxids):696 normaladdresstxidslist.append(txid)697 self.output.emit('activated txids')698 for activatedaddress in activatedaddresslist:699 activatedaddresstxids = handle_rpc(cp.getaddresstxids, [{'addresses': [activatedaddress],700 'start': self.params[0],701 'end': self.params[1]}, 0])[0]702 if activatedaddresstxids:703 for txid in json.loads(activatedaddresstxids):704 activatedaddresstxidslist.append(txid)705 na_earninglist = {}706 aa_earninglist = {}707 self.output.emit('calculating earnings')708 for daytime in days:709 na_earninglist.__setitem__(daytime, 0)710 aa_earninglist.__setitem__(daytime, 0)711 for txid in normaladdresstxidslist:712 coinbase 
= self.get_coinbase(txid)713 if coinbase[0]: # coinbase[0]= timestamp, coinbase[1] = amount714 prv_value = na_earninglist.__getitem__(coinbase[0])715 na_earninglist.__setitem__(coinbase[0], (prv_value + coinbase[1]))716 for txid in activatedaddresstxidslist:717 coinbase = self.get_coinbase(txid)718 if coinbase[0]:719 prv_value = aa_earninglist.__getitem__(coinbase[0])720 aa_earninglist.__setitem__(coinbase[0], (prv_value + coinbase[1]))721 earningresult = na_earninglist, aa_earninglist, 0722 self.command_out.emit(earningresult)723 self.finished.emit()724 def get_coinbase(self, txid):725 rawtxid = handle_rpc(cp.getrawtransaction, [txid])726 if rawtxid[2] == 200 or rawtxid[2] == 0:727 if is_local:728 rawtx = json.loads(rawtxid[0])729 if not is_local:730 rawtx = rawtxid[0]731 decodedtx = handle_rpc(cp.decoderawtransaction, [str(rawtx)])732 if decodedtx[2] == 200 or decodedtx[2] == 0:733 txdetail = json.loads(decodedtx[0])734 for vin in txdetail.get('vin'):735 if vin.get('coinbase'):736 timestmp = datetime.fromtimestamp(txdetail.get('locktime')).date()737 amount = txdetail.get('vout')[0].get('value')738 return timestmp, amount739 else:740 return None, None741 else:742 return None, None743 else:744 return None, None745class Autoinstall(QtCore.QObject):...

Full Screen

Full Screen

registration.py

Source:registration.py Github

copy

Full Screen

...4import iterm2.rpc5import json6import traceback7import websockets8async def generic_handle_rpc(coro, connection, notif):9 rpc_notif = notif.server_originated_rpc_notification10 params = {}11 ok = False12 try:13 for arg in rpc_notif.rpc.arguments:14 name = arg.name15 if arg.HasField("json_value"):16 # NOTE: This can throw an exception if there are control characters or other nasties.17 value = json.loads(arg.json_value)18 params[name] = value19 else:20 params[name] = None21 result = await coro(**params)22 ok = True23 except KeyboardInterrupt as e:24 raise e25 except websockets.exceptions.ConnectionClosed as e:26 raise e27 except Exception as e:28 tb = traceback.format_exc()29 exception = { "reason": repr(e), "traceback": tb }30 await iterm2.rpc.async_send_rpc_result(connection, rpc_notif.request_id, True, exception)31 if ok:32 await iterm2.rpc.async_send_rpc_result(connection, rpc_notif.request_id, False, result)33class Registration:34 @staticmethod35 async def async_register_rpc_handler(connection, name, coro, timeout=None, defaults={}):36 """Register a script-defined RPC.37 iTerm2 may be instructed to invoke a script-registered RPC, such as38 through a key binding. Use this method to register one.39 :param name: The RPC name. Combined with its arguments, this must be unique among all registered RPCs. It should consist of letters, numbers, and underscores and must begin with a letter.40 :param coro: An async function. Its arguments are reflected upon to determine the RPC's signature. Only the names of the arguments are used. All arguments should be keyword arguments as any may be omitted at call time.41 :param timeout: How long iTerm2 should wait before giving up on this function's ever returning. `None` means to use the default timeout.42 :param defaults: Gives default values. Names correspond to argument names in `arguments`. 
Values are in-scope variables at the callsite.43 """44 args = inspect.signature(coro).parameters.keys()45 async def handle_rpc(connection, notif):46 await generic_handle_rpc(coro, connection, notif)47 await iterm2.notifications.async_subscribe_to_server_originated_rpc_notification(connection, handle_rpc, name, args, timeout, defaults, iterm2.notifications.RPC_ROLE_GENERIC)48 @staticmethod49 async def async_register_session_title_provider(connection, name, coro, display_name, timeout=None, defaults={}):50 """Register a script-defined RPC.51 iTerm2 may be instructed to invoke a script-registered RPC, such as52 through a key binding. Use this method to register one.53 :param name: The RPC name. Combined with its arguments, this must be unique among all registered RPCs. It should consist of letters, numbers, and underscores and must begin with a letter.54 :param coro: An async function. Its arguments are reflected upon to determine the RPC's signature. Only the names of the arguments are used. All arguments should be keyword arguments as any may be omitted at call time.55 :param display_name: Gives the name of the function to show in preferences.56 :param timeout: How long iTerm2 should wait before giving up on this function's ever returning. `None` means to use the default timeout.57 :param defaults: Gives default values. Names correspond to argument names in `arguments`. 
Values are in-scope variables at the callsite.58 """59 async def handle_rpc(connection, notif):60 await generic_handle_rpc(coro, connection, notif)61 args = inspect.signature(coro).parameters.keys()62 await iterm2.notifications.async_subscribe_to_server_originated_rpc_notification(connection, handle_rpc, name, args, timeout, defaults, iterm2.notifications.RPC_ROLE_SESSION_TITLE, display_name)63 @staticmethod64 async def async_register_status_bar_component(connection, component, coro, timeout=None, defaults={}):65 """Registers a status bar component.66 :param component: A :class:`StatusBarComponent`.67 :param coro: An async function. Its arguments are reflected upon to determine the RPC's signature. Only the names of the arguments are used. All arguments should be keyword arguments as any may be omitted at call time. It should take a special argument named "knobs" that is a dictionary with configuration settings. It may return a string or a list of strings. If it returns a list of strings then the longest one that fits will be used.68 :param timeout: How long iTerm2 should wait before giving up on this function's ever returning. `None` means to use the default timeout.69 :param defaults: Gives default values. Names correspond to argument names in `arguments`. Values are in-scope variables of the session owning the status bar.70 """71 async def coro_wrapper(**kwargs):72 if "knobs" in kwargs:73 knobs_json = kwargs["knobs"]74 kwargs["knobs"] = json.loads(knobs_json)75 return await coro(**kwargs)76 async def handle_rpc(connection, notif):77 await generic_handle_rpc(coro_wrapper, connection, notif)78 args = inspect.signature(coro).parameters.keys()79 await iterm2.notifications.async_subscribe_to_server_originated_rpc_notification(80 connection,81 handle_rpc,82 component.name,83 args,84 timeout,85 defaults,86 iterm2.notifications.RPC_ROLE_STATUS_BAR_COMPONENT,87 None,...

Full Screen

Full Screen

handleBookEvent.py

Source:handleBookEvent.py Github

copy

Full Screen

from helpers.transport import MessageBus
from schemas.book import Book, BookData
from repos.book import BookRepo


def handle_event(mb: MessageBus, book_repo: BookRepo):
    """Register the book RPC handlers on ``mb``, backed by ``book_repo``.

    Five handlers are attached through ``mb.handle_rpc``: ``find_book``,
    ``find_book_by_id``, ``insert_book``, ``update_book`` and
    ``delete_book``.  Each one delegates to the matching repository method
    and returns the result converted with ``.dict()``.  The single-record
    handlers return ``None`` when the repository returns ``None``.

    ``List``, ``Mapping`` and ``Any`` are presumably imported on a line
    above this excerpt (not visible here) -- TODO confirm.
    """

    @mb.handle_rpc('find_book')
    def find_book(keyword: str, limit: int, offset: int) -> List[Mapping[str, Any]]:
        # Convert every repository match to a plain mapping.
        return [book.dict() for book in book_repo.find(keyword, limit, offset)]

    @mb.handle_rpc('find_book_by_id')
    def find_book_by_id(id: str) -> Mapping[str, Any]:
        book = book_repo.find_by_id(id)
        return book.dict() if book is not None else None

    @mb.handle_rpc('insert_book')
    def insert_book(data: Mapping[str, Any]) -> Mapping[str, Any]:
        # The incoming mapping is parsed into BookData before it is stored.
        book = book_repo.insert(BookData.parse_obj(data))
        return book.dict() if book is not None else None

    @mb.handle_rpc('update_book')
    def update_book(id: str, data: Mapping[str, Any]) -> Mapping[str, Any]:
        book = book_repo.update(id, BookData.parse_obj(data))
        return book.dict() if book is not None else None

    @mb.handle_rpc('delete_book')
    def delete_book(id: str) -> Mapping[str, Any]:
        book = book_repo.delete(id)
        return book.dict() if book is not None else None


...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful