How to use _update_progress_bar method in avocado

Best Python code snippet using avocado_python

downloader.py

Source:downloader.py Github

copy

Full Screen

...45 desc = desc[:20] + '...'46 kwargs.setdefault('ncols', 90)47 kwargs.setdefault('desc', desc)48 self._tqdm = tqdm.tqdm(**kwargs)49 def _update_progress_bar(self, n):50 if self._tqdm:51 self._tqdm.update(n)52 def _get_file_size(self, file):53 if os.path.exists(file):54 return os.path.getsize(file)55 else:56 return None57 def _parse_headers(self, initial_sizes):58 headers = self.headers_request or {}59 if initial_sizes:60 headers['Range'] = 'bytes=%s-' % initial_sizes61 return headers62 def download(self):63 initial_file_sizes = self._get_file_size(self.file)64 # Parse headers65 headers = self._parse_headers(initial_file_sizes)66 # Initiate request67 resp = Net.requests.get(self.url, headers=headers, stream=True)68 # Grab the file sizes69 file_sizes = float(resp.headers.get('Content-Length'))70 # If "Range" header request is present71 # Content-Length header response is not same as full size72 if initial_file_sizes:73 file_sizes += initial_file_sizes74 real_file_sizes = self._get_file_size(self.real_file)75 if real_file_sizes:76 if file_sizes == real_file_sizes and not self.replace:77 log.info('File exist and replace is False, cancelling download...')78 return79 # Build the progress bar80 self._build_progres_bar(initial_file_sizes, float(file_sizes))81 # Heavily adapted from https://github.com/choldgraf/download/blob/master/download/download.py#L377-L39082 chunk_size = 2 ** 1683 with open(self.file, 'ab' if initial_file_sizes else 'wb') as writer:84 while True:85 t0 = time.time()86 chunk = resp.raw.read(chunk_size)87 dt = time.time() - t088 if dt < 0.005:89 chunk_size *= 290 elif dt > 0.1 and chunk_size > 2 ** 16:91 chunk_size = chunk_size // 292 if not chunk:93 break94 writer.write(chunk)95 self._update_progress_bar(len(chunk))96 97 # Delete original file if replace is True and real file is exist98 if real_file_sizes and self.replace:99 os.remove(self.real_file)100 os.rename(self.file, self.real_file)101 def cleanup(self):102 # Close the progress bar103 if self._tqdm:104 self._tqdm.close()105class StdoutDownloader(BaseDownloader):106 def __init__(self, url) -> None:107 self.url = url108 109 def download(self):110 r = Net.requests.get(self.url, stream=True)111 stdout = open(sys.stdout.fileno(), 'wb')112 for content in r.iter_content(1024):113 stdout.write(content)114 115 def cleanup(self):116 pass117class AsyncFileDownloader(BaseDownloader):118 """FileDownloader for async process using aiohttp with resumeable support"""119 def __init__(self, url, file, progress_bar=True, replace=False, **headers) -> None:120 self.url = url121 self.file = str(file) + '.temp'122 self.real_file = file123 self.progress_bar = progress_bar124 self.replace = replace125 self.headers_request = headers126 if headers.get('Range') is not None and self._get_file_size(self.file):127 raise ValueError('"Range" header is not supported while in resume state')128 self._tqdm = None129 130 def _build_progres_bar(self, initial_size, file_sizes, desc='file_sizes'):131 if self.progress_bar:132 kwargs = {133 'initial': initial_size or 0,134 'total': file_sizes,135 'unit': 'B',136 'unit_scale': True137 }138 # Determine ncols progress bar139 length = len(desc)140 if length < 20:141 kwargs.setdefault('ncols', 80)142 elif length > 20 and length < 50:143 kwargs.setdefault('dynamic_ncols', True)144 # Length desc is more than 40 or 50145 elif length >= 50:146 desc = desc[:20] + '...'147 kwargs.setdefault('ncols', 90)148 kwargs.setdefault('desc', desc)149 self._tqdm = tqdm.tqdm(**kwargs)150 def _update_progress_bar(self, n):151 if self._tqdm:152 self._tqdm.update(n)153 def _get_file_size(self, file):154 if os.path.exists(file):155 return os.path.getsize(file)156 else:157 return None158 def _parse_headers(self, initial_sizes):159 headers = self.headers_request or {}160 if initial_sizes:161 headers['Range'] = 'bytes=%s-' % initial_sizes162 return headers163 async def download(self):164 initial_file_sizes = self._get_file_size(self.file)165 # Parse headers166 headers = self._parse_headers(initial_file_sizes)167 # Initiate request168 resp = await Net.aiohttp.get(self.url, headers=headers)169 # Grab the file sizes170 file_sizes = float(resp.headers.get('Content-Length'))171 # If "Range" header request is present172 # Content-Length header response is not same as full size173 if initial_file_sizes:174 file_sizes += initial_file_sizes175 real_file_sizes = self._get_file_size(self.real_file)176 if real_file_sizes:177 if file_sizes == real_file_sizes and not self.replace:178 log.info('File exist and replace is False, cancelling download...')179 return180 # Build the progress bar181 self._build_progres_bar(initial_file_sizes, float(file_sizes))182 # Heavily adapted from https://github.com/choldgraf/download/blob/master/download/download.py#L377-L390183 chunk_size = 2 ** 16184 with open(self.file, 'ab' if initial_file_sizes else 'wb') as writer:185 while True:186 t0 = time.time()187 chunk = await resp.content.read(chunk_size)188 dt = time.time() - t0189 if dt < 0.005:190 chunk_size *= 2191 elif dt > 0.1 and chunk_size > 2 ** 16:192 chunk_size = chunk_size // 2193 if not chunk:194 break195 writer.write(chunk)196 self._update_progress_bar(len(chunk))197 198 # Delete original file if replace is True and real file is exist199 if real_file_sizes and self.replace:200 os.remove(self.real_file)201 os.rename(self.file, self.real_file)202 async def cleanup(self):203 # Close the progress bar204 if self._tqdm:205 self._tqdm.close()206class AsyncFastFileDownloader(BaseDownloader):207 """FAST FileDownloader with 2 connections simultaneously for async process using aiohttp with resumeable support"""208 def __init__(self, url, file, progress_bar=True, replace=False, **headers) -> None:209 self.url = url210 self.real_file = file211 self.progress_bar = progress_bar212 self.replace = replace213 self.headers_request = headers214 if headers.get('Range') is not None:215 raise ValueError('"Range" header is not supported in fast download')216 self._tqdm = None217 def _build_progres_bar(self, initial_size, file_sizes, desc='file_sizes'):218 if self.progress_bar:219 kwargs = {220 'initial': initial_size or 0,221 'total': file_sizes,222 'unit': 'B',223 'unit_scale': True224 }225 # Determine ncols progress bar226 length = len(desc)227 if length < 20:228 kwargs.setdefault('ncols', 80)229 elif length > 20 and length < 50:230 kwargs.setdefault('dynamic_ncols', True)231 # Length desc is more than 40 or 50232 elif length >= 50:233 desc = desc[:20] + '...'234 kwargs.setdefault('ncols', 90)235 kwargs.setdefault('desc', desc)236 self._tqdm = tqdm.tqdm(**kwargs)237 def _update_progress_bar(self, n):238 if self._tqdm:239 self._tqdm.update(n)240 def _close_progress_bar(self):241 if self._tqdm:242 self._tqdm.close()243 def _get_file_size(self, file):244 if os.path.exists(file):245 return os.path.getsize(file)246 else:247 return None248 def _parse_headers(self, initial_sizes, end_sizes):249 headers = self.headers_request or {}250 headers['Range'] = 'bytes=%s-%s' % (int(initial_sizes), int(end_sizes))251 return headers252 def _get_temp_file(self, part):253 return str(self.real_file) + '.temp.' + str(part)254 async def _prepare_download(self, part, start_size, end_size):255 file = self._get_temp_file(part)256 initial_file_sizes = self._get_file_size(file) or 0257 pure_temp_file_sizes = initial_file_sizes258 exist = True if initial_file_sizes else False259 # If temp part file exist260 # addition it with start_size261 initial_file_sizes += start_size262 # Parse headers263 headers = self._parse_headers(initial_file_sizes, end_size)264 # initiate request265 resp = await Net.aiohttp.get(self.url, headers=headers)266 return pure_temp_file_sizes, resp, file, exist267 async def _download(self, file, resp, exist):268 # Heavily adapted from https://github.com/choldgraf/download/blob/master/download/download.py#L377-L390269 chunk_size = 2 ** 16270 with open(file, 'ab' if exist else 'wb') as writer:271 while True:272 t0 = time.time()273 chunk = await resp.content.read(chunk_size)274 dt = time.time() - t0275 if dt < 0.005:276 chunk_size *= 2277 elif dt > 0.1 and chunk_size > 2 ** 16:278 chunk_size = chunk_size // 2279 if not chunk:280 break281 writer.write(chunk)282 self._update_progress_bar(len(chunk))283 def _get_parts_size(self, length: int):284 divided = length / 2285 if not divided.is_integer():286 parts_size = [0, divided - 0.5, divided + 0.5, length]287 else:288 parts_size = [0, divided - 1, divided, length]289 return parts_size290 def _merge_files(self, parts, file_sizes):291 self._close_progress_bar()292 with open(self.real_file, 'wb') as writer:293 294 self._build_progres_bar(0, file_sizes, 'merging_files')295 for part in parts:296 chunks_size = 2 ** 16297 file = self._get_temp_file(part)298 with open(file, 'rb') as read:299 while True:300 chunks = read.read(chunks_size)301 if not chunks:302 break303 writer.write(chunks)304 self._update_progress_bar(len(chunks))305 306 self._close_progress_bar()307 async def download(self):308 # Grab the file sizes309 resp = await Net.aiohttp.get(self.url)310 file_sizes = float(resp.headers.get('Content-Length'))311 resp.close()312 # TODO: Add explanation below this.313 parts_size = self._get_parts_size(file_sizes)314 part1_kwargs = {315 'part': 1,316 'start_size': parts_size[0],317 'end_size': parts_size[1]318 }...

Full Screen

Full Screen

copyprogress.py

Source:copyprogress.py Github

copy

Full Screen

...19 if not self.output.isatty():20 return21 while not self._is_stopped:22 dst_size = os.path.getsize(self.destination)23 self._update_progress_bar(dst_size, self.source_size)24 time.sleep(1)25 # Erase line with VT100 control code26 self.output.write('\r' + ERASE_LINE)27 self.output.flush()28 def finish(self):29 self._is_stopped = True30 if self.is_alive():31 self.join()32 def _update_progress_bar(self, cur, total):33 percent = int((cur * 100) / total)34 text = '\r' + self.pattern.format(percent)35 self.output.write('\r' + text)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful