Best Python code snippet using toolium_python
iucn_navigator.py
Source:iucn_navigator.py  
...135        for li in lis:136            if self.downloaded >= self.batch_size:137                self.logger.info("reached download batch size {}; exiting".format(self.batch_size))138                break139            files_before = self.get_downloaded_files_list()140            subdivs = li.find_elements_by_tag_name("div")141            tmp = subdivs[0].text.split("\n")142            date = tmp[0].replace("Search on","").replace(" at "," ").strip()143            taxon = tmp[1].replace("Description:","").strip()144            if self.was_downloaded_previously(taxon):145                self.logger.info("skipping {} (previously downloaded)".format(taxon))146                continue147            synonym = self.resolve_synonym(taxon)148            if not synonym:149                label = taxon150            else:151                label = "{}_[{}]".format(synonym,taxon)152            link = subdivs[2].find_element_by_tag_name("a")153            if not self.debug:154                try:155                    link.click()156                    files_after = self.get_downloaded_files_list()157                    files_diff = list(set(files_after) - set(files_before))158                    new_name = "redlist-species-data--{}--({}).zip".format(label.lower().replace(" ","_"),date)159                    new_file = self.rename_downloaded_file(files_diff[0],new_name)160                    self.logger.info("downloaded {} to {}".format(taxon,new_file))161                    self.downloads.append({162                        "taxon" : taxon,163                        "file" : new_file,164                        "search_date" : date,165                        "download_date" : str(datetime.fromtimestamp(datetime.timestamp(datetime.now())))166                        })167                    self.downloaded += 1168                except Exception as err:169                    self.logger.info("error occurred while downloading {}: {}".format(taxon,err))170                    171 
           else:172                self.logger.info("skipped actual download of {} (debug mode)".format(taxon))173    def get_downloaded_species(self):174        return json.dumps(self.downloads)175    def get_downloaded_files_list(self):176        onlyfiles = [f for f in listdir(self.download_folder) if isfile(join(self.download_folder, f))]177        return onlyfiles178    def rename_downloaded_file(self,old_name,new_name):179        new = os.path.join(self.download_folder,new_name)180        shutil.move(os.path.join(self.download_folder,old_name),new)181        return new182    def was_downloaded_previously(self,name):183        name_match = [item for item in self.previous_downloads if item[0] == name ]184        if name_match:185            return True186        else:187            return False188    def resolve_synonym(self,name):189        name_match = [item for item in self.synonyms if item[0] == name ]...processor_milk_2.py
Source:processor_milk_2.py  
# ...
TOTAL_TIMEOUT = 300
# URL+Agent --> Log ID
log_ids = {}


def get_downloaded_files_list():
    """Return the names of the entries in the downloads directory as JSON.

    The directory is ``config.MAIN_LOG_PATH`` joined with
    ``config.DOWNLOADS_DIR``.  The entry named ``raw`` and any entry whose
    name starts with a dot are left out.

    NOTE(review): the original comment called this "the SHA 256 file
    hashes", but the code only lists entry names -- presumably the files
    are stored under their hash; confirm against whatever writes them.

    Returns:
        A JSON-encoded list of entry names (a string).
    """
    downloads_path = os.path.join(config.MAIN_LOG_PATH, config.DOWNLOADS_DIR)
    downloaded_files = [
        name for name in os.listdir(downloads_path)
        if name != 'raw' and not name.startswith('.')
    ]
    return json.dumps(downloaded_files)


def get_milking_return_data(log_id):
    """Scan the session log ``<log_id>.log`` for the page that was loaded.

    Each marker is matched by substring; the value is taken as the last
    space-separated token of the stripped line, and the last matching line
    wins.

    Args:
        log_id: Base name (without ``.log``) of the log file under
            ``config.MAIN_LOG_PATH/config.SEHUNTER_LOGS_DIR``.

    Returns:
        A tuple ``(screenshot_hash, home_url, downloaded_file)``:
        ``screenshot_hash`` is the hex dhash of the logged screenshot, or
        None when no screenshot line was found; ``home_url`` is the logged
        home URL or None; ``downloaded_file`` is True when at least one
        "Downloaded a file: " line is present.
    """
    log_path = os.path.join(config.MAIN_LOG_PATH, config.SEHUNTER_LOGS_DIR,
                            "%s.log" % (log_id,))
    screenshot_path = None
    home_url = None
    downloaded_file = False
    with open(log_path) as f:
        for line in f:
            if "The screenshot of loaded home page" in line:
                screenshot_path = line.strip().rsplit(' ', 1)[1]
            if "Home URL: " in line:
                home_url = line.strip().rsplit(' ', 1)[1]
            if "Downloaded a file: " in line:
                downloaded_file = True
    if screenshot_path:
        image = Image.open(screenshot_path)
        row, col = dhash.dhash_row_col(image)
        screenshot_hash = dhash.format_hex(row, col)
    else:
        screenshot_hash = None
    return screenshot_hash, home_url, downloaded_file


@timeout_decorator.timeout(SESSION_TIMEOUT)
def run_adminer(adminer):
    """Run one session on ``adminer``, log its downloads and clean up.

    The call is wrapped by ``timeout_decorator.timeout(SESSION_TIMEOUT)``.

    Args:
        adminer: The session object; its ``run``, ``bi.log_downloads`` and
            ``cleanup`` methods are called in that order.

    Returns:
        None.

    Raises:
        Exception: Whatever the session raised, re-raised after the
            downloads have been logged and the browser closed.
    """
    try:
        # Single-argument print(...) emits the same text on Python 2 as the
        # original print statement did.
        print("Log ID for this session: %s" % (adminer.log_id,))
        adminer.run(num_actions=2)
        adminer.bi.log_downloads()
        adminer.cleanup()
    except Exception as e:
        print(e)
        print("Exception in run_adminer. Here's the traceback:")
        traceback.print_exc()
        if adminer is not None:
            # Sometimes, the browser shuts down due to an error. But, there
            # could be downloaded files in the raw dir
            adminer.bi.log_downloads()
            adminer.bi.devtools_client.close_browser()
        # Bare raise keeps the original traceback; `raise e` would restart
        # it from this line on Python 2.
        raise


def worker(url, agent, vmhost):
    """Run one session for ``url`` with ``agent`` and summarise the outcome.

    A failing session does not propagate: the exception is printed, the
    matching chrome processes are killed, and ``error`` is set in the
    returned summary.

    Args:
        url: Start URL passed to ``ad_miner.AdMiner``.
        agent: Agent name passed to ``ad_miner.AdMiner``.
        vmhost: First component of the generated log id.

    Returns:
        A dict with the keys ``log_id``, ``error``, ``image_hash``,
        ``loaded_url``, ``loaded_sld``, ``downloaded_files`` and
        ``file_hashes``.
    """
    # Only useful when testing outside of Docker; Can be removed later
    utils.kill_old_processes('chrome', age=config.OBSOLETE_PROCESS_AGE)
    utils.delete_old_files(config.CHROME_BINARY_PATH, 'jsgraph.log',
                           config.OBSOLETE_PROCESS_AGE)
    print("%s started. Domain: %s; Agent: %s" % (os.getpid(), url, agent))
    log_id = "_".join((vmhost, socket.gethostname(), us_timestamp_str()))
    error = False
    adminer = None
    try:
        adminer = ad_miner.AdMiner(start_url=url, log_id=log_id, agent_name=agent)
        # run_adminer has no return value, so nothing is captured here.
        run_adminer(adminer)
    except Exception as e:
        error = True
        print("Got exception: for %s" % (os.getpid()))
        print(e)
        if adminer is not None:
            # Kill relevant chrome and chromedriver processes
            utils.kill_processes_by_cmdline('chrome', adminer.log_id)
            adminer.cleanup()
        print("Killed browser for a broken session: %s" % (log_id,))
    image_hash, loaded_url, downloaded_files = get_milking_return_data(log_id)
    file_hashes = get_downloaded_files_list()
    loaded_sld = ""
    if loaded_url:
        ext = tldextract.extract(loaded_url)
        # Join the non-empty parts of the extraction result with dots.
        loaded_sld = '.'.join(part for part in ext if part)
    # Sending logs:
    ship_logs(log_id, milking=True)
    return {"log_id": log_id,
            "error": error,
            "image_hash": image_hash,
            "loaded_url": loaded_url,
            "loaded_sld": loaded_sld,
            "downloaded_files": downloaded_files,
            "file_hashes": file_hashes}
# When calling without docker, you can run this directly
# ...
# daily_run.py
Source:daily_run.py  
...18	for link in soup.table.find_all('a'):19		res.append(str(base_url + link['href']))20	return res21# A helper function to retrieve a list of ZIP file names that are already downloaded22def get_downloaded_files_list(bucket_name, key_name):23	'''24	The function will fetch a JSON file from AWS S3, which stored the file names that it has already25	download historically. It assume the JSON file contains a pair with 'downloaded_files' as the key26	and a list of file names as the value.27	'''28	# Extract the file name from the key name29	pos = key_name.rfind('/')30	if pos == -1:31		file_name = key_name32	else:33		file_name = key_name[pos + 1:]34	download_path = '/tmp/' + file_name35	s3 = boto3.resource('s3')36	bucket = s3.Bucket(bucket_name)37	bucket.download_file(key_name, download_path)38	properties = json.load(open(download_path, 'r'))39	return properties['downloaded_files']40# A helper function to update the downloaded file list in S341def update_downloaded_files_list(bucket_name, key_name, old_list, new_file_url):42	'''43	The function will add the newly downloaded file's URL into the list in the JSON object and update44	the new JSON object to S3.45	'''46	# Add the new file URL to the list47	old_list.append(new_file_url)48	# Make it into a dictionary49	pairs = dict()50	pairs['downloaded_files'] = old_list51	# Create a temporary JSON file for uploading52	json_str = json.dumps(pairs)53	print json_str54	in_mem_obj = StringIO(json_str)55	# Upload the JSON file56	s3 = boto3.resource('s3')57	bucket = s3.Bucket(bucket_name)58	bucket.upload_fileobj(in_mem_obj, key_name)59	in_mem_obj.close()60# A helper function to download a ZIP file from a specified URL61def download_data(zip_url, bucket_name):62	'''63	The ZIP file URL is in the following format:64	'https://aact.ctti-clinicaltrials.org/static/exported_files/20180201_pipe-delimited-export.zip'65	The function will store each data file into a coresponding folder with the same name and the data66	file will be renamed as the 
date specified in the ZIP file name, under the specified bucket.67	e.g. Every file extracted from the ZIP file in the above URL will be store as 'tablename/20180201'68	in the specified bucket69	'''70	# Extract the date from the URL71	start_pos = zip_url.rfind('/')72	end_pos = zip_url.find('_', start_pos + 1)73	timestamp = ''74	if start_pos == -1 or end_pos == -1:75		# Failed to locate the date in the URL, use the date of today as a default value76		timestamp = date.today().strftime('%Y%m%d')77	else:78		timestamp = zip_url[start_pos + 1 : end_pos]79	# Download an 50 MB chunk each time80	download_chunk_size = 50 * 1024 * 102481	# Streaming download, a chunk at a timerr82	zip_resp = requests.get(zip_url, stream=True)83	# Store the ZIP file in memory84	in_mem_file = StringIO()85	for chunk in zip_resp.iter_content(chunk_size=download_chunk_size):86		in_mem_file.write(chunk)87	# Make the download content into a ZIP file88	in_mem_zip = ZipFile(in_mem_file)89	# Upload to S390	s3 = boto3.resource('s3')91	my_bucket = s3.Bucket(bucket_name)92	# Go through every file in the ZIP file93	for each in in_mem_zip.namelist():94		# Remove possible file extension name to use it as directory name95		test_pos = each.find('.txt')96		if test_pos != -1:97			dir_name = each[:test_pos]98		else:99			dir_name = each100		# Construct an in-memory file object101		in_mem_unzip_file = StringIO(in_mem_zip.read(each))102		# Put each file in its folder and name it as the timestamp103		my_bucket.upload_fileobj(in_mem_unzip_file, dir_name + '/' + timestamp)104		# Release the StringIO to save some space105		in_mem_unzip_file.close()106# The main routine of the script107def main(json_input, context):108	# Names of the S3 buckets109	data_bucket_name = 'tiberclinicaltrials'110	config_bucket_name = 'tiberclinicaltrialsmetadata'111	# The key of the configuration file on S3112	key_name = 'persistent_states/downloaded_files.json'113	# Fetch all the ZIP file URLs on the page114	urls = set(parse_page())115	# Fetch 
all the downloaded file names from S3116	downloaded_files = set(get_downloaded_files_list(config_bucket_name, key_name))117	# Find out all the files that haven't been download118	new_files = urls - downloaded_files119	# Pick an random file in this set120	zip_url = new_files.pop()121	# Download the file, extract it and upload every table to S3122	download_data(zip_url, data_bucket_name)123	# Update the JSON file on S3...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
