How to use the get_downloaded_files_list method in toolium

Best Python code snippet using toolium_python
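
In the snippets below, get_downloaded_files_list is a helper that each project defines for itself rather than a method imported from toolium. At its core it simply lists the contents of a configured download directory. A minimal sketch of that idea (the download_folder argument is illustrative, not taken from any of the projects below):

import os

def get_downloaded_files_list(download_folder):
    # Return only regular files; subdirectories and hidden entries are skipped
    return [f for f in os.listdir(download_folder)
            if os.path.isfile(os.path.join(download_folder, f))
            and not f.startswith('.')]

Each example layers a different use on top of this: diffing the listing before and after a click, hashing the results, or persisting the list to S3.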

iucn_navigator.py

Source: iucn_navigator.py (GitHub)


...
        for li in lis:
            if self.downloaded >= self.batch_size:
                self.logger.info("reached download batch size {}; exiting".format(self.batch_size))
                break
            files_before = self.get_downloaded_files_list()
            subdivs = li.find_elements_by_tag_name("div")
            tmp = subdivs[0].text.split("\n")
            date = tmp[0].replace("Search on", "").replace(" at ", " ").strip()
            taxon = tmp[1].replace("Description:", "").strip()
            if self.was_downloaded_previously(taxon):
                self.logger.info("skipping {} (previously downloaded)".format(taxon))
                continue
            synonym = self.resolve_synonym(taxon)
            if not synonym:
                label = taxon
            else:
                label = "{}_[{}]".format(synonym, taxon)
            link = subdivs[2].find_element_by_tag_name("a")
            if not self.debug:
                try:
                    link.click()
                    files_after = self.get_downloaded_files_list()
                    files_diff = list(set(files_after) - set(files_before))
                    new_name = "redlist-species-data--{}--({}).zip".format(label.lower().replace(" ", "_"), date)
                    new_file = self.rename_downloaded_file(files_diff[0], new_name)
                    self.logger.info("downloaded {} to {}".format(taxon, new_file))
                    self.downloads.append({
                        "taxon": taxon,
                        "file": new_file,
                        "search_date": date,
                        "download_date": str(datetime.fromtimestamp(datetime.timestamp(datetime.now())))
                    })
                    self.downloaded += 1
                except Exception as err:
                    self.logger.info("error occurred while downloading {}: {}".format(taxon, err))
            else:
                self.logger.info("skipped actual download of {} (debug mode)".format(taxon))

    def get_downloaded_species(self):
        return json.dumps(self.downloads)

    def get_downloaded_files_list(self):
        onlyfiles = [f for f in listdir(self.download_folder) if isfile(join(self.download_folder, f))]
        return onlyfiles

    def rename_downloaded_file(self, old_name, new_name):
        new = os.path.join(self.download_folder, new_name)
        shutil.move(os.path.join(self.download_folder, old_name), new)
        return new

    def was_downloaded_previously(self, name):
        name_match = [item for item in self.previous_downloads if item[0] == name]
        if name_match:
            return True
        else:
            return False

    def resolve_synonym(self, name):
        name_match = [item for item in self.synonyms if item[0] == name]
...
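
The pattern above — snapshot the download directory, click the link, then diff the two listings to find the new file — works for any Selenium-driven download. Note, though, that the diff is taken immediately after click(), so it can race against the browser still writing the file. A hedged sketch that adds a polling wait (download_folder and timeout are assumptions; the .crdownload/.part filter skips Chrome's and Firefox's in-progress files):

import os
import time

def wait_for_new_download(download_folder, files_before, timeout=30):
    # Poll until a file appears that was not in the 'before' snapshot
    deadline = time.time() + timeout
    while time.time() < deadline:
        new_files = [f for f in set(os.listdir(download_folder)) - set(files_before)
                     if not f.endswith((".crdownload", ".part"))]
        if new_files:
            return new_files[0]
        time.sleep(0.5)
    raise TimeoutError("no new file appeared in {}".format(download_folder))

Dropping this in between link.click() and rename_downloaded_file() would replace the bare files_after/files_diff computation and avoid renaming a half-written archive.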


processor_milk_2.py

Source: processor_milk_2.py (GitHub)


...
TOTAL_TIMEOUT = 300

# URL+Agent --> Log ID
log_ids = {}

# Get the list of downloaded file names (returned as a JSON string)
def get_downloaded_files_list():
    downloads_path = os.path.join(config.MAIN_LOG_PATH, config.DOWNLOADS_DIR)
    downloaded_files = [x for x in os.listdir(downloads_path) if
                        (x != 'raw') and
                        not x.startswith('.')]
    return_str = json.dumps(downloaded_files)
    return return_str

# Parse the se_hunter.log file and get the loaded page's image hash,
# its URL, and whether any files were downloaded after interacting
# with that page
def get_milking_return_data(log_id):
    log_path = os.path.join(config.MAIN_LOG_PATH, config.SEHUNTER_LOGS_DIR, "%s.log" % (log_id,))
    with open(log_path) as f:
        screenshot_path = None
        home_url = None
        downloaded_file = False
        for line in f:
            if "The screenshot of loaded home page" in line:
                screenshot_path = line.strip().rsplit(' ', 1)[1]
            if "Home URL: " in line:
                home_url = line.strip().rsplit(' ', 1)[1]
            if "Downloaded a file: " in line:
                downloaded_file = True
        if screenshot_path:
            image = Image.open(screenshot_path)
            row, col = dhash.dhash_row_col(image)
            screenshot_hash = dhash.format_hex(row, col)
        else:
            screenshot_hash = None
        return screenshot_hash, home_url, downloaded_file

@timeout_decorator.timeout(SESSION_TIMEOUT)
def run_adminer(adminer):
    try:
        print "Log ID for this session:", adminer.log_id
        adminer.run(num_actions=2)
        adminer.bi.log_downloads()
        adminer.cleanup()
    except Exception as e:
        print e
        print "Exception in run_adminer. Here's the traceback:"
        traceback.print_exc()
        if adminer is not None:
            # Sometimes the browser shuts down due to an error, but there could be
            # downloaded files in the raw dir
            adminer.bi.log_downloads()
            adminer.bi.devtools_client.close_browser()
        raise e

def worker(url, agent, vmhost):
    # Only useful when testing outside of Docker; can be removed later
    utils.kill_old_processes('chrome', age=config.OBSOLETE_PROCESS_AGE)
    utils.delete_old_files(config.CHROME_BINARY_PATH, 'jsgraph.log', config.OBSOLETE_PROCESS_AGE)
    print "%s started. Domain: %s; Agent: %s" % (os.getpid(), url, agent)
    tabs_opened = 0
    log_id = "_".join((vmhost, socket.gethostname(), us_timestamp_str()))
    error = False
    adminer = None
    try:
        adminer = ad_miner.AdMiner(start_url=url, log_id=log_id, agent_name=agent)
        tabs_opened = run_adminer(adminer)
    except Exception as e:
        error = True
        print "Got exception: for %s" % (os.getpid())
        print e
        #import ipdb; ipdb.set_trace()
        if adminer is not None:
            utils.kill_processes_by_cmdline('chrome', adminer.log_id)  # Kill relevant chrome and chromedriver processes
            adminer.cleanup()
            print "Killed browser for a broken session: %s" % (log_id,)
    image_hash, loaded_url, downloaded_files = get_milking_return_data(log_id)
    file_hashes = get_downloaded_files_list()
    loaded_sld = ""
    if loaded_url:
        ext = tldextract.extract(loaded_url)
        loaded_sld = '.'.join(part for part in ext if part)
    # Sending logs:
    ship_logs(log_id, milking=True)
    return {"log_id": log_id,
            "error": error,
            "image_hash": image_hash,
            "loaded_url": loaded_url,
            "loaded_sld": loaded_sld,
            "downloaded_files": downloaded_files,
            "file_hashes": file_hashes}
# When calling without docker, you can run this directly...
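
Note that worker() above stores the return value of get_downloaded_files_list() under the key "file_hashes", even though the function returns file names rather than hashes. If actual SHA-256 digests are wanted, a minimal sketch with the standard hashlib module (reusing the same downloads_path layout as an assumption) could look like this:

import hashlib
import json
import os

def get_downloaded_file_hashes(downloads_path):
    # Map each downloaded file name to its SHA-256 hex digest
    hashes = {}
    for name in os.listdir(downloads_path):
        path = os.path.join(downloads_path, name)
        if name == 'raw' or name.startswith('.') or not os.path.isfile(path):
            continue
        digest = hashlib.sha256()
        with open(path, 'rb') as f:
            # Hash in 64 KB blocks so large downloads don't have to fit in memory
            for block in iter(lambda: f.read(65536), b''):
                digest.update(block)
        hashes[name] = digest.hexdigest()
    return json.dumps(hashes)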


daily_run.py

Source: daily_run.py (GitHub)


...
    for link in soup.table.find_all('a'):
        res.append(str(base_url + link['href']))
    return res

# A helper function to retrieve a list of ZIP file names that are already downloaded
def get_downloaded_files_list(bucket_name, key_name):
    '''
    The function fetches a JSON file from AWS S3 which stores the file names that have already
    been downloaded historically. It assumes the JSON file contains a pair with 'downloaded_files'
    as the key and a list of file names as the value.
    '''
    # Extract the file name from the key name
    pos = key_name.rfind('/')
    if pos == -1:
        file_name = key_name
    else:
        file_name = key_name[pos + 1:]
    download_path = '/tmp/' + file_name
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    bucket.download_file(key_name, download_path)
    properties = json.load(open(download_path, 'r'))
    return properties['downloaded_files']

# A helper function to update the downloaded file list in S3
def update_downloaded_files_list(bucket_name, key_name, old_list, new_file_url):
    '''
    The function adds the newly downloaded file's URL to the list in the JSON object and uploads
    the new JSON object to S3.
    '''
    # Add the new file URL to the list
    old_list.append(new_file_url)
    # Make it into a dictionary
    pairs = dict()
    pairs['downloaded_files'] = old_list
    # Create an in-memory JSON file for uploading
    json_str = json.dumps(pairs)
    print json_str
    in_mem_obj = StringIO(json_str)
    # Upload the JSON file
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    bucket.upload_fileobj(in_mem_obj, key_name)
    in_mem_obj.close()

# A helper function to download a ZIP file from a specified URL
def download_data(zip_url, bucket_name):
    '''
    The ZIP file URL is in the following format:
    'https://aact.ctti-clinicaltrials.org/static/exported_files/20180201_pipe-delimited-export.zip'
    The function stores each data file in a corresponding folder with the same name, and the data
    file is renamed to the date specified in the ZIP file name, under the specified bucket.
    e.g. every file extracted from the ZIP file in the above URL will be stored as
    'tablename/20180201' in the specified bucket.
    '''
    # Extract the date from the URL
    start_pos = zip_url.rfind('/')
    end_pos = zip_url.find('_', start_pos + 1)
    timestamp = ''
    if start_pos == -1 or end_pos == -1:
        # Failed to locate the date in the URL; use today's date as a default
        timestamp = date.today().strftime('%Y%m%d')
    else:
        timestamp = zip_url[start_pos + 1:end_pos]
    # Download a 50 MB chunk each time
    download_chunk_size = 50 * 1024 * 1024
    # Streaming download, one chunk at a time
    zip_resp = requests.get(zip_url, stream=True)
    # Store the ZIP file in memory
    in_mem_file = StringIO()
    for chunk in zip_resp.iter_content(chunk_size=download_chunk_size):
        in_mem_file.write(chunk)
    # Make the downloaded content into a ZIP file
    in_mem_zip = ZipFile(in_mem_file)
    # Upload to S3
    s3 = boto3.resource('s3')
    my_bucket = s3.Bucket(bucket_name)
    # Go through every file in the ZIP file
    for each in in_mem_zip.namelist():
        # Remove a possible file extension to use the name as a directory name
        test_pos = each.find('.txt')
        if test_pos != -1:
            dir_name = each[:test_pos]
        else:
            dir_name = each
        # Construct an in-memory file object
        in_mem_unzip_file = StringIO(in_mem_zip.read(each))
        # Put each file in its folder and name it with the timestamp
        my_bucket.upload_fileobj(in_mem_unzip_file, dir_name + '/' + timestamp)
        # Release the StringIO to save some space
        in_mem_unzip_file.close()

# The main routine of the script
def main(json_input, context):
    # Names of the S3 buckets
    data_bucket_name = 'tiberclinicaltrials'
    config_bucket_name = 'tiberclinicaltrialsmetadata'
    # The key of the configuration file on S3
    key_name = 'persistent_states/downloaded_files.json'
    # Fetch all the ZIP file URLs on the page
    urls = set(parse_page())
    # Fetch all the downloaded file names from S3
    downloaded_files = set(get_downloaded_files_list(config_bucket_name, key_name))
    # Find all the files that haven't been downloaded yet
    new_files = urls - downloaded_files
    # Pick a random file from this set
    zip_url = new_files.pop()
    # Download the file, extract it and upload every table to S3
    download_data(zip_url, data_bucket_name)
    # Update the JSON file on S3...
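
The read-modify-write cycle above (download the JSON state file from S3, append the new URL, upload it back) is the script's whole persistence layer. A Python 3 sketch of the same cycle, kept in memory end to end (bucket and key names are placeholders; upload_fileobj expects a binary stream, so io.BytesIO takes the place of the StringIO used in the Python 2 original):

import io
import json
import boto3

def add_downloaded_file(bucket_name, key_name, new_file_url):
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    # Read the current state object into memory
    buf = io.BytesIO()
    bucket.download_fileobj(key_name, buf)
    state = json.loads(buf.getvalue().decode('utf-8'))
    # Append the new URL and write the state back
    state['downloaded_files'].append(new_file_url)
    body = io.BytesIO(json.dumps(state).encode('utf-8'))
    bucket.upload_fileobj(body, key_name)

Two concurrent runs can interleave this cycle and lose an update; for a once-a-day job that is usually acceptable, but anything more frequent would want S3 object versioning or some other form of locking.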


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, through following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run toolium automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud
