How to use on_pending method in autotest

Best Python code snippet using autotest_python

prejob_task.py

Source:prejob_task.py Github

copy

Full Screen

...167 def epilog(self):168 super(VerifyTask, self).epilog()169 if self.success:170 if self._should_pending():171 self.queue_entry.on_pending()172 else:173 self.host.set_status(models.Host.Status.READY)174class CleanupTask(PreJobTask):175 # note this can also run post-job, but when it does, it's running standalone176 # against the host (not related to the job), so it's not considered a177 # PostJobTask178 TASK_TYPE = models.SpecialTask.Task.CLEANUP179 def __init__(self, task, recover_run_monitor=None):180 args = ['--cleanup']181 if task.queue_entry:182 args.extend(self._generate_autoserv_label_args(task))183 super(CleanupTask, self).__init__(task, args)184 self._set_ids(host=self.host, queue_entries=[self.queue_entry])185 def prolog(self):186 super(CleanupTask, self).prolog()187 logging.info("starting cleanup task for host: %s", self.host.hostname)188 self.host.set_status(models.Host.Status.CLEANING)189 if self.queue_entry:190 self.queue_entry.set_status(models.HostQueueEntry.Status.CLEANING)191 def _finish_epilog(self):192 if not self.queue_entry or not self.success:193 return194 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY195 should_run_verify = (196 self.queue_entry.job.run_verify197 and self.host.protection != do_not_verify_protection)198 if should_run_verify:199 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)200 models.SpecialTask.objects.create(201 host=models.Host.objects.get(id=self.host.id),202 queue_entry=entry,203 task=models.SpecialTask.Task.VERIFY)204 else:205 if self._should_pending():206 self.queue_entry.on_pending()207 def epilog(self):208 super(CleanupTask, self).epilog()209 if self.success:210 self.host.update_field('dirty', 0)211 self.host.set_status(models.Host.Status.READY)212 self._finish_epilog()213class ResetTask(PreJobTask):214 """Task to reset a DUT, including cleanup and verify."""215 # note this can also run post-job, but when it does, it's running standalone216 # against the host (not related to the job), so it's not considered a217 # PostJobTask218 TASK_TYPE = models.SpecialTask.Task.RESET219 def __init__(self, task, recover_run_monitor=None):220 args = ['--reset']221 if task.queue_entry:222 args.extend(self._generate_autoserv_label_args(task))223 super(ResetTask, self).__init__(task, args)224 self._set_ids(host=self.host, queue_entries=[self.queue_entry])225 def prolog(self):226 super(ResetTask, self).prolog()227 logging.info('starting reset task for host: %s',228 self.host.hostname)229 self.host.set_status(models.Host.Status.RESETTING)230 if self.queue_entry:231 self.queue_entry.set_status(models.HostQueueEntry.Status.RESETTING)232 # Delete any queued cleanups for this host.233 self.remove_special_tasks(models.SpecialTask.Task.CLEANUP,234 keep_last_one=False)235 # Delete any queued reverifies for this host.236 self.remove_special_tasks(models.SpecialTask.Task.VERIFY,237 keep_last_one=False)238 # Only one reset is needed.239 self.remove_special_tasks(models.SpecialTask.Task.RESET,240 keep_last_one=True)241 def epilog(self):242 super(ResetTask, self).epilog()243 if self.success:244 self.host.update_field('dirty', 0)245 if self._should_pending():246 self.queue_entry.on_pending()247 else:248 self.host.set_status(models.Host.Status.READY)249class ProvisionTask(PreJobTask):250 TASK_TYPE = models.SpecialTask.Task.PROVISION251 def __init__(self, task):252 # Provisioning requires that we be associated with a job/queue entry253 assert task.queue_entry, "No HQE associated with provision task!"254 # task.queue_entry is an afe model HostQueueEntry object.255 # self.queue_entry is a scheduler models HostQueueEntry object, but256 # it gets constructed and assigned in __init__, so it's not available257 # yet. Therefore, we're stuck pulling labels off of the afe model258 # so that we can pass the --provision args into the __init__ call.259 labels = {x.name for x in task.queue_entry.job.labels}260 _, provisionable = provision.filter_labels(labels)261 extra_command_args = ['--provision',262 '--job-labels', ','.join(provisionable)]263 super(ProvisionTask, self).__init__(task, extra_command_args)264 self._set_ids(host=self.host, queue_entries=[self.queue_entry])265 def _command_line(self):266 # If we give queue_entry to _autoserv_command_line, then it will append267 # -c for this invocation if the queue_entry is a client side test. We268 # don't want that, as it messes with provisioning, so we just drop it269 # from the arguments here.270 # Note that we also don't verify job_repo_url as provisioining tasks are271 # required to stage whatever content we need, and the job itself will272 # force autotest to be staged if it isn't already.273 return autoserv_utils._autoserv_command_line(self.host.hostname,274 self._extra_command_args,275 in_lab=True)276 def prolog(self):277 super(ProvisionTask, self).prolog()278 # add check for previous provision task and abort if exist.279 logging.info("starting provision task for host: %s", self.host.hostname)280 self.queue_entry.set_status(281 models.HostQueueEntry.Status.PROVISIONING)282 self.host.set_status(models.Host.Status.PROVISIONING)283 def epilog(self):284 super(ProvisionTask, self).epilog()285 # If we were not successful in provisioning the machine286 # leave the DUT in whatever status was set in the PreJobTask's287 # epilog. If this task was successful the host status will get288 # set appropriately as a fallout of the hqe's on_pending. If289 # we don't call on_pending, it can only be because:290 # 1. This task was not successful:291 # a. Another repair is queued: this repair job will set the host292 # status, and it will remain in 'Provisioning' till then.293 # b. We have hit the max_repair_limit: in which case the host294 # status is set to 'RepairFailed' in the epilog of PreJobTask.295 # 2. The task was successful, but there are other special tasks:296 # Those special tasks will set the host status appropriately.297 if self._should_pending():298 self.queue_entry.on_pending()299class RepairTask(agent_task.SpecialAgentTask):300 TASK_TYPE = models.SpecialTask.Task.REPAIR301 def __init__(self, task):302 """\303 queue_entry: queue entry to mark failed if this repair fails.304 """305 protection = host_protections.Protection.get_string(306 task.host.protection)307 # normalize the protection name308 protection = host_protections.Protection.get_attr_name(protection)309 args = ['-R', '--host-protection', protection]310 if task.queue_entry:311 args.extend(self._generate_autoserv_label_args(task))312 super(RepairTask, self).__init__(task, args)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful