Best Python code snippet using autotest_python
scheduler_models_unittest.py
Source:scheduler_models_unittest.py  
class HostQueueEntryTest(BaseSchedulerModelsTest):
    """Tests for scheduler_models.HostQueueEntry.

    Covers label lookup (get_labels), abort behaviour with finished and
    unfinished agents, and the finished_on / shard_id updates performed by
    set_status.
    """

    def _create_hqe(self, dependency_labels=(), **create_job_kwargs):
        """Create a job and return the single host queue entry fetched for it.

        @param dependency_labels: Labels to add to the job's
                                  dependency_labels before fetching.
        @param create_job_kwargs: Keyword arguments forwarded unchanged to
                                  self._create_job.

        @return The only HostQueueEntry whose job_id matches the new job.
        """
        job = self._create_job(**create_job_kwargs)
        for label in dependency_labels:
            job.dependency_labels.add(label)

        hqes = list(scheduler_models.HostQueueEntry.fetch(
                where='job_id=%d' % job.id))
        # The helper only makes sense when the job maps to exactly one entry.
        self.assertEqual(1, len(hqes))
        return hqes[0]

    def _check_hqe_labels(self, hqe, expected_labels):
        """Assert that the names of hqe.get_labels() match expected_labels.

        The comparison is done on sets, so ordering and duplicates are
        ignored.

        @param hqe: Host queue entry to inspect.
        @param expected_labels: Iterable of expected label names.
        """
        expected_labels = set(expected_labels)
        label_names = set(label.name for label in hqe.get_labels())
        self.assertEqual(expected_labels, label_names)

    def test_get_labels_empty(self):
        """An entry created with hosts=[1] should report no labels."""
        hqe = self._create_hqe(hosts=[1])
        labels = list(hqe.get_labels())
        self.assertEqual([], labels)

    def test_get_labels_metahost(self):
        """An entry created with metahosts=[2] should report 'label2'."""
        hqe = self._create_hqe(metahosts=[2])
        self._check_hqe_labels(hqe, ['label2'])

    def test_get_labels_dependencies(self):
        """Dependency labels are reported together with the metahost label."""
        hqe = self._create_hqe(dependency_labels=(self.label3,),
                               metahosts=[1])
        self._check_hqe_labels(hqe, ['label1', 'label3'])

    def setup_abort_test(self, agent_finished=True):
        """Setup the variables for testing abort method.

        Builds an aborted, incomplete hqe in STARTING status and a mock
        dispatcher that is expected to return one mock agent for that hqe.

        @param agent_finished: True to mock agent is finished before aborting
                               the hqe.

        @return hqe, dispatcher: The prepared hqe and the mock dispatcher to
                               be used to test abort method.
        """
        hqe = self._create_hqe(hosts=[1])
        hqe.aborted = True
        hqe.complete = False
        hqe.status = models.HostQueueEntry.Status.STARTING
        hqe.started_on = datetime.datetime.now()

        dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                'Dispatcher')
        agent = self.god.create_mock_class(monitor_db.Agent, 'Agent')
        # Record the calls the test expects abort() to make, in order.
        dispatcher.get_agents_for_entry.expect_call(hqe).and_return([agent])
        agent.is_done.expect_call().and_return(agent_finished)
        return hqe, dispatcher

    def test_abort_fail_with_unfinished_agent(self):
        """abort should fail if the hqe still has agent not finished."""
        hqe, dispatcher = self.setup_abort_test(agent_finished=False)
        self.assertIsNone(hqe.finished_on)
        with self.assertRaises(AssertionError):
            hqe.abort(dispatcher)
        self.god.check_playback()
        # abort failed, finished_on should not be set
        self.assertIsNone(hqe.finished_on)

    def test_abort_success(self):
        """abort should succeed if all agents for the hqe are finished."""
        hqe, dispatcher = self.setup_abort_test(agent_finished=True)
        self.assertIsNone(hqe.finished_on)
        hqe.abort(dispatcher)
        self.god.check_playback()
        self.assertIsNotNone(hqe.finished_on)

    def test_set_finished_on(self):
        """Test that finished_on is set when hqe completes.

        For every status value, a fresh hqe whose job has shard_id 3 is moved
        to that status. Complete statuses must set finished_on and clear the
        job's shard_id; all other statuses must leave both untouched.
        """
        for status in host_queue_entry_states.Status.values:
            hqe = self._create_hqe(hosts=[1])
            hqe.started_on = datetime.datetime.now()
            hqe.job.update_field('shard_id', 3)
            self.assertIsNone(hqe.finished_on)

            hqe.set_status(status)

            if status in host_queue_entry_states.COMPLETE_STATUSES:
                self.assertIsNotNone(hqe.finished_on)
                self.assertIsNone(hqe.job.shard_id)
            else:
                self.assertIsNone(hqe.finished_on)
                # assertEqual rather than the deprecated assertEquals alias,
                # matching the other assertions in this class.
                self.assertEqual(hqe.job.shard_id, 3)
super(JobTest, self).setUp()255        def _mock_create(**kwargs):...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, which covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
