Best Python code snippet using autotest_python
monitor_db_unittest.py
Source: monitor_db_unittest.py
...604        self.assert_(isinstance(agent, monitor_db.Agent))605        self.assert_(agent.task)606        return agent.task607    def test_run_if_ready_delays(self):608        # Also tests Job.run_with_ready_delay() on atomic group jobs.609        django_job = self._create_job(hosts=[5, 6], atomic_group=1)610        job = scheduler_models.Job(django_job.id)611        self.assertEqual(1, job.synch_count)612        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))613        self.assertEqual(2, len(django_hqes))614        self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)615        def set_hqe_status(django_hqe, status):616            django_hqe.status = status617            django_hqe.save()618            scheduler_models.HostQueueEntry(django_hqe.id).host.set_status(status)619        # An initial state, our synch_count is 1620        set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)621        set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)622        # So that we don't depend on the config file value during the test.623        self.assert_(scheduler_config.config624                     .secs_to_wait_for_atomic_group_hosts is not None)625        self.god.stub_with(scheduler_config.config,626                           'secs_to_wait_for_atomic_group_hosts', 123456)627        # Get the pending one as a scheduler_models.HostQueueEntry object.628        hqe = scheduler_models.HostQueueEntry(django_hqes[1].id)629        self.assert_(not job._delay_ready_task)630        self.assertTrue(job.is_ready())631        # Ready with one pending, one verifying and an atomic group should632        # result in a DelayCallTask to re-check if we're ready a while later.633        job.run_if_ready(hqe)634        self.assertEquals('Waiting', hqe.status)635        self._dispatcher._schedule_delay_tasks()636        self.assertEquals('Pending', hqe.status)637        agent = self._dispatcher._agents[0]638        
self.assert_(job._delay_ready_task)639        self.assert_(isinstance(agent, monitor_db.Agent))640        self.assert_(agent.task)641        delay_task = agent.task642        self.assert_(isinstance(delay_task, scheduler_models.DelayedCallTask))643        self.assert_(not delay_task.is_done())644        self.god.stub_function(delay_task, 'abort')645        self.god.stub_function(job, 'run')646        self.god.stub_function(job, '_pending_count')647        self.god.stub_with(job, 'synch_count', 9)648        self.god.stub_function(job, 'request_abort')649        # Test that the DelayedCallTask's callback queued up above does the650        # correct thing and does not call run if there are not enough hosts651        # in pending after the delay.652        job._pending_count.expect_call().and_return(0)653        job.request_abort.expect_call()654        delay_task._callback()655        self.god.check_playback()656        # Test that the DelayedCallTask's callback queued up above does the657        # correct thing and returns the Agent returned by job.run() if658        # there are still enough hosts pending after the delay.659        job.synch_count = 4660        job._pending_count.expect_call().and_return(4)661        job.run.expect_call(hqe)662        delay_task._callback()663        self.god.check_playback()664        job._pending_count.expect_call().and_return(4)665        # Adjust the delay deadline so that enough time has passed.666        job._delay_ready_task.end_time = time.time() - 111111667        job.run.expect_call(hqe)668        # ...the delay_expired condition should cause us to call run()669        self._dispatcher._handle_agents()670        self.god.check_playback()671        delay_task.success = False672        # Adjust the delay deadline back so that enough time has not passed.673        job._delay_ready_task.end_time = time.time() + 111111674        self._dispatcher._handle_agents()675        self.god.check_playback()676        # Now 
max_number_of_machines HQEs are in pending state.  Remaining677        # delay will now be ignored.678        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)679        self.god.unstub(job, 'run')680        self.god.unstub(job, '_pending_count')681        self.god.unstub(job, 'synch_count')682        self.god.unstub(job, 'request_abort')683        # ...the over_max_threshold test should cause us to call run()684        delay_task.abort.expect_call()685        other_hqe.on_pending()686        self.assertEquals('Starting', other_hqe.status)687        self.assertEquals('Starting', hqe.status)688        self.god.stub_function(job, 'run')689        self.god.unstub(delay_task, 'abort')690        hqe.set_status('Pending')691        other_hqe.set_status('Pending')692        # Now we're not over the max for the atomic group.  But all assigned693        # hosts are in pending state.  over_max_threshold should make us run().694        hqe.atomic_group.max_number_of_machines += 1695        hqe.atomic_group.save()696        job.run.expect_call(hqe)697        hqe.on_pending()698        self.god.check_playback()699        hqe.atomic_group.max_number_of_machines -= 1700        hqe.atomic_group.save()701        other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)702        self.assertTrue(hqe.job is other_hqe.job)703        # DBObject classes should reuse instances so these should be the same.704        self.assertEqual(job, other_hqe.job)705        self.assertEqual(other_hqe.job, hqe.job)706        # Be sure our delay was not lost during the other_hqe construction.707        self.assertEqual(job._delay_ready_task, delay_task)708        self.assert_(job._delay_ready_task)709        self.assertFalse(job._delay_ready_task.is_done())710        self.assertFalse(job._delay_ready_task.aborted)711        # We want the real run() to be called below.712        self.god.unstub(job, 'run')713        # We pass in the other HQE this time the same way it would happen714   
     # for real when one host finishes verifying and enters pending.715        job.run_if_ready(other_hqe)716        # The delayed task must be aborted by the actual run() call above.717        self.assertTrue(job._delay_ready_task.aborted)718        self.assertFalse(job._delay_ready_task.success)719        self.assertTrue(job._delay_ready_task.is_done())720        # Check that job run() and _finish_run() were called by the above:721        self._dispatcher._schedule_running_host_queue_entries()722        agent = self._dispatcher._agents[0]723        self.assert_(agent.task)724        task = agent.task725        self.assert_(isinstance(task, monitor_db.QueueTask))726        # Requery these hqes in order to verify the status from the DB.727        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))728        for entry in django_hqes:729            self.assertEqual(models.HostQueueEntry.Status.STARTING,730                             entry.status)731        # We're already running, but more calls to run_with_ready_delay can732        # continue to come in due to straggler hosts enter Pending.  
Make733        # sure we don't do anything.734        self.god.stub_function(job, 'run')735        job.run_with_ready_delay(hqe)736        self.god.check_playback()737        self.god.unstub(job, 'run')738    def test_run_synchronous_atomic_group_ready(self):739        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)740        self._update_hqe("status='Pending', execution_subdir=''")741        queue_task = self._test_run_helper(expect_starting=True)742        self.assert_(isinstance(queue_task, monitor_db.QueueTask))743        # Atomic group jobs that do not depend on a specific label in the744        # atomic group will use the atomic group name as their group name.745        self.assertEquals(queue_task.queue_entries[0].get_group_name(),746                          'atomic1')747    def test_run_synchronous_atomic_group_with_label_ready(self):748        job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)749        job.dependency_labels.add(self.label4)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
