Best Python code snippet using autotest_python
scheduler_models_unittest.py
Source:scheduler_models_unittest.py  
...32        self._set_monitor_stubs()33    def tearDown(self):34        self._database.disconnect()35        self._frontend_common_teardown()36    def _update_hqe(self, set, where=''):37        query = 'UPDATE afe_host_queue_entries SET ' + set38        if where:39            query += ' WHERE ' + where40        self._do_query(query)41class DelayedCallTaskTest(unittest.TestCase):42    def setUp(self):43        self.god = mock.mock_god()44    def tearDown(self):45        self.god.unstub_all()46    def test_delayed_call(self):47        test_time = self.god.create_mock_function('time')48        test_time.expect_call().and_return(33)49        test_time.expect_call().and_return(34.01)50        test_time.expect_call().and_return(34.99)51        test_time.expect_call().and_return(35.01)52        def test_callback():53            test_callback.calls += 154        test_callback.calls = 055        delay_task = scheduler_models.DelayedCallTask(56            delay_seconds=2, callback=test_callback,57            now_func=test_time)  # time 3358        self.assertEqual(35, delay_task.end_time)59        delay_task.poll()  # activates the task and polls it once, time 34.0160        self.assertEqual(0, test_callback.calls, "callback called early")61        delay_task.poll()  # time 34.9962        self.assertEqual(0, test_callback.calls, "callback called early")63        delay_task.poll()  # time 35.0164        self.assertEqual(1, test_callback.calls)65        self.assert_(delay_task.is_done())66        self.assert_(delay_task.success)67        self.assert_(not delay_task.aborted)68        self.god.check_playback()69    def test_delayed_call_abort(self):70        delay_task = scheduler_models.DelayedCallTask(71            delay_seconds=987654, callback=lambda: None)72        delay_task.abort()73        self.assert_(delay_task.aborted)74        self.assert_(delay_task.is_done())75        self.assert_(not delay_task.success)76        self.god.check_playback()77class DBObjectTest(BaseSchedulerModelsTest):78    def test_compare_fields_in_row(self):79        host = scheduler_models.Host(id=1)80        fields = list(host._fields)81        row_data = [getattr(host, fieldname) for fieldname in fields]82        self.assertEqual({}, host._compare_fields_in_row(row_data))83        row_data[fields.index('hostname')] = 'spam'84        self.assertEqual({'hostname': ('host1', 'spam')},85                         host._compare_fields_in_row(row_data))86        row_data[fields.index('id')] = 2387        self.assertEqual({'hostname': ('host1', 'spam'), 'id': (1, 23)},88                         host._compare_fields_in_row(row_data))89    def test_compare_fields_in_row_datetime_ignores_microseconds(self):90        datetime_with_us = datetime.datetime(2009, 10, 07, 12, 34, 56, 7890)91        datetime_without_us = datetime.datetime(2009, 10, 07, 12, 34, 56, 0)92        class TestTable(scheduler_models.DBObject):93            _table_name = 'test_table'94            _fields = ('id', 'test_datetime')95        tt = TestTable(row=[1, datetime_without_us])96        self.assertEqual({}, tt._compare_fields_in_row([1, datetime_with_us]))97    def test_always_query(self):98        host_a = scheduler_models.Host(id=2)99        self.assertEqual(host_a.hostname, 'host2')100        self._do_query('UPDATE afe_hosts SET hostname="host2-updated" '101                       'WHERE id=2')102        host_b = scheduler_models.Host(id=2, always_query=True)103        self.assert_(host_a is host_b, 'Cached instance not returned.')104        self.assertEqual(host_a.hostname, 'host2-updated',105                         'Database was not re-queried')106        # If either of these are called, a query was made when it shouldn't be.107        host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!')108        host_a._update_fields_from_row = host_a._compare_fields_in_row109        host_c = scheduler_models.Host(id=2, always_query=False)110        self.assert_(host_a is host_c, 'Cached instance not returned')111    def test_delete(self):112        host = scheduler_models.Host(id=3)113        host.delete()114        host = self.assertRaises(scheduler_models.DBError, scheduler_models.Host, id=3,115                                 always_query=False)116        host = self.assertRaises(scheduler_models.DBError, scheduler_models.Host, id=3,117                                 always_query=True)118    def test_save(self):119        # Dummy Job to avoid creating a one in the HostQueueEntry __init__.120        class MockJob(object):121            def __init__(self, id):122                pass123            def tag(self):124                return 'MockJob'125        self.god.stub_with(scheduler_models, 'Job', MockJob)126        hqe = scheduler_models.HostQueueEntry(127            new_record=True,128            row=[0, 1, 2, 'rhel6', 'Queued', None, 0, 0, 0, '.', None, False, None])129        hqe.save()130        new_id = hqe.id131        # Force a re-query and verify that the correct data was stored.132        scheduler_models.DBObject._clear_instance_cache()133        hqe = scheduler_models.HostQueueEntry(id=new_id)134        self.assertEqual(hqe.id, new_id)135        self.assertEqual(hqe.job_id, 1)136        self.assertEqual(hqe.host_id, 2)137        self.assertEqual(hqe.profile, 'rhel6')138        self.assertEqual(hqe.status, 'Queued')139        self.assertEqual(hqe.meta_host, None)140        self.assertEqual(hqe.active, False)141        self.assertEqual(hqe.complete, False)142        self.assertEqual(hqe.deleted, False)143        self.assertEqual(hqe.execution_subdir, '.')144        self.assertEqual(hqe.atomic_group_id, None)145        self.assertEqual(hqe.started_on, None)146class HostTest(BaseSchedulerModelsTest):147    def test_cmp_for_sort(self):148        expected_order = [149            'alice', 'Host1', 'host2', 'host3', 'host09', 'HOST010',150            'host10', 'host11', 'yolkfolk']151        hostname_idx = list(scheduler_models.Host._fields).index('hostname')152        row = [None] * len(scheduler_models.Host._fields)153        hosts = []154        for hostname in expected_order:155            row[hostname_idx] = hostname156            hosts.append(scheduler_models.Host(row=row, new_record=True))157        host1 = hosts[expected_order.index('Host1')]158        host010 = hosts[expected_order.index('HOST010')]159        host10 = hosts[expected_order.index('host10')]160        host3 = hosts[expected_order.index('host3')]161        alice = hosts[expected_order.index('alice')]162        self.assertEqual(0, scheduler_models.Host.cmp_for_sort(host10, host10))163        self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host10, host010))164        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host010, host10))165        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host10))166        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host010))167        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host3, host10))168        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host3, host010))169        self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host3, host1))170        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host3))171        self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(alice, host3))172        self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host3, alice))173        self.assertEqual(0, scheduler_models.Host.cmp_for_sort(alice, alice))174        hosts.sort(cmp=scheduler_models.Host.cmp_for_sort)175        self.assertEqual(expected_order, [h.hostname for h in hosts])176        hosts.reverse()177        hosts.sort(cmp=scheduler_models.Host.cmp_for_sort)178        self.assertEqual(expected_order, [h.hostname for h in hosts])179class HostQueueEntryTest(BaseSchedulerModelsTest):180    def _create_hqe(self, dependency_labels=(), **create_job_kwargs):181        job = self._create_job(**create_job_kwargs)182        for label in dependency_labels:183            job.dependency_labels.add(label)184        hqes = list(scheduler_models.HostQueueEntry.fetch(where='job_id=%d' % job.id))185        self.assertEqual(1, len(hqes))186        return hqes[0]187    def _check_hqe_labels(self, hqe, expected_labels):188        expected_labels = set(expected_labels)189        label_names = set(label.name for label in hqe.get_labels())190        self.assertEqual(expected_labels, label_names)191    def test_get_labels_empty(self):192        hqe = self._create_hqe(hosts=[1])193        labels = list(hqe.get_labels())194        self.assertEqual([], labels)195    def test_get_labels_metahost(self):196        hqe = self._create_hqe(metahosts=[2])197        self._check_hqe_labels(hqe, ['label2'])198    def test_get_labels_dependancies(self):199        hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),200                               metahosts=[1])201        self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])202class JobTest(BaseSchedulerModelsTest):203    def setUp(self):204        super(JobTest, self).setUp()205        def _mock_create(**kwargs):206            task = models.SpecialTask(**kwargs)207            task.save()208            self._task = task209        self.god.stub_with(models.SpecialTask.objects, 'create', _mock_create)210    def _test_pre_job_tasks_helper(self):211        """212        Calls HQE._do_schedule_pre_job_tasks() and returns the created special213        task214        """215        self._task = None216        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]217        queue_entry._do_schedule_pre_job_tasks()218        return self._task219    def test_job_request_abort(self):220        django_job = self._create_job(hosts=[5, 6], atomic_group=1)221        job = scheduler_models.Job(django_job.id)222        job.request_abort()223        django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))224        for hqe in django_hqes:225            self.assertTrue(hqe.aborted)226    def test__atomic_and_has_started__on_atomic(self):227        self._create_job(hosts=[5, 6], atomic_group=1)228        job = scheduler_models.Job.fetch('id = 1')[0]229        self.assertFalse(job._atomic_and_has_started())230        self._update_hqe("status='Pending'")231        self.assertFalse(job._atomic_and_has_started())232        self._update_hqe("status='Verifying'")233        self.assertFalse(job._atomic_and_has_started())234        self.assertFalse(job._atomic_and_has_started())235        self._update_hqe("status='Failed'")236        self.assertFalse(job._atomic_and_has_started())237        self._update_hqe("status='Stopped'")238        self.assertFalse(job._atomic_and_has_started())239        self._update_hqe("status='Starting'")240        self.assertTrue(job._atomic_and_has_started())241        self._update_hqe("status='Completed'")242        self.assertTrue(job._atomic_and_has_started())243        self._update_hqe("status='Aborted'")244    def test__atomic_and_has_started__not_atomic(self):245        self._create_job(hosts=[1, 2])246        job = scheduler_models.Job.fetch('id = 1')[0]247        self.assertFalse(job._atomic_and_has_started())248        self._update_hqe("status='Starting'")249        self.assertFalse(job._atomic_and_has_started())250    def _check_special_task(self, task, task_type, queue_entry_id=None):251        self.assertEquals(task.task, task_type)252        self.assertEquals(task.host.id, 1)253        if queue_entry_id:254            self.assertEquals(task.queue_entry.id, queue_entry_id)255    def test_run_asynchronous(self):256        self._create_job(hosts=[1, 2])257        task = self._test_pre_job_tasks_helper()258        self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1)259    def test_run_asynchronous_skip_verify(self):260        job = self._create_job(hosts=[1, 2])261        job.run_verify = False262        job.save()263        task = self._test_pre_job_tasks_helper()264        self.assertEquals(task, None)265    def test_run_synchronous_verify(self):266        self._create_job(hosts=[1, 2], synchronous=True)267        task = self._test_pre_job_tasks_helper()268        self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1)269    def test_run_synchronous_skip_verify(self):270        job = self._create_job(hosts=[1, 2], synchronous=True)271        job.run_verify = False272        job.save()273        task = self._test_pre_job_tasks_helper()274        self.assertEquals(task, None)275    def test_run_atomic_group_already_started(self):276        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)277        self._update_hqe("status='Starting', execution_subdir=''")278        job = scheduler_models.Job.fetch('id = 1')[0]279        queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]280        assert queue_entry.job is job281        self.assertEqual(None, job.run(queue_entry))282        self.god.check_playback()283    def test_reboot_before_always(self):284        job = self._create_job(hosts=[1])285        job.reboot_before = model_attributes.RebootBefore.ALWAYS286        job.save()287        task = self._test_pre_job_tasks_helper()288        self._check_special_task(task, models.SpecialTask.Task.CLEANUP)289    def _test_reboot_before_if_dirty_helper(self, expect_reboot):290        job = self._create_job(hosts=[1])291        job.reboot_before = model_attributes.RebootBefore.IF_DIRTY...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
