Best Python code snippet using playwright-python
download_apks.py
Source:download_apks.py  
1from bs4 import BeautifulSoup as soup2from selenium import webdriver3from webdriver_manager.chrome import ChromeDriverManager4import webbrowser5from time import sleep6from os import system, path7from requests import get8from warnings import filterwarnings9from traceback import format_exc1011filterwarnings("ignore", category=DeprecationWarning)1213'''14IMPORTANT:15- THE DOWNLOAD LINK WORKS WITH REQUESTS16- if a file has the format apkm, it can be renamed to .apks17- Also maybe call an os.rename on the downloaded file to make it's version more clear (or not to preserve original formatting)18- Do the below url instead where there are 4 pages but there is every bit of vanced software:19    - https://www.apkmirror.com/uploads/?devcategory=team-vanced20- The sleeps are because cloudflare is rate limiting and preventing the program from getting the request data21- Download rate limiting began around getting download #4822- I become unable to get any pages after download #5923'''2425'''26TODO:27- Instead of using the browser to automatically download the apks, click the "here" link in the downloads page to get the28'''2930RATE_LIMIT_PAUSE = .53132def main(url):33    # Setting up the webdriver34    options = webdriver.ChromeOptions()35    options.add_experimental_option("prefs", {"download.default_directory": "apks", "download_restrictions": 3})36    options.add_extension("ublock-origin.crx")37    driver = webdriver.Chrome(options=options)38    apks_list_final = []39    dl_urls_list = []4041    # Getting a list of all download urls42    if not path.exists("apks_list.txt"):43        for i in range(4):44            driver.get(url if i == 0 else url.replace("/uploads", f"/uploads/page/{i+1}"))45            sleep(RATE_LIMIT_PAUSE)46            apks_list = soup(driver.page_source, "html.parser")47            apks_list = apks_list.find_all("div", {"class": "iconsBox"})4849            for elm in apks_list:50                try: apks_list_final.append("https://apkmirror.com" + elm.find_all("a")[1].get("href"))51                except IndexError: pass52    else:53        apks_list_final = open("apks_list.txt").read().split("\n")54    55    system("cls")5657    open("apks_list.txt", "w+").write("\n".join(apks_list_final))5859    try: apks_list_begin = len(open("download_urls_incomplete.txt").readlines())60    except: apks_list_begin = 06162    if not path.exists("download_urls.txt"):63        for i, link in enumerate(apks_list_final[apks_list_begin:]):64            #pause()65            driver.get(link + "#download")66            dl_name = link.split("/")[-2]67            print(f"{i+1+apks_list_begin}/{len(apks_list_final)}) Downloading {dl_name}")68            dl_url = None6970            try:71                dl_url = driver.find_element_by_class_name("downloadButton")#.get_attribute("href") # Clicking the download button72                if dl_url.tag_name != "a": 73                    dl_url = None74                    raise Exception7576                driver.get(dl_url)#driver.execute_script(f"window.open(\"{dl_url}\");")77                print("Found immediate dl button")78                #print("-" * 20)79                continue80            except:81                dl_url = None82                print("No immediate dl button found")83                pass84            85            if not dl_url:86                print("Navigating to dl page")87                accent_color_elms = driver.find_elements_by_class_name("accent_color")88                dl_page_navigated = False89                for url in accent_color_elms:90                    cur_accent_url = url.get_attribute("href")91                    if cur_accent_url is not None and dl_name in cur_accent_url and "apk-download" in cur_accent_url:92                        dl_page_navigated = True93                        url.click()94                        sleep(RATE_LIMIT_PAUSE)95                        break9697                if not dl_page_navigated:98                    print("Could not navigate to download page!", accent_color_elms)99                    print("-" * 20)100                    continue101                #driver.find_elements_by_class_name("accent_color")[-1].click() # Getting to the download page102                print("Clicking dl button")103                dl_url = driver.find_element_by_class_name("downloadButton").get_attribute("href") # Clicking the download button104                driver.get(dl_url)105                sleep(RATE_LIMIT_PAUSE)106107            # Getting the apk download url that works with requests108            print("Getting static download url")109            cur_href = ""110            #e = "\n".join([str(i.get_attribute("href")) for i in driver.find_elements_by_tag_name("a")[::-1]])111            #print(e)112            for url in driver.find_elements_by_tag_name("a")[::-1]:113                cur_href = url.get_attribute("href")114                if "download.php" in str(cur_href):115                    #driver.get(cur_href)#dl_url = url.get_attribute("href")116                    sleep(RATE_LIMIT_PAUSE)                117                    dl_url = cur_href if "wp-content" in cur_href.lower() else driver.current_url118                    print("Static download url found! Using " + "cur_href" if "wp-content" in cur_href.lower() else "driver.current_url")119                    cur_href = None120                    break121            122            #print(dl_url)123            dl_url = driver.current_url if cur_href else dl_url124            dl_urls_list.append(f"{i+1+apks_list_begin}) {dl_name} | {dl_url}")125            print(f"Successfully finished on url '{dl_name}'\nDL URL: '{dl_url}'\nCur URL: '{driver.current_url}'")126            sleep(RATE_LIMIT_PAUSE)127            print("-" * 20)128            #break129130            # Below code uses string manipulation to hopefully get a dl url, that appears to return 404 on some dls so I'll be using selenium to navigate to the download button131            '''link = link + link.split("/")[-2].replace("release", "android-apk-download/download") # needs reworking to work with other vanced stuff132            #print(link, "\n")#, link.split("/"), "\n")133            print(f"Downloading {dl_name} | {link}")134            driver.get(link)135            if "<h1>404</h1>" in str(soup(driver.page_source, "html.parser")):136                print(f"404 at {dl_name} | {link}")137            else: print(f"Success downloading {dl_name} | {link}")'''138        # TODO: Close current webdriver, create new one that allows downloads, download all dl urls139        print("\n".join(dl_urls_list))140        open("download_urls.txt", "w+").write("\n".join(dl_urls_list))141    driver.quit()142143    ''' Downloading all the apks '''144    # Setting up the webdriver145    download_options = webdriver.ChromeOptions()146    download_options.add_experimental_option("prefs", {"download.default_directory": "apks"})147    download_driver = webdriver.Chrome(options=download_options)148    # Dictionary containing the download status of each download 149    download_statuses = {}150    if path.exists("download_statuses.txt"):151        with open("download_statuses.txt") as status_file:152            for status in status_file.read().split("\n"):153                cur_num = status.split(": ")[0]154                cur_status = status.split(": ")[-1]155                download_statuses[cur_num] = cur_status156    print("Downloading apks...")157    with open("download_urls.txt") as dl_urls:158        for i, cur_line in enumerate(dl_urls.read().split("\n")):159            if float(i) == 50:160                print("!!!WARNING, CLOUDFLARE WILL PROBABLY BEGIN RATE LIMITING NOW!!!")161            # Getting the string values needed162            cur_url = cur_line.split("| ")[-1]163            cur_num = cur_line.split(")")[0]164            cur_name = cur_line.split(" ")[1]165            print(f"{cur_num}) DOWNLOADING '{cur_name}'...")166167            # Rerunning the download if the entry for it in the download_statuses dict is blocked due to cloudflare168            try:169                if download_statuses[cur_num] == "Cloudflare":170                    print("Download was previously blocked by Cloudflare, retrying...")171                else:172                    # TODO: Probably check the apks folder to see if the download was actually successful instead of assuming that the dictionary was correct173                    # Edit: I don't think that will be possible as the file names are different to the url or 'name'174                    print("Download was previously successful, continuing to next download...")175                    print("-" * 20)176                    continue177            except:178                print("No entry for this download's status exists, continuing download...")179180            # Getting the download page of the apk181            download_driver.get(cur_url)182183            # Detecting if cloudflare has rate limited the program184            if "cloudflare" in download_driver.page_source.lower() or "rate" in download_driver.page_source.lower() or "limit" in download_driver.page_source.lower() or "request" in download_driver.page_source.lower():185                print("ERROR: CLOUDFLARE IS RATE LIMITING!")186                download_statuses[cur_num] = "Cloudflare"187            else:188                print("DOWNLOAD SUCCESSFUL!")189                download_statuses[cur_num] = "Success"190            print("-" * 20)191192    print("Apks downloaded!")193    pause()194    #download_driver.quit()195    # Saving the dictionary to a file196    with open("download_statuses.txt", "w+") as status_file:197        for status in download_statuses:198            status_file.write(f"{status}: {download_statuses[status]}\n")199200if __name__ == "__main__":201    pause = lambda: system("pause")202    try: main("https://www.apkmirror.com/uploads/?devcategory=team-vanced")203    except KeyboardInterrupt: pass204    except Exception as e: print(e, "\n", format_exc())
...driver_handler.py
Source:driver_handler.py  
...53        if driver_path and path_exists(driver_path):54            add_to_path(driver_path)55        else:56            from .downloader import download_driver57            download_driver(CONSTANTS.CHROME_DRIVER, overwrite_existing=False)58        driver = webdriver.Chrome(options=options)59        return driver60    @staticmethod61    def _create_chrome_options(api_obj):62        options = ChromeOptions()63        args = set()64        if api_obj.headless:65            args.add("--headless")66        if api_obj.incognito:67            args.add("--incognito")68        if api_obj.browser_options_args:69            args.update(api_obj.browser_options_args)70        for arg in args:71            if arg:72                options.add_argument(arg)73        if api_obj.http_proxy:74            options.add_argument('--proxy-server=%s' %api_obj.http_proxy)75        if api_obj.browser_options_dict:76            for k, v in api_obj.browser_options_dict.items():77                options.set_capability(k, v)78        return options79class IEHandler(BaseDriverHandler):80    @staticmethod81    def create_driver(driver_path, api_obj):82        options = IEHandler._create_IE_options(api_obj)83        if hasattr(api_obj, "remote_url") and is_valid_url(api_obj.remote_url):84            caps = options.to_capabilities()85            driver = IEHandler.create_remote_driver(api_obj.remote_url, caps)86            return driver87        if driver_path and path_exists(driver_path):88            add_to_path(driver_path)89        else:90            from .downloader import download_driver91            download_driver(CONSTANTS.IE_DRIVER, overwrite_existing=False)92        IEHandler._handle_IE_registry_entry()93        driver = webdriver.Ie(options=options)94        return driver95    #takes care of adding necessary registry entries needed in windows (needs admin privileges)96    @staticmethod97    def _handle_IE_registry_entry():98        if "window" not in os_name().lower():99            return100        try:101            path32 = r"SOFTWARE\Microsoft\Internet Explorer\Main\FeatureControl\FEATURE_BFCACHE"  102            path64 = r"SOFTWARE\Wow6432Node\Microsoft\Internet Explorer\Main\FeatureControl\FEATURE_BFCACHE"103            path = path32 if os_bits() == 32 else path64104            set_winreg_fromlocalmachine(path, "iexplore.exe", 0, overwrite=True)105        except Exception as e:106            get_logger().warning(str(e))107    @staticmethod108    def _create_IE_options(api_obj):109        options = IEOptions()110        args = set()111        #not recommended, ensure correct security settings in IE112        args.add(IEOptions.IGNORE_PROTECTED_MODE_SETTINGS)113        args.add(IEOptions.ENSURE_CLEAN_SESSION)114        args.add(IEOptions.IGNORE_ZOOM_LEVEL)115        args.add(IEOptions.REQUIRE_WINDOW_FOCUS)116        args.add(IEOptions.PERSISTENT_HOVER)117        if api_obj.browser_options_args:118            args.update(api_obj.browser_options_args)119        for arg in args:120            if arg:121                options.add_argument(arg)122        if api_obj.http_proxy:123            proxy = { 'proxyType': "manual", 'httpProxy': str(api_obj.http_proxy)}124            options.set_capability("proxy", proxy)125        options.set_capability(IEOptions.NATIVE_EVENTS, False)126        if api_obj.browser_options_dict:127            for k, v in api_obj.browser_options_dict.items():128                options.set_capability(k, v)129        return options130class FirefoxHandler(BaseDriverHandler):131    @staticmethod132    def create_driver(driver_path, api_obj):133        options = FirefoxHandler._create_firefox_options(api_obj)134        if hasattr(api_obj, "remote_url") and is_valid_url(api_obj.remote_url):135            caps = DesiredCapabilities.FIREFOX.copy()136            if options:137                caps.update(options.to_capabilities())138            driver = FirefoxHandler.create_remote_driver(api_obj.remote_url, caps)139            return driver140        if driver_path and path_exists(driver_path):141            add_to_path(driver_path)142        else:143            from .downloader import download_driver144            download_driver(CONSTANTS.FIREFOX_DRIVER, overwrite_existing=False)145        log_p = os.path.join(log_path(), "geckodriver.log")146        driver = webdriver.Firefox(options=options, service_log_path=log_p)147        return driver148    @staticmethod149    def _create_firefox_options(api_obj):150        options = FirefoxOptions()151        args = set()152        if api_obj.browser_options_args:153            args.update(api_obj.browser_options_args)154        for arg in args:155            if arg:156                options.add_argument(arg)157        options.headless = api_obj.headless158        if api_obj.firefox_binary_path:...download_client.py
Source:download_client.py  
1#!/usr/bin/env python2import sys3import os4import datetime5import argparse6import blessings7import simplejson8import biokbase.Transform.Client9import biokbase.Transform.script_utils10import biokbase.Transform.drivers11if __name__ == "__main__":12    logger = biokbase.Transform.script_utils.stdoutlogger(__file__)13    parser = argparse.ArgumentParser(description='KBase Download demo and client')14    parser.add_argument('--demo', action="store_true")15    parser.add_argument('--ujs_service_url', nargs='?', help='UserandJobState service for monitoring progress', const="", default="https://kbase.us/services/userandjobstate/")16    parser.add_argument('--workspace_service_url', nargs='?', help='Workspace service for KBase objects', const="", default="https://kbase.us/services/ws/")17    parser.add_argument('--awe_service_url', nargs='?', help='AWE service for additional job monitoring', const="", default="http://140.221.67.3:7080")18    parser.add_argument('--transform_service_url', nargs='?', help='Transform service that handles the data conversion to KBase', const="", default="http://140.221.67.3:7778/")19    parser.add_argument('--external_type', nargs='?', help='the external type of the data', const="", default="")20    parser.add_argument('--kbase_type', nargs='?', help='the kbase object type to create', const="", default="")21    parser.add_argument('--workspace_name', nargs='?', help='name of the workspace where your objects should be created', const="", default="upload_testing")22    parser.add_argument('--object_name', nargs='?', help='name of the workspace object to create', const="", default="")23    parser.add_argument('--download_path', nargs='?', help='path to place downloaded files for validation', const=".", default=".")24    parser.add_argument('--config_file', nargs='?', help='path to config file with parameters', const="", default="")25    parser.add_argument('--create_log', help='create pass/fail log file', action='store_true')26    args = parser.parse_args()27    token = biokbase.Transform.script_utils.get_token()28    inputs = list()29    services = dict()30    if not args.demo:31        if args.config_file:32            f = open(args.config_file, 'r')33            config = simplejson.loads(f.read())34            f.close()35        36            services = config["services"]37            inputs = config["download"]38        else:39            inputs = {"user": 40                {"external_type": args.external_type,41                 "kbase_type": args.kbase_type,42                 "object_name": args.object_name,43                 "workspace_name": args.workspace_name,44                 "downloadPath": args.download_path45                }46            }47            services = {"shock_service_url": args.shock_service_url,48                        "ujs_service_url": args.ujs_service_url,49                        "workspace_service_url": args.workspace_service_url,50                        "awe_service_url": args.awe_service_url,51                        "transform_service_url": args.transform_service_url,52                        "handle_service_url": args.handle_service_url}53        workspace = args.workspace_name    54    else:55        if "kbasetest" not in token and len(args.workspace_name.strip()) == 0:56            print "If you are running the demo as a different user than kbasetest, you need to provide the name of your workspace with --workspace."57            sys.exit(0)58        else:59            if args.workspace_name is not None:60                workspace = args.workspace_name61            else :62                workspace = "upload_testing"63        64        f = open("conf/download/download_demo.cfg")65        config = simplejson.loads(f.read())66        f.close()67        services = config["services"]68        inputs = config["download"]69    70    stamp = datetime.datetime.now().isoformat()71    os.mkdir(stamp)72    download_driver = biokbase.Transform.drivers.TransformClientTerminalDriver(service_urls=services)73    74    term = blessings.Terminal()75    for x in inputs:76        external_type = inputs[x]["external_type"]77        kbase_type = inputs[x]["kbase_type"]78        object_name = inputs[x]["object_name"]79        80        if inputs[x].has_key("workspace_name") and inputs[x]["workspace_name"]:81            ws_name = inputs[x]["workspace_name"]82        else:83            ws_name = workspace84        optional_arguments = None85        if inputs[x].has_key("optional_arguments"):86            optional_arguments = inputs[x]["optional_arguments"]87        print "\n\n"88        print "\t", term.bold_underline_bright_magenta("{0}").format(x.upper())89        print "\t", term.bold("#"*80)90        print "\t", term.bold_white_on_black("Converting {0} => {1}".format(kbase_type,external_type))91        print "\t", term.bold("#"*80)92        conversionDownloadPath = os.path.join(stamp, kbase_type + "_to_" + external_type)93        try:94            os.mkdir(conversionDownloadPath)95        except:96            pass97        98        downloadPath = os.path.join(conversionDownloadPath)99        try:100            print term.bold("\tStep 1: Make KBase download request")101            input_object = dict()102            input_object["external_type"] = external_type103            input_object["kbase_type"] = kbase_type104            input_object["workspace_name"] = ws_name105            input_object["object_name"] = object_name106            if optional_arguments is not None:107                input_object["optional_arguments"] = optional_arguments108            else:109                input_object["optional_arguments"] = {"transform": {}}110            transform_client = biokbase.Transform.Client.Transform(url=services["transform_service_url"], token=token)            111            awe_job_id, ujs_job_id = transform_client.download(input_object)112            113            print term.blue("\tTransform service download requested:")114            print "\t\tConverting from {0} => {1}\n\t\tUsing workspace {2} with object name {3}".format(kbase_type,external_type,workspace,object_name)115            print term.blue("\tTransform service responded with job ids:")116            print "\t\tAWE job id {0}\n\t\tUJS job id {1}".format(awe_job_id, ujs_job_id)117         118            job_exit_status = download_driver.monitor_job(awe_job_id, ujs_job_id)119            if not job_exit_status[0]:                120                download_driver.show_job_debug(awe_job_id, ujs_job_id)121                raise Exception("KBase Download exited with an error")122            123            results = download_driver.get_job_results(ujs_job_id)124            125            print "\t", term.bold("Step 2: Grab data from SHOCK\n")126            download_driver.download_from_shock(results["shockurl"], results["results"][0]["id"], downloadPath)127        128            print term.green("\tShock download of {0} successful.\n\n".format(downloadPath))129        except Exception, e:...chromedriver_update.py
Source:chromedriver_update.py  
...48            if zf == 'chromedriver.exe':49                zfile.extract(zf, os.path.dirname(self._chrome_driver_path))50        zfile.close()51        os.remove('download_driver.zip')52    def _download_driver(self, chrome_ver):53        self._shut_down_current_driver()54        target_chromedriver_ver = requests.get(self._chromedriver_url + 'LATEST_RELEASE_' + str(chrome_ver.split('.')[0])).text55        chromedriver_path = self._chromedriver_url + target_chromedriver_ver + '/'56        if self._platform == 'win32':57            self._download_extract_driver(chromedriver_path + 'chromedriver_win32.zip')58        # python2 returns 'linux2'59        elif self._platform == 'linux':60            self._download_extract_driver(chromedriver_path + 'chromedriver_linux64.zip')61        else:62            self._download_extract_driver(chromedriver_path + 'chromedriver_mac64.zip')63    64    def compare_download(self):65        chrome_ver = self._get_current_chrome_ver66        chromedriver_ver = self._get_current_chromedriver_ver67        if chrome_ver.split('.')[0] != chromedriver_ver.split('.')[0]:68            self._download_driver(chrome_ver)        69        return f'Chrome: {chrome_ver}\nChromeDriver: {self._get_current_chromedriver_ver}'70if __name__ == "__main__":...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
