Best Python code snippet using Airtest
api.py
Source:api.py  
...80    if int(device.adb.check_remote_port(ip, port).strip()) != 0:81    The port is already in use82    """83    # kill_master_port(device, master_port)84    device.adb.remove_forward(master_port)85    device.adb.kill_port_process(device.port)86    # :If airplane mode is opened, first need close airplane mode87    if device.airplane_mode_is_open:88        device.adb.close_airplane_mode()89        time.sleep(WAIT_AIRPLANE_MODE_TIME)90    # :Running Proxy server command91    device.adb.running_server(host=device.ip, port=device.port)92    # : wait server running93    time.sleep(2)94    device.initialize_device()95    # waite init device96    time.sleep(1)97    if not device.transfer_port_is_open:98        return99    _forward_tcp_port(device, master_port, device.port)100def _change_ip(master_port: int, cluster_device: Device, change_ip_queue):101    """102    # :Close master port for nginx103    # :Get device info104    # :Check proxy running port105    # :Time to change ip106    # :Open master port for nginx107    """108    time.sleep(IP_SWITCHING_TIME)109    change_ip_queue.put_nowait(cluster_device.device_id)110    cluster_device.adb.remove_forward(master_port)111    cluster_device.adb.kill_port_process(cluster_device.port)112    cluster_device.initialize_device()113    if not cluster_device.airplane_mode_is_open:114        cluster_device.adb.turn_on_airplane_mode()115    cluster_device.adb.close_airplane_mode()116    time.sleep(WAIT_AIRPLANE_MODE_TIME)117    # if cluster_device.transfer_port_is_open is False:118    cluster_device.adb.running_server(119        host=cluster_device.ip,120        port=cluster_device.port121    )122    time.sleep(1)123    if cluster_device.adb.bridge_process().strip():124        _forward_tcp_port(125            device=cluster_device,126            local_port=master_port,127            remote_port=cluster_device.port128        )129    time.sleep(1)130    if not cluster_device.adb.ping_test():131        cluster_device.adb.remove_forward(port=master_port)132    if not cluster_device.adb.check_local_port(master_port):133        cluster_device.adb.remove_forward(port=master_port)134    change_ip_queue.get_nowait()135def _health_check(master_port: int, device: Device, change_ip_queue):136    # 妿æ£å¨åæ¢ IP é£ä¹æ¤è®¾å¤ä¸ä¼ç»è¿å¥åº·æ£æµï¼å¦åä¼åºç°é®é¢137    time.sleep(HEALTH_CHECK_TIME)138    if not change_ip_queue.empty():139        if device.device_id == change_ip_queue.get_nowait():140            change_ip_queue.put_nowait(device.device_id)141            time.sleep(HEALTH_CHECK_TIME)142            return None143        change_ip_queue.put_nowait(device.device_id)144        time.sleep(HEALTH_CHECK_TIME)145    device.initialize_device()146    if not device.transfer_port_is_open:147        device.adb.remove_forward(port=master_port)148    if device.airplane_mode_is_open:149        device.adb.remove_forward(port=master_port)150    if not device.adb.ping_test():151        device.adb.remove_forward(port=master_port)152    return None153class Daemon(object):154    def __init__(self, devices: list):155        self.devices = devices156        self.change_ip_queue = Queue()157    def worker(self, work_func, change_ip_queue):158        while True:159            for index, device in enumerate(self.devices):160                master_port = MASTER_PORT_START + index161                work_func(master_port, device, change_ip_queue)162    def run_forever(self):163        """Running change ip and health check"""164        process = [Process(target=self.worker, args=(_change_ip, self.change_ip_queue))]165        [p.start() for p in process]...m_hierarchy.py
Source:m_hierarchy.py  
1import networkx as nx2import methods.util.write_graph as write_graph3import methods.util.util as util4from .zhenv5.remove_cycle_edges_by_hierarchy_greedy import scc_based_to_remove_cycle_edges_iterately5from .zhenv5.remove_cycle_edges_by_hierarchy_BF import remove_cycle_edges_BF_iterately6from .zhenv5.remove_cycle_edges_by_hierarchy_voting import remove_cycle_edges_heuristic7SUPPORTED_SCORE_METHODS = ["pagerank", "socialagony", "trueskill"]8SUPPORTED_RANKING_METHODS = ["greedy", "forward", "backward", "voting"]9g = nx.DiGraph()10def prepare(line):11    g.add_edge(line[1], line[2])12def do(filename_out, delimiter, mode, gephi_out, filename_in=None):13    inputs = mode.split("_")14    edges_to_remove = None15    if len(inputs) < 2:16        raise Exception(17            "No score method provided (e.g. 'hierarchy_pagerank_voting'). Supported: ensemble, pagerank, socialagony, trueskill")18    if len(inputs) == 2:19        raise Exception(20            "Score method '%s' not supported." % inputs[1])21    if len(inputs) != 2 and (len(inputs) < 3 or inputs[2] not in SUPPORTED_RANKING_METHODS):22        raise Exception("Ranking method '%s' not supported. Supported: %s" % (inputs[2], SUPPORTED_RANKING_METHODS))23    score_name = inputs[1]24    ranking = inputs[2]25    print("Score method: %s" % score_name)26    print("Ranking method: %s" % ranking)27    score_names = SUPPORTED_SCORE_METHODS if score_name == "ensemble" else [score_name]28    votings = []29    for mode in score_names:30        print("--------------")31        print("Starting mode: %s" % mode)32        players_score_dict = computing_hierarchy(filename_in, mode, filename_in)33        edges_to_remove, e1, e2, e3, e4 = compute_ranking(ranking, mode, players_score_dict)34        if e1 is not None:35            votings.append(set(e1))36        if e2 is not None:37            votings.append(set(e2))38        if e3 is not None:39            votings.append(set(e3))40        print("Mode '%s' recommends to remove %s edges." % (mode, len(edges_to_remove)))41    if score_name == "ensemble" and ranking == "voting":42        edges_to_remove = remove_cycle_edges_by_voting(votings)43    print("Remove edges...")44    cycles_removed = util.remove_edges_from_network_graph(g, edges_to_remove)45    write_graph.network_graph(filename_out, g, gephi_out=gephi_out, delimiter=delimiter)46    return cycles_removed47def dir_tail_name(file_name):48    import os.path49    dir_name = os.path.dirname(file_name)50    head, tail = os.path.split(file_name)51    print("dir name: %s, file_name: %s" % (dir_name, tail))52    return dir_name, tail53def get_edges_voting_scores(set_edges_list):54    total_edges = set()55    for edges in set_edges_list:56        total_edges = total_edges | edges57    edges_score = {}58    for e in total_edges:59        edges_score[e] = len(filter(lambda x: e in x, set_edges_list))60    return edges_score61def compute_ranking(ranking, score_name, players_score_dict):62    edges_to_remove, remove_greedy, remove_forward, remove_backward, remove_voting = None, None, None, None, None63    if ranking == "voting" or ranking == "greedy":64        print("Compute edges to remove with ranking 'greedy'.")65        remove_greedy = scc_based_to_remove_cycle_edges_iterately(g.copy(), players_score_dict)66        edges_to_remove = remove_greedy67    if ranking == "voting" or ranking == "forward":68        print("Compute edges to remove with ranking 'forward'.")69        remove_forward = remove_cycle_edges_BF_iterately(g.copy(), players_score_dict, is_Forward=True,70                                                         score_name=score_name)71        edges_to_remove = remove_forward72    if ranking == "voting" or ranking == "backward":73        print("Compute edges to remove with ranking 'backward'.")74        remove_backward = remove_cycle_edges_BF_iterately(g.copy(), players_score_dict, is_Forward=False,75                                                          score_name=score_name)76        edges_to_remove = remove_backward77    if ranking == "voting":78        print("Compute edges to remove with ranking 'voting'.")79        remove_voting = remove_cycle_edges_by_voting([set(remove_greedy), set(remove_forward), set(remove_backward)])80        edges_to_remove = remove_voting81    return edges_to_remove, remove_greedy, remove_forward, remove_backward, remove_voting82def remove_cycle_edges_strategies(graph_file, nodes_score_dict, score_name="socialagony", nodetype=int):83    # greedy84    cg = g.copy()85    e1 = scc_based_to_remove_cycle_edges_iterately(cg, nodes_score_dict)86    # forward87    cg = g.copy()88    e2 = remove_cycle_edges_BF_iterately(cg, nodes_score_dict, is_Forward=True, score_name=score_name)89    # backward90    cg = g.copy()91    e3 = remove_cycle_edges_BF_iterately(cg, nodes_score_dict, is_Forward=False, score_name=score_name)92    return e1, e2, e393def remove_cycle_edges_by_voting(set_edges_list, nodetype=int):94    edges_score = get_edges_voting_scores(set_edges_list)95    e = remove_cycle_edges_heuristic(g.copy(), edges_score, nodetype=nodetype)96    return e97def remove_cycle_edges_by_hierarchy(graph_file, nodes_score_dict, score_name="socialagony"):98    e1, e2, e3 = remove_cycle_edges_strategies(graph_file, nodes_score_dict, score_name=score_name)99    e4 = remove_cycle_edges_by_voting([set(e1), set(e2), set(e3)])100    return e1, e2, e3, e4101def computing_hierarchy(graph_file, players_score_func_name, filename_in=None):102    import os.path103    if players_score_func_name == "socialagony":104        # agony_file = graph_file[:len(graph_file)-6] + "_socialagony.txt"105        # from compute_social_agony import compute_social_agony106        # players = compute_social_agony(graph_file,agony_path = "agony/agony ")107        if False:108            # if os.path.isfile(agony_file):109            print("load pre-computed socialagony from: %s" % agony_file)110            players = read_dict_from_file(agony_file)111        else:112            print("start computing socialagony...")113            from zhenv5.compute_social_agony import compute_social_agony114            players = compute_social_agony(graph_file)115        return players116    if players_score_func_name == "pagerank":117        # print("computing pagerank...")118        players = nx.pagerank(g.copy(), alpha=0.85)119        return players120    elif players_score_func_name == "trueskill":121        output_file = graph_file[:len(graph_file) - 6] + "_trueskill.txt"122        output_file_2 = graph_file[:len(graph_file) - 6] + "_trueskill.pkl"123        # from true_skill import graphbased_trueskill124        # players = graphbased_trueskill(g)125        # from file_io import write_dict_to_file126        # write_dict_to_file(players,output_file)127        '''128        if os.path.isfile(output_file):129            print("load pre-computed trueskill from: %s" % output_file)130            players = read_dict_from_file(output_file,key_type = int, value_type = float)131        elif os.path.isfile(output_file_2):132            print("load pre-computed trueskill from: %s" % output_file_2)133            players = read_from_pickle(output_file_2)			134        '''135        if True:136            print("start computing trueskill...")137            from zhenv5.true_skill import graphbased_trueskill138            players = graphbased_trueskill(g.copy())...example.py
Source:example.py  
...9        for j in browser.list_tab():10            if j._kwargs["url"].find("baidu") != -1:11                socketName = i12                break13        android1.remove_forward(i)14    if socketName:15        android1.forward(socketName)16        browser = pychrome.Browser(url="http://127.0.0.1:{0}".format(str(android1.remoteDebuggingPort)))17        print(browser.list_tab()[0]._kwargs)18        tab = browser.list_tab()[0]19        '''20        you can add code to operate the dom of tab.21            usage like:22                    # start tab23                    tab.start()24                    # call method25                    tab.Network.enable()26                    # close tab27                    tab.stop()28        '''29        android1.remove_forward(socketName)30    else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
