How to use _archive_results method in autotest

Best Python code snippet using autotest_python Github


Full Screen

...181 log.exception('Error occured while executing job!')182 success = False183 finally:184 job_log_file.close()185 self._archive_results(results_dir_path)186 return success187 def _extract_payload(self, target_dir_path: str):188'Extracting job payload')189 original_cwd = os.getcwd()190 os.chdir(target_dir_path)191 try:192 with ZipFile(self.job.payload, 'r') as zip_obj:193 zip_obj.extractall()194'Package directory listing:')195 for f in os.listdir('.'):196'- %s', f)197'Extracting build package')198 if platform.system() == 'Windows':199 release_zip = glob.glob('xemu-win-*.zip')[0]200 with ZipFile(release_zip, 'r') as zip_obj:201 zip_obj.extractall()202 os.unlink(release_zip)203 elif platform.system() == 'Linux':204['tar', 'xf', glob.glob(f'xemu-*.tgz')[0]], check=True)205 else:206 assert False, 'Unsupported agent platform'207'Package directory listing:')208 for f in os.listdir('.'):209'- %s', f)210 finally:211 os.chdir(original_cwd)212 def _post_job_status_update(self):213'Posting job status update')214 state_dict = self.job.get_state_update_dict()215 state_file = io.BytesIO(json.dumps(state_dict).encode('utf-8'))216 files = [('state', ('state', state_file, 'application/json'))]217 if self._job_results_archive_path:218 files += [('results', ('results.tgz', open(self._job_results_archive_path, 'rb'), 'application/gzip'))]219 r = + '/' +, files=files, headers=self._agent_headers, verify=self._verify_cert)220 def _archive_results(self, results_dir_path: str):221 archive = tempfile.NamedTemporaryFile(prefix='xemu-results-', suffix='.tgz', delete=False)222 self._job_results_archive_path = archive.name223 archive.close()224 try:225'Generating results archive')226 with, "w:gz") as tar:227 tar.add(results_dir_path, arcname=os.path.basename(results_dir_path))228 except:229 log.exception('Failed to create results archive')230 os.unlink(self._job_results_archive_path)231 self._job_results_archive_path = None232 raise233class ContainerTestingAgent(Agent):234 """235 Agent that receives jobs and executes in test container.236 """237 @staticmethod238 def copy_from_container(c, src: str, dst: str, **kwargs):239['docker', 'cp', f'{}:{src}', dst], check=True, **kwargs)240 @staticmethod241 def copy_to_container(c, src: str, dst: str, **kwargs):242['docker', 'cp', src, f'{}:{dst}'], check=True, **kwargs)243 def _execute_job(self) -> bool:244 """245 Executes current job in test container.246 """247 assert docker is not None, "Docker package not installed"248 d = docker.from_env()249'Pulling test container')250 try:251 d.images.pull(TEST_CONTAINER_IMAGE_NAME, 'master')252 except:253 log.exception('Failed to pull container')254 raise255 with tempfile.TemporaryDirectory(prefix='xemu-job-') as temp_path:256 success = True257 inputs_dir_path = os.path.join(temp_path, 'inputs')258 os.makedirs(inputs_dir_path)259 self._extract_payload(inputs_dir_path)260 shutil.copyfile(glob.glob(f'{inputs_dir_path}/xemu/*.deb')[0],261 os.path.join(inputs_dir_path, 'xemu.deb'))262 results_dir_path = os.path.join(temp_path, 'results')263 os.makedirs(results_dir_path)264 try:265'Creating container')266 c = d.containers.create(TEST_CONTAINER_IMAGE_NAME, detach=True, auto_remove=False, network_mode='none', mem_limit=1280*1024*1024)267 self.copy_to_container(c, self._private_dir_path, '/work')268 self.copy_to_container(c, inputs_dir_path, '/work')269 self.copy_to_container(c, results_dir_path, '/work')270 c.start()271 except:272 log.exception('Failed to launch container')273 raise274'Container started. Waiting for container to exit...')275 now = time.time()276 start_time = now277 last_status_update_time = now278 while True:279 c.reload()280 now = time.time()281 if c.status != 'running':282 exit_code = c.attrs['State']['ExitCode']283'Container exit code: %d', exit_code)284 if exit_code != 0:285 success = False286 break287 if (now - start_time) > JOB_MAX_RUNTIME_SECONDS:288'Tester exceeded maximum time. Terminating.')289 c.kill()290 success = False291 break292 if (now - last_status_update_time) > JOB_STATUS_UPDATE_INTERVAL_SECONDS:293 self._post_job_status_update()294 last_status_update_time = now295 time.sleep(1)296 # Pack results297 try:298 self.copy_from_container(c, '/work/results', os.path.dirname(results_dir_path))299'Saving container logs')300 with open(os.path.join(results_dir_path, 'log.txt'), 'wb') as f:301 f.write(c.logs(timestamps=True))302 except:303 log.exception('Failed to save logs')304 success = False305'Removing container')306 c.remove()307 self._archive_results(results_dir_path)...

Full Screen

Full Screen Github


Full Screen

...18class SiteAgentTask(object):19 """20 SiteAgentTask subclasses BaseAgentTask in monitor_db.21 """22 def _archive_results(self, queue_entries):23 """24 Set the status of queue_entries to ARCHIVING.25 This method sets the status of the queue_entries to ARCHIVING26 if the enable_archiving flag is true in global_config.ini.27 Otherwise, it bypasses the archiving step and sets the queue entries28 to the final status of current step.29 """30 enable_archiving = global_config.global_config.get_config_value(31 scheduler_config.CONFIG_SECTION, 'enable_archiving', type=bool)32 # Set the status of the queue entries to archiving or self final status33 if enable_archiving:34 status = models.HostQueueEntry.Status.ARCHIVING35 else:36 status = self._final_status()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?