How to use the remove_forward method in Airtest

Best Python code snippets using Airtest

api.py

Source:api.py Github

copy

Full Screen

...80 if int(device.adb.check_remote_port(ip, port).strip()) != 0:81 The port is already in use82 """83 # kill_master_port(device, master_port)84 device.adb.remove_forward(master_port)85 device.adb.kill_port_process(device.port)86 # :If airplane mode is opened, first need close airplane mode87 if device.airplane_mode_is_open:88 device.adb.close_airplane_mode()89 time.sleep(WAIT_AIRPLANE_MODE_TIME)90 # :Running Proxy server command91 device.adb.running_server(host=device.ip, port=device.port)92 # : wait server running93 time.sleep(2)94 device.initialize_device()95 # waite init device96 time.sleep(1)97 if not device.transfer_port_is_open:98 return99 _forward_tcp_port(device, master_port, device.port)100def _change_ip(master_port: int, cluster_device: Device, change_ip_queue):101 """102 # :Close master port for nginx103 # :Get device info104 # :Check proxy running port105 # :Time to change ip106 # :Open master port for nginx107 """108 time.sleep(IP_SWITCHING_TIME)109 change_ip_queue.put_nowait(cluster_device.device_id)110 cluster_device.adb.remove_forward(master_port)111 cluster_device.adb.kill_port_process(cluster_device.port)112 cluster_device.initialize_device()113 if not cluster_device.airplane_mode_is_open:114 cluster_device.adb.turn_on_airplane_mode()115 cluster_device.adb.close_airplane_mode()116 time.sleep(WAIT_AIRPLANE_MODE_TIME)117 # if cluster_device.transfer_port_is_open is False:118 cluster_device.adb.running_server(119 host=cluster_device.ip,120 port=cluster_device.port121 )122 time.sleep(1)123 if cluster_device.adb.bridge_process().strip():124 _forward_tcp_port(125 device=cluster_device,126 local_port=master_port,127 remote_port=cluster_device.port128 )129 time.sleep(1)130 if not cluster_device.adb.ping_test():131 cluster_device.adb.remove_forward(port=master_port)132 if not cluster_device.adb.check_local_port(master_port):133 cluster_device.adb.remove_forward(port=master_port)134 change_ip_queue.get_nowait()135def _health_check(master_port: int, 
device: Device, change_ip_queue):136 # 如果正在切换 IP 那么此设备不会经过健康检测,否则会出现问题137 time.sleep(HEALTH_CHECK_TIME)138 if not change_ip_queue.empty():139 if device.device_id == change_ip_queue.get_nowait():140 change_ip_queue.put_nowait(device.device_id)141 time.sleep(HEALTH_CHECK_TIME)142 return None143 change_ip_queue.put_nowait(device.device_id)144 time.sleep(HEALTH_CHECK_TIME)145 device.initialize_device()146 if not device.transfer_port_is_open:147 device.adb.remove_forward(port=master_port)148 if device.airplane_mode_is_open:149 device.adb.remove_forward(port=master_port)150 if not device.adb.ping_test():151 device.adb.remove_forward(port=master_port)152 return None153class Daemon(object):154 def __init__(self, devices: list):155 self.devices = devices156 self.change_ip_queue = Queue()157 def worker(self, work_func, change_ip_queue):158 while True:159 for index, device in enumerate(self.devices):160 master_port = MASTER_PORT_START + index161 work_func(master_port, device, change_ip_queue)162 def run_forever(self):163 """Running change ip and health check"""164 process = [Process(target=self.worker, args=(_change_ip, self.change_ip_queue))]165 [p.start() for p in process]...

Full Screen

Full Screen

m_hierarchy.py

Source:m_hierarchy.py Github

copy

Full Screen

1import networkx as nx2import methods.util.write_graph as write_graph3import methods.util.util as util4from .zhenv5.remove_cycle_edges_by_hierarchy_greedy import scc_based_to_remove_cycle_edges_iterately5from .zhenv5.remove_cycle_edges_by_hierarchy_BF import remove_cycle_edges_BF_iterately6from .zhenv5.remove_cycle_edges_by_hierarchy_voting import remove_cycle_edges_heuristic7SUPPORTED_SCORE_METHODS = ["pagerank", "socialagony", "trueskill"]8SUPPORTED_RANKING_METHODS = ["greedy", "forward", "backward", "voting"]9g = nx.DiGraph()10def prepare(line):11 g.add_edge(line[1], line[2])12def do(filename_out, delimiter, mode, gephi_out, filename_in=None):13 inputs = mode.split("_")14 edges_to_remove = None15 if len(inputs) < 2:16 raise Exception(17 "No score method provided (e.g. 'hierarchy_pagerank_voting'). Supported: ensemble, pagerank, socialagony, trueskill")18 if len(inputs) == 2:19 raise Exception(20 "Score method '%s' not supported." % inputs[1])21 if len(inputs) != 2 and (len(inputs) < 3 or inputs[2] not in SUPPORTED_RANKING_METHODS):22 raise Exception("Ranking method '%s' not supported. Supported: %s" % (inputs[2], SUPPORTED_RANKING_METHODS))23 score_name = inputs[1]24 ranking = inputs[2]25 print("Score method: %s" % score_name)26 print("Ranking method: %s" % ranking)27 score_names = SUPPORTED_SCORE_METHODS if score_name == "ensemble" else [score_name]28 votings = []29 for mode in score_names:30 print("--------------")31 print("Starting mode: %s" % mode)32 players_score_dict = computing_hierarchy(filename_in, mode, filename_in)33 edges_to_remove, e1, e2, e3, e4 = compute_ranking(ranking, mode, players_score_dict)34 if e1 is not None:35 votings.append(set(e1))36 if e2 is not None:37 votings.append(set(e2))38 if e3 is not None:39 votings.append(set(e3))40 print("Mode '%s' recommends to remove %s edges." 
% (mode, len(edges_to_remove)))41 if score_name == "ensemble" and ranking == "voting":42 edges_to_remove = remove_cycle_edges_by_voting(votings)43 print("Remove edges...")44 cycles_removed = util.remove_edges_from_network_graph(g, edges_to_remove)45 write_graph.network_graph(filename_out, g, gephi_out=gephi_out, delimiter=delimiter)46 return cycles_removed47def dir_tail_name(file_name):48 import os.path49 dir_name = os.path.dirname(file_name)50 head, tail = os.path.split(file_name)51 print("dir name: %s, file_name: %s" % (dir_name, tail))52 return dir_name, tail53def get_edges_voting_scores(set_edges_list):54 total_edges = set()55 for edges in set_edges_list:56 total_edges = total_edges | edges57 edges_score = {}58 for e in total_edges:59 edges_score[e] = len(filter(lambda x: e in x, set_edges_list))60 return edges_score61def compute_ranking(ranking, score_name, players_score_dict):62 edges_to_remove, remove_greedy, remove_forward, remove_backward, remove_voting = None, None, None, None, None63 if ranking == "voting" or ranking == "greedy":64 print("Compute edges to remove with ranking 'greedy'.")65 remove_greedy = scc_based_to_remove_cycle_edges_iterately(g.copy(), players_score_dict)66 edges_to_remove = remove_greedy67 if ranking == "voting" or ranking == "forward":68 print("Compute edges to remove with ranking 'forward'.")69 remove_forward = remove_cycle_edges_BF_iterately(g.copy(), players_score_dict, is_Forward=True,70 score_name=score_name)71 edges_to_remove = remove_forward72 if ranking == "voting" or ranking == "backward":73 print("Compute edges to remove with ranking 'backward'.")74 remove_backward = remove_cycle_edges_BF_iterately(g.copy(), players_score_dict, is_Forward=False,75 score_name=score_name)76 edges_to_remove = remove_backward77 if ranking == "voting":78 print("Compute edges to remove with ranking 'voting'.")79 remove_voting = remove_cycle_edges_by_voting([set(remove_greedy), set(remove_forward), set(remove_backward)])80 edges_to_remove = 
remove_voting81 return edges_to_remove, remove_greedy, remove_forward, remove_backward, remove_voting82def remove_cycle_edges_strategies(graph_file, nodes_score_dict, score_name="socialagony", nodetype=int):83 # greedy84 cg = g.copy()85 e1 = scc_based_to_remove_cycle_edges_iterately(cg, nodes_score_dict)86 # forward87 cg = g.copy()88 e2 = remove_cycle_edges_BF_iterately(cg, nodes_score_dict, is_Forward=True, score_name=score_name)89 # backward90 cg = g.copy()91 e3 = remove_cycle_edges_BF_iterately(cg, nodes_score_dict, is_Forward=False, score_name=score_name)92 return e1, e2, e393def remove_cycle_edges_by_voting(set_edges_list, nodetype=int):94 edges_score = get_edges_voting_scores(set_edges_list)95 e = remove_cycle_edges_heuristic(g.copy(), edges_score, nodetype=nodetype)96 return e97def remove_cycle_edges_by_hierarchy(graph_file, nodes_score_dict, score_name="socialagony"):98 e1, e2, e3 = remove_cycle_edges_strategies(graph_file, nodes_score_dict, score_name=score_name)99 e4 = remove_cycle_edges_by_voting([set(e1), set(e2), set(e3)])100 return e1, e2, e3, e4101def computing_hierarchy(graph_file, players_score_func_name, filename_in=None):102 import os.path103 if players_score_func_name == "socialagony":104 # agony_file = graph_file[:len(graph_file)-6] + "_socialagony.txt"105 # from compute_social_agony import compute_social_agony106 # players = compute_social_agony(graph_file,agony_path = "agony/agony ")107 if False:108 # if os.path.isfile(agony_file):109 print("load pre-computed socialagony from: %s" % agony_file)110 players = read_dict_from_file(agony_file)111 else:112 print("start computing socialagony...")113 from zhenv5.compute_social_agony import compute_social_agony114 players = compute_social_agony(graph_file)115 return players116 if players_score_func_name == "pagerank":117 # print("computing pagerank...")118 players = nx.pagerank(g.copy(), alpha=0.85)119 return players120 elif players_score_func_name == "trueskill":121 output_file = 
graph_file[:len(graph_file) - 6] + "_trueskill.txt"122 output_file_2 = graph_file[:len(graph_file) - 6] + "_trueskill.pkl"123 # from true_skill import graphbased_trueskill124 # players = graphbased_trueskill(g)125 # from file_io import write_dict_to_file126 # write_dict_to_file(players,output_file)127 '''128 if os.path.isfile(output_file):129 print("load pre-computed trueskill from: %s" % output_file)130 players = read_dict_from_file(output_file,key_type = int, value_type = float)131 elif os.path.isfile(output_file_2):132 print("load pre-computed trueskill from: %s" % output_file_2)133 players = read_from_pickle(output_file_2) 134 '''135 if True:136 print("start computing trueskill...")137 from zhenv5.true_skill import graphbased_trueskill138 players = graphbased_trueskill(g.copy())...

Full Screen

Full Screen

example.py

Source:example.py Github

copy

Full Screen

...9 for j in browser.list_tab():10 if j._kwargs["url"].find("baidu") != -1:11 socketName = i12 break13 android1.remove_forward(i)14 if socketName:15 android1.forward(socketName)16 browser = pychrome.Browser(url="http://127.0.0.1:{0}".format(str(android1.remoteDebuggingPort)))17 print(browser.list_tab()[0]._kwargs)18 tab = browser.list_tab()[0]19 '''20 you can add code to operate the dom of tab.21 usage like:22 # start tab23 tab.start()24 # call method25 tab.Network.enable()26 # close tab27 tab.stop()28 '''29 android1.remove_forward(socketName)30 else:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Airtest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful