How to use _compute_next_job_for_tasks method in autotest

Best Python code snippet using autotest_python

rpc_utils.py

Source:rpc_utils.py Github

copy

Full Screen

...409 return _common_entry_to_dict(special_task, special_task.task, job_dict)410def _queue_entry_to_dict(queue_entry):411 return _common_entry_to_dict(queue_entry, 'Job',412 queue_entry.job.get_object_dict())413def _compute_next_job_for_tasks(queue_entries, special_tasks):414 """415 For each task, try to figure out the next job that ran after that task.416 This is done using two pieces of information:417 * if the task has a queue entry, we can use that entry's job ID.418 * if the task has a time_started, we can try to compare that against the419 started_on field of queue_entries. this isn't guaranteed to work perfectly420 since queue_entries may also have null started_on values.421 * if the task has neither, or if use of time_started fails, just use the422 last computed job ID.423 """424 next_job_id = None # most recently computed next job425 hqe_index = 0 # index for scanning by started_on times426 for task in special_tasks:427 if task.queue_entry:428 next_job_id = task.queue_entry.job.id429 elif task.time_started is not None:430 for queue_entry in queue_entries[hqe_index:]:431 if queue_entry.started_on is None:432 continue433 if queue_entry.started_on < task.time_started:434 break435 next_job_id = queue_entry.job.id436 task.next_job_id = next_job_id437 # advance hqe_index to just after next_job_id438 if next_job_id is not None:439 for queue_entry in queue_entries[hqe_index:]:440 if queue_entry.job.id < next_job_id:441 break442 hqe_index += 1443def interleave_entries(queue_entries, special_tasks):444 """445 Both lists should be ordered by descending ID.446 """447 _compute_next_job_for_tasks(queue_entries, special_tasks)448 # start with all special tasks that've run since the last job449 interleaved_entries = []450 for task in special_tasks:451 if task.next_job_id is not None:452 break453 interleaved_entries.append(_special_task_to_dict(task))454 # now interleave queue entries with the remaining special tasks455 special_task_index = len(interleaved_entries)456 for queue_entry in queue_entries:457 interleaved_entries.append(_queue_entry_to_dict(queue_entry))458 # add all tasks that ran between this job and the previous one459 for task in special_tasks[special_task_index:]:460 if task.next_job_id < queue_entry.job.id:461 break...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful