How to use ip method in Airtest

Best Python code snippet using Airtest

google_ip.py

Source:google_ip.py Github

copy

Full Screen

...84        self.to_check_ip_queue = Queue.Queue()85        self.scan_exist_ip_queue = Queue.Queue()86        self.ip_lock.release()87        self.load_config()88        self.load_ip()89        #if check_local_network.network_stat == "OK" and not config.USE_IPV6:90        #    self.start_scan_all_exist_ip()91        self.search_more_google_ip()92    def is_ip_enough(self):93        if len(self.gws_ip_list) >= self.max_good_ip_num:94            return True95        else:96            return False97    def load_config(self):98        if config.USE_IPV6:99            good_ip_file_name = "good_ipv6.txt"100            default_good_ip_file_name = "good_ipv6.txt"101            self.max_scan_ip_thread_num = 0102            self.auto_adjust_scan_ip_thread_num = 0103        else:104            good_ip_file_name = "good_ip.txt"105            default_good_ip_file_name = "good_ip.txt"106            self.max_scan_ip_thread_num = config.CONFIG.getint("google_ip", "max_scan_ip_thread_num") #50107            self.auto_adjust_scan_ip_thread_num = config.CONFIG.getint("google_ip", "auto_adjust_scan_ip_thread_num")108        self.good_ip_file = os.path.abspath( os.path.join(config.DATA_PATH, good_ip_file_name))109        self.default_good_ip_file = os.path.join(current_path, default_good_ip_file_name)110        self.scan_ip_thread_num = self.max_scan_ip_thread_num111        self.max_good_ip_num = config.CONFIG.getint("google_ip", "max_good_ip_num") #3000  # stop scan ip when enough112        self.ip_connect_interval = config.CONFIG.getint("google_ip", "ip_connect_interval") #5,10113    def load_ip(self):114        if os.path.isfile(self.good_ip_file):115            file_path = self.good_ip_file116        else:117            file_path = self.default_good_ip_file118        with open(file_path, "r") as fd:119            lines = fd.readlines()120        for line in lines:121            try:122                if line.startswith("#"):123                    continue124                str_l = line.split(' ')125                if len(str_l) < 4:126                    xlog.warning("line err: %s", line)127                    continue128                ip = str_l[0]129                domain = str_l[1]130                server = str_l[2]131                handshake_time = int(str_l[3])132                if len(str_l) > 4:133                    fail_times = int(str_l[4])134                else:135                    fail_times = 0136                if len(str_l) > 5:137                    down_fail = int(str_l[5])138                else:139                    down_fail = 0140                #xlog.info("load ip: %s time:%d domain:%s server:%s", ip, handshake_time, domain, server)141                self.add_ip(ip, handshake_time, domain, server, fail_times, down_fail)142            except Exception as e:143                xlog.exception("load_ip line:%s err:%s", line, e)144        xlog.info("load google ip_list num:%d, gws num:%d", len(self.ip_dict), len(self.gws_ip_list))145        self.try_sort_gws_ip(force=True)146    def save_ip_list(self, force=False):147        if not force:148            if not self.iplist_need_save:149                return150            if time.time() - self.iplist_saved_time < 10:151                return152        self.iplist_saved_time = time.time()153        try:154            self.ip_lock.acquire()155            ip_dict = sorted(self.ip_dict.items(),  key=lambda x: (x[1]['handshake_time'] + x[1]['fail_times'] * 1000))156            with open(self.good_ip_file, "w") as fd:157                for ip, property in ip_dict:158                    fd.write( "%s %s %s %d %d %d\n" %159                        (ip, property['domain'],160                            property['server'],161                            property['handshake_time'],162                            property['fail_times'],163                            property['down_fail']) )164                fd.flush()165            self.iplist_need_save = False166        except Exception as e:167            xlog.error("save good_ip.txt fail %s", e)168        finally:169            self.ip_lock.release()170    def _ip_rate(self, ip_info):171        return ip_info['handshake_time'] + \172                    (ip_info['fail_times'] * 1000 ) + \173                    (ip_info['down_fail'] * 500 )174    def try_sort_gws_ip(self, force=False):175        if time.time() - self.last_sort_time_for_gws < 10 and not force:176            return177        self.ip_lock.acquire()178        self.last_sort_time_for_gws = time.time()179        try:180            self.good_ip_num = 0181            ip_rate = {}182            for ip in self.ip_dict:183                if 'gws' not in self.ip_dict[ip]['server']:184                    continue185                ip_rate[ip] = self._ip_rate(self.ip_dict[ip])186                if self.ip_dict[ip]['fail_times'] == 0:187                    self.good_ip_num += 1188            ip_time = sorted(ip_rate.items(), key=operator.itemgetter(1))189            self.gws_ip_list = [ip for ip,rate in ip_time]190        except Exception as e:191            xlog.error("try_sort_ip_by_handshake_time:%s", e)192        finally:193            self.ip_lock.release()194        time_cost = (( time.time() - self.last_sort_time_for_gws) * 1000)195        if time_cost > 30:196            xlog.debug("sort ip time:%dms", time_cost) # 5ms for 1000 ip. 70~150ms for 30000 ip.197        self.adjust_scan_thread_num()198    def adjust_scan_thread_num(self, max_scan_ip_thread_num=None):199        if max_scan_ip_thread_num!=None:200            self.max_scan_ip_thread_num = max_scan_ip_thread_num201        if not self.auto_adjust_scan_ip_thread_num:202            scan_ip_thread_num = self.max_scan_ip_thread_num203        elif len(self.gws_ip_list) < 100:204            scan_ip_thread_num = self.max_scan_ip_thread_num205        else:206            try:207                the_100th_ip = self.gws_ip_list[99]208                the_100th_handshake_time = self._ip_rate(self.ip_dict[the_100th_ip])209                scan_ip_thread_num = int( (the_100th_handshake_time - 200)/2 * self.max_scan_ip_thread_num/50 )210            except Exception as e:211                xlog.warn("adjust_scan_thread_num fail:%r", e)212                return213            if scan_ip_thread_num > self.max_scan_ip_thread_num:214                scan_ip_thread_num = self.max_scan_ip_thread_num215            elif scan_ip_thread_num < 0:216                scan_ip_thread_num = 0217        if scan_ip_thread_num != self.scan_ip_thread_num:218            xlog.info("Adjust scan thread num from %d to %d", self.scan_ip_thread_num, scan_ip_thread_num)219            self.scan_ip_thread_num = scan_ip_thread_num220            self.search_more_google_ip()221    def ip_quality(self, num=10):222        try:223            iplist_length = len(self.gws_ip_list)224            ip_th = min(num, iplist_length)225            for i in range(ip_th, 0, -1):226                last_ip = self.gws_ip_list[i]227                if self.ip_dict[last_ip]['fail_times'] > 0:228                    continue229                handshake_time = self.ip_dict[last_ip]['handshake_time']230                return handshake_time231            return 9999232        except:233            return 9999234    def append_ip_history(self, ip, info):235        if config.record_ip_history:236            self.ip_dict[ip]['history'].append([time.time(), info])237    # algorithm to get ip:238    # scan start from fastest ip239    # always use the fastest ip.240    # if the ip is used in 5 seconds, try next ip;241    # if the ip is fail in 60 seconds, try next ip;242    # reset pointer to front every 3 seconds243    def get_gws_ip(self):244        self.try_sort_gws_ip()245        self.ip_lock.acquire()246        try:247            ip_num = len(self.gws_ip_list)248            if ip_num == 0:249                #xlog.warning("no gws ip")250                #time.sleep(10)251                return None252            for i in range(ip_num):253                time_now = time.time()254                if self.gws_ip_pointer >= ip_num:255                    if time_now - self.gws_ip_pointer_reset_time < 1:256                        time.sleep(1)257                        continue258                    else:259                        self.gws_ip_pointer = 0260                        self.gws_ip_pointer_reset_time = time_now261                elif self.gws_ip_pointer > 0 and time_now - self.gws_ip_pointer_reset_time > 3:262                    self.gws_ip_pointer = 0263                    self.gws_ip_pointer_reset_time = time_now264                ip = self.gws_ip_list[self.gws_ip_pointer]265                get_time = self.ip_dict[ip]["get_time"]266                if time_now - get_time < self.ip_connect_interval:267                    self.gws_ip_pointer += 1268                    continue269                if time_now - self.ip_dict[ip]['success_time'] > 300: # 5 min270                    fail_connect_interval = 1800 # 30 min271                else:272                    fail_connect_interval = 120 # 2 min273                fail_time = self.ip_dict[ip]["fail_time"]274                if time_now - fail_time < fail_connect_interval:275                    self.gws_ip_pointer += 1276                    continue277                down_fail_connect_interval = 600278                down_fail_time = self.ip_dict[ip]["down_fail_time"]279                if time_now - down_fail_time < down_fail_connect_interval:280                    self.gws_ip_pointer += 1281                    continue282                if self.ip_dict[ip]['links'] >= config.max_links_per_ip:283                    self.gws_ip_pointer += 1284                    continue285                handshake_time = self.ip_dict[ip]["handshake_time"]286                xlog.debug("get ip:%s t:%d", ip, handshake_time)287                self.append_ip_history(ip, "get")288                self.ip_dict[ip]['get_time'] = time_now289                self.ip_dict[ip]['links'] += 1290                self.gws_ip_pointer += 1291                return ip292        except Exception as e:293            xlog.exception("get_gws_ip fail:%r", e)294        finally:295            self.ip_lock.release()296    def add_ip(self, ip, handshake_time, domain=None, server='', fail_times=0, down_fail=0):297        if not isinstance(ip, basestring):298            xlog.error("add_ip input")299            return300        if config.USE_IPV6 and ":" not in ip:301            xlog.warn("add %s but ipv6", ip)302            return303        handshake_time = int(handshake_time)304        self.ip_lock.acquire()305        try:306            if ip in self.ip_dict:307                self.ip_dict[ip]['handshake_time'] = handshake_time308                self.ip_dict[ip]['fail_times'] = fail_times309                if self.ip_dict[ip]['fail_time'] > 0:310                    self.ip_dict[ip]['fail_time'] = 0311                    self.good_ip_num += 1312                self.append_ip_history(ip, handshake_time)313                return False314            self.iplist_need_save = True315            self.good_ip_num += 1316            self.ip_dict[ip] = {'handshake_time':handshake_time, "fail_times":fail_times,317                                    "transfered_data":0, 'data_active':0,318                                    'domain':domain, 'server':server,319                                    "history":[[time.time(), handshake_time]], "fail_time":0,320                                    "success_time":0, "get_time":0, "links":0,321                                    "down_fail":down_fail, "down_fail_time":0}322            if 'gws' in server:323                self.gws_ip_list.append(ip)324            return True325        except Exception as e:326            xlog.exception("add_ip err:%s", e)327        finally:328            self.ip_lock.release()329        return False330    def update_ip(self, ip, handshake_time):331        if not isinstance(ip, basestring):332            xlog.error("set_ip input")333            return334        handshake_time = int(handshake_time)335        if handshake_time < 5: # that's impossible336            xlog.warn("%s handshake:%d impossible", ip, 1000 * handshake_time)337            return338        time_now = time.time()339        check_local_network.report_network_ok()340        check_ip.last_check_time = time_now341        check_ip.continue_fail_count = 0342        self.ip_lock.acquire()343        try:344            if ip in self.ip_dict:345                # Case: some good ip, average handshake time is 300ms346                # some times ip package lost cause handshake time become 2000ms347                # this ip will not return back to good ip front until all become bad348                # There for, prevent handshake time increase too quickly.349                org_time = self.ip_dict[ip]['handshake_time']350                if handshake_time - org_time > 500:351                    self.ip_dict[ip]['handshake_time'] = org_time + 500352                else:353                    self.ip_dict[ip]['handshake_time'] = handshake_time354                self.ip_dict[ip]['success_time'] = time_now355                if self.ip_dict[ip]['fail_times'] > 0:356                    self.good_ip_num += 1357                self.ip_dict[ip]['fail_times'] = 0358                self.append_ip_history(ip, handshake_time)359                self.ip_dict[ip]["fail_time"] = 0360                self.iplist_need_save = True361            #xlog.debug("update ip:%s not exist", ip)362        except Exception as e:363            xlog.error("update_ip err:%s", e)364        finally:365            self.ip_lock.release()366        self.save_ip_list()367    def report_connect_fail(self, ip, force_remove=False):368        self.ip_lock.acquire()369        try:370            time_now = time.time()371            if not ip in self.ip_dict:372                xlog.debug("report_connect_fail %s not exist", ip)373                return374            if force_remove:375                if self.ip_dict[ip]['fail_times'] == 0:376                    self.good_ip_num -= 1377                del self.ip_dict[ip]378                if ip in self.gws_ip_list:379                    self.gws_ip_list.remove(ip)380                xlog.info("remove ip:%s left amount:%d gws_num:%d", ip, len(self.ip_dict), len(self.gws_ip_list))381                return382            self.ip_dict[ip]['links'] -= 1383            # ignore if system network is disconnected.384            if not check_local_network.is_ok():385                xlog.debug("report_connect_fail network fail")386                return387            check_local_network.report_network_fail()388            if not check_local_network.is_ok():389                return390            fail_time = self.ip_dict[ip]["fail_time"]391            if time_now - fail_time < 1:392                xlog.debug("fail time too near %s", ip)393                return394            if self.ip_dict[ip]['fail_times'] == 0:395                self.good_ip_num -= 1396            self.ip_dict[ip]['fail_times'] += 1397            self.append_ip_history(ip, "fail")398            self.ip_dict[ip]["fail_time"] = time_now399            self.to_check_ip_queue.put((ip, time_now + 10))400            xlog.debug("report_connect_fail:%s", ip)401        except Exception as e:402            xlog.exception("report_connect_fail err:%s", e)403        finally:404            self.iplist_need_save = True405            self.ip_lock.release()406        if not self.is_ip_enough():407            self.search_more_google_ip()408    def report_connect_closed(self, ip, reason=""):409        xlog.debug("%s close:%s", ip, reason)410        if reason != "down fail":411            return412        self.ip_lock.acquire()413        try:414            time_now = time.time()415            if not ip in self.ip_dict:416                return417            if self.ip_dict[ip]['down_fail'] == 0:418                self.good_ip_num -= 1419            self.ip_dict[ip]['down_fail'] += 1420            self.append_ip_history(ip, reason)421            self.ip_dict[ip]["down_fail_time"] = time_now422            xlog.debug("ssl_closed %s", ip)423        except Exception as e:424            xlog.error("ssl_closed %s err:%s", ip, e)425        finally:426            self.ip_lock.release()427    def ssl_closed(self, ip, reason=""):428        #xlog.debug("%s ssl_closed:%s", ip, reason)429        self.ip_lock.acquire()430        try:431            if ip in self.ip_dict:432                if self.ip_dict[ip]['links']:433                    self.ip_dict[ip]['links'] -= 1434                    self.append_ip_history(ip, "C[%s]"%reason)435                    xlog.debug("ssl_closed %s", ip)436        except Exception as e:437            xlog.error("ssl_closed %s err:%s", ip, e)438        finally:439            self.ip_lock.release()440    def check_ip_process(self):441        while connect_control.keep_running:442            try:443                ip, test_time = self.to_check_ip_queue.get()444            except:445                continue446            time_wait = test_time - time.time()447            if time_wait > 0:448                time.sleep(time_wait)449            if not check_local_network.is_ok():450                try:451                    if self.ip_dict[ip]['fail_times']:452                        self.ip_dict[ip]['fail_times'] = 0453                        self.good_ip_num += 1454                except:455                    pass456                continue457            result = check_ip.test_gae_ip2(ip)458            if result and result.support_gae:459                self.add_ip(ip, result.handshake_time, result.domain, "gws")460                xlog.debug("restore ip:%s", ip)461                continue462            xlog.debug("ip:%s real fail", ip)463    def remove_slowest_ip(self):464        if len(self.gws_ip_list) <= self.max_good_ip_num:465            return466        self.try_sort_gws_ip(force=True)467        self.ip_lock.acquire()468        try:469            ip_num = len(self.gws_ip_list)470            while ip_num > self.max_good_ip_num:471                ip = self.gws_ip_list[ip_num - 1]472                property = self.ip_dict[ip]473                server = property['server']474                fails = property['fail_times']475                handshake_time = property['handshake_time']476                xlog.info("remove_slowest_ip:%s handshake_time:%d, fails:%d", ip, handshake_time, fails)477                del self.ip_dict[ip]478                if 'gws' in server and ip in self.gws_ip_list:479                    self.gws_ip_list.remove(ip)480                ip_num -= 1481        except Exception as e:482            xlog.exception("remove_slowest_ip err:%s", e)483        finally:484            self.ip_lock.release()485    def recheck_ip(self, ip):486        # recheck ip if not work.487        # can block.488        if not check_local_network.is_ok():489            xlog.debug("recheck_ip:%s network is fail", ip)490            return491        self.report_connect_fail(ip)492        connect_control.start_connect_register()493        result = check_ip.test_gae_ip2(ip)494        connect_control.end_connect_register()495        if not result:496            # connect fail.497            # do nothing498            return499        if not result.support_gae:500            self.report_connect_fail(ip, force_remove=True)501            xlog.debug("recheck_ip:%s real fail, removed.", ip)502        else:503            self.add_ip(ip, result.handshake_time, result.domain, "gws")504            xlog.debug("recheck_ip:%s restore okl", ip)505    def scan_ip_worker(self):506        while self.scan_thread_count <= self.scan_ip_thread_num and connect_control.keep_running:507            if not connect_control.allow_scan():508                time.sleep(10)509                continue510            try:511                time.sleep(1)512                ip = self.ip_range.get_ip()513                if ip in self.ip_dict:514                    continue515                connect_control.start_connect_register()516                result = check_ip.test_gae_ip2(ip)517                connect_control.end_connect_register()518                if not result or not result.support_gae:519                    continue520                if self.add_ip(ip, result.handshake_time, result.domain, "gws"):521                    #xlog.info("add  %s  CN:%s  type:%s  time:%d  gws:%d ", ip,522                    #     result.domain, result.server_type, result.handshake_time, len(self.gws_ip_list))523                    xlog.info("scan_ip add ip:%s time:%d", ip, result.handshake_time)524                    scan_ip_log.info("Add %s time:%d CN:%s ", ip, result.handshake_time, result.domain)525                    self.remove_slowest_ip()526                    self.save_ip_list()527            except Exception as e:528                xlog.exception("google_ip.runJob fail:%r", e)529        self.scan_thread_lock.acquire()530        self.scan_thread_count -= 1531        self.scan_thread_lock.release()532        #xlog.info("scan_ip_worker exit")533    def search_more_google_ip(self):534        if config.USE_IPV6:535            return536        new_thread_num = self.scan_ip_thread_num - self.scan_thread_count537        if new_thread_num < 1:538            return539        for i in range(0, new_thread_num):540            self.scan_thread_lock.acquire()541            self.scan_thread_count += 1542            self.scan_thread_lock.release()543            p = threading.Thread(target = self.scan_ip_worker)544            p.start()545    def scan_all_exist_ip(self):546        max_scan_ip_thread_num = self.max_scan_ip_thread_num547        self.max_scan_ip_thread_num = 0548        self.adjust_scan_thread_num()549        for ip in self.ip_dict:550            self.scan_exist_ip_queue.put(ip)551        xlog.debug("start scan all exist ip, num:%d", self.scan_exist_ip_queue.qsize())552        self.keep_scan_all_exist_ip = True553        scan_threads = []554        for i in range(0, 50):555            th = threading.Thread(target=self.scan_exist_ip_worker, )556            th.start()557            scan_threads.append(th)558        for th in scan_threads:559            th.join()560        self.try_sort_gws_ip()561        xlog.debug("finished scan all exist ip")562        self.max_scan_ip_thread_num = max_scan_ip_thread_num563        self.adjust_scan_thread_num()564        self.scan_all_ip_thread = None565    def start_scan_all_exist_ip(self):566        if hasattr(self, "scan_all_ip_thread") and self.scan_all_ip_thread:567            xlog.warn("scan all exist ip is running")568            return569        self.scan_all_ip_thread = threading.Thread(target=self.scan_all_exist_ip)570        self.scan_all_ip_thread.start()571    def stop_scan_all_exist_ip(self):572        self.keep_scan_all_exist_ip = False573        self.scan_exist_ip_queue = Queue.Queue()574    def scan_exist_ip_worker(self):575        while connect_control.keep_running and self.keep_scan_all_exist_ip:576            try:577                ip = self.scan_exist_ip_queue.get_nowait()578            except:579                break580            connect_control.start_connect_register()581            result = check_ip.test_gae_ip2(ip)582            connect_control.end_connect_register()583            if not result:584                self.ip_lock.acquire()585                try:586                    if ip not in self.ip_dict:587                        continue588                    if self.ip_dict[ip]['fail_times'] == 0:589                        self.good_ip_num -= 1590                    self.ip_dict[ip]['fail_times'] += 1591                    self.ip_dict[ip]["fail_time"] = time.time()592                finally:593                    self.ip_lock.release()594            elif result.support_gae:595                self.add_ip(ip, result.handshake_time, result.domain, "gws")596            else:597                self.report_connect_fail(ip, force_remove=True)598google_ip = IpManager()599if __name__ == "__main__":600    google_ip.scan_all_exist_ip()601    while True:...

Full Screen

Full Screen

generate_ip_range.py

Source:generate_ip_range.py Github

copy

Full Screen

...50        for line in ips:51            if len(line) == 0:52                #print "non line:", line53                continue54            begin, end = ip_utils.split_ip(line)55            if ip_utils.check_ip_valid(begin) == 0 or ip_utils.check_ip_valid(end) == 0:56                PRINT("ip format is error,line:%s, begin: %s,end: %s" % (line, begin, end))57                continue58            nbegin = ip_utils.ip_string_to_num(begin)59            nend = ip_utils.ip_string_to_num(end)60            ip_range_list.append([nbegin,nend])61            #print begin, end62    ip_range_list.sort()63    return ip_range_list64def merge_range(input_ip_range_list):65    output_ip_range_list = []66    range_num = len(input_ip_range_list)67    last_begin = input_ip_range_list[0][0]68    last_end = input_ip_range_list[0][1]69    for i in range(1,range_num):70        ip_range = input_ip_range_list[i]71        begin = ip_range[0]72        end = ip_range[1]73        #print "now:",ip_utils.ip_num_to_string(begin), ip_utils.ip_num_to_string(end)74        if begin > last_end + 2:75            #print "add:",ip_utils.ip_num_to_string(begin), ip_utils.ip_num_to_string(end)76            output_ip_range_list.append([last_begin, last_end])77            last_begin = begin78            last_end = end79        else:80            print "merge:", ip_utils.ip_num_to_string(last_begin), ip_utils.ip_num_to_string(last_end), ip_utils.ip_num_to_string(begin), ip_utils.ip_num_to_string(end)81            if end > last_end:82                last_end = end83    output_ip_range_list.append([last_begin, last_end])84    return output_ip_range_list85def filter_ip_range(good_range, bad_range):86    out_good_range = []87    bad_i = 088    bad_begin, bad_end = bad_range[bad_i]89    for good_begin, good_end in good_range:90        while True:91            if good_begin > good_end:92                PRINT("bad good ip range when filter:%s-%s"  % (ip_utils.ip_num_to_string(good_begin), ip_utils.ip_num_to_string(good_end)))93                assert(good_begin < good_end)94            if good_end < bad_begin:95                # case:96                #     [  good  ]97                #                   [  bad  ]98                out_good_range.append([good_begin, good_end])99                break100            elif bad_end < good_begin:101                # case:102                #                   [  good  ]103                #     [   bad   ]104                bad_i += 1105                bad_begin, bad_end = bad_range[bad_i]106                continue107            elif good_begin <= bad_begin and good_end <= bad_end:108                # case:109                #     [   good    ]110                #           [   bad   ]111                PRINT("cut bad ip case 1:%s - %s" % (ip_utils.ip_num_to_string(bad_begin), ip_utils.ip_num_to_string(good_end)))112                if bad_begin - 1 > good_begin:113                    out_good_range.append([good_begin, bad_begin - 1])114                break115            elif good_begin >= bad_begin and good_end == bad_end:116                # case:117                #           [   good   ]118                #     [      bad       ]119                PRINT("cut bad ip case 2:%s - %s" % (ip_utils.ip_num_to_string(good_begin), ip_utils.ip_num_to_string(bad_end)))120                bad_i += 1121                bad_begin, bad_end = bad_range[bad_i]122                break123            elif good_begin >= bad_begin and good_end > bad_end:124                # case:125                #           [   good   ]126                #     [    bad  ]127                PRINT("cut bad ip case 3:%s - %s" % (ip_utils.ip_num_to_string(good_begin), ip_utils.ip_num_to_string(bad_end)))128                good_begin = bad_end + 1129                bad_i += 1130                bad_begin, bad_end = bad_range[bad_i]131                continue132            elif good_begin <= bad_begin and good_end >= bad_end:133                # case:134                #     [     good     ]135                #         [  bad  ]136                out_good_range.append([good_begin, bad_begin - 1])137                PRINT("cut bad ip case 4:%s - %s" % (ip_utils.ip_num_to_string(bad_begin), ip_utils.ip_num_to_string(bad_end)))138                good_begin = bad_end + 1139                bad_i += 1140                bad_begin, bad_end = bad_range[bad_i]141                continue142            elif good_begin >= bad_begin and good_end <= bad_end:143                # case:144                #          [good]145                #      [    bad    ]146                PRINT("cut bad ip case 5:%s - %s" % (ip_utils.ip_num_to_string(good_begin), ip_utils.ip_num_to_string(good_end)))147                break148            else:149                PRINT("any case? good:%s-%s bad:%s-%s" % (ip_utils.ip_num_to_string(good_begin), ip_utils.ip_num_to_string(good_end),150                    ip_utils.ip_num_to_string(bad_begin), ip_utils.ip_num_to_string(bad_end)))151                assert( False )152    return out_good_range153def download_apic(filename):154    url = 'http://ftp.apnic.net/apnic/stats/apnic/delegated-apnic-latest'155    try:156        data = subprocess.check_output(['wget', url, '-O-'])157    except (OSError, AttributeError):158        print >> sys.stderr, "Fetching data from apnic.net, "\159                             "it might take a few minutes, please wait..."160        data = urllib2.urlopen(url).read()161    with open(filename, "w") as f:162        f.write(data)163    return data164def generage_range_from_apnic(input):165    cnregex = re.compile(r'^apnic\|(?:cn)\|ipv4\|[\d\.]+\|\d+\|\d+\|a\w*$',166                         re.I | re.M )167    cndata = cnregex.findall(input)168    results = []169    for item in cndata:170        unit_items = item.split('|')171        starting_ip = unit_items[3]172        num_ip = int(unit_items[4])173        cidr = 32 - int(math.log(num_ip, 2))174        results.append("%s/%s" % (starting_ip, cidr))175    return "\n".join(results)176def load_bad_ip_range():177    file_name = "delegated-apnic-latest.txt"178    apnic_file = os.path.join(config.DATA_PATH, file_name)179    if not os.path.isfile(apnic_file):180        download_apic(apnic_file)181    with open(apnic_file, "r") as inf:182        apnic_lines = inf.read()183    bad_ip_range_lines = generage_range_from_apnic(apnic_lines)184    sepcial_bad_ip_range_lines  = """185    130.211.0.0/16          #Empty ip range, no route to it.186    255.255.255.255/32      #for algorithm187    """188    return bad_ip_range_lines + sepcial_bad_ip_range_lines189def generate_ip_range():190    # load input good ip range191    file_name = "ip_range.txt"192    input_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_name)193    with open(input_file, "r") as inf:194        input_good_range_lines = inf.read()195    ip_range_list = parse_range_string(input_good_range_lines)196    ip_range_list = merge_range(ip_range_list)197    PRINT("Good ip range:\n")198    print_range_list(ip_range_list)199    if False:200        input_bad_ip_range_lines = load_bad_ip_range()201        bad_range_list = parse_range_string(input_bad_ip_range_lines)202        bad_range_list = merge_range(bad_range_list)203        PRINT("Bad ip range:\n")204        print_range_list(ip_range_list)205        ip_range_list = filter_ip_range(ip_range_list, bad_range_list)206        PRINT("Output ip range:\n")207        print_range_list(ip_range_list)208    # write out209    output_file = os.path.join(config.DATA_PATH, file_name)210    fd = open(output_file, "w")211    for ip_range in ip_range_list:212        begin = ip_range[0]213        end = ip_range[1]214        #print ip_utils.ip_num_to_string(begin), ip_utils.ip_num_to_string(end)215        fd.write(ip_utils.ip_num_to_string(begin)+ "-" + ip_utils.ip_num_to_string(end)+"\n")216    fd.close()217def test_load():218    print("Begin test load ip_range.txt\n")219    file_name = os.path.join(config.DATA_PATH, "ip_range.txt")220    fd = open(file_name, "r")221    if not fd:222        print "open ip_range.txt fail."223        exit()224    amount = 0225    for line in fd.readlines():226        if len(line) == 0 or line[0] == '#':227            continue228        begin, end = ip_utils.split_ip(line)229        nbegin = ip_utils.ip_string_to_num(begin)230        nend = ip_utils.ip_string_to_num(end)231        num = nend - nbegin232        amount += num233        print ip_utils.ip_num_to_string(nbegin), ip_utils.ip_num_to_string(nend), num234    fd.close()235    print "amount ip:", amount236def main():237    generate_ip_range()238    test_load()239if __name__ == "__main__":...

Full Screen

Full Screen

google_ip_range.py

Source:google_ip_range.py Github

copy

Full Screen

...46        for line in lines:47            if len(line) == 0 or line[0] == '#':48                continue49            try:50                begin, end = ip_utils.split_ip(line)51                nbegin = ip_utils.ip_string_to_num(begin)52                nend = ip_utils.ip_string_to_num(end)53                if not nbegin or not nend or nend < nbegin:54                    xlog.warn("load ip range:%s fail", line)55                    continue56            except Exception as e:57                xlog.exception("load ip range:%s fail:%r", line, e)58                continue59            self.ip_range_map[self.candidate_amount_ip] = [nbegin, nend]60            self.ip_range_list.append( [nbegin, nend] )61            self.ip_range_index.append(self.candidate_amount_ip)62            num = nend - nbegin63            self.candidate_amount_ip += num64            # print ip_utils.ip_num_to_string(nbegin), ip_utils.ip_num_to_string(nend), num65        self.ip_range_index.sort()66        #print "amount ip num:", self.candidate_amount_ip67    def show_ip_range(self):68        for id in self.ip_range_map:69            print "[",id,"]:", self.ip_range_map[id]70    def get_real_random_ip(self):71        while True:72            ip_int = random.randint(0, 4294967294)73            add_last_byte = ip_int % 25574            if add_last_byte == 0 or add_last_byte == 255:75                continue76            return ip_int77    def get_ip(self):78        #return self.get_real_random_ip()79        while True:80            index = random.randint(0, len(self.ip_range_list) - 1)81            ip_range = self.ip_range_list[index]82            #xlog.debug("random.randint %d - %d", ip_range[0], ip_range[1])83            if ip_range[1] == ip_range[0]:84                return ip_utils.ip_num_to_string(ip_range[1])85            try:86                id_2 = random.randint(0, ip_range[1] - ip_range[0])87            except Exception as e:88                xlog.exception("random.randint:%r %d - %d, %d", e, ip_range[0], ip_range[1], ip_range[1] - ip_range[0])89                return90            ip = ip_range[0] + id_291            add_last_byte = ip % 25692            if add_last_byte == 0 or add_last_byte == 255:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Airtest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful