Best Python code snippet using autotest_python
monitor_db_unittest.py
Source:monitor_db_unittest.py  
...200    def _test_basic_scheduling_helper(self, use_metahosts):201        'Basic nonmetahost scheduling'202        self._create_job_simple([1], use_metahosts)203        self._create_job_simple([2], use_metahosts)204        self._dispatcher._schedule_new_jobs()205        self._assert_job_scheduled_on(1, 1)206        self._assert_job_scheduled_on(2, 2)207        self._check_for_extra_schedulings()208    def _test_priorities_helper(self, use_metahosts):209        'Test prioritization ordering'210        self._create_job_simple([1], use_metahosts)211        self._create_job_simple([2], use_metahosts)212        self._create_job_simple([1,2], use_metahosts)213        self._create_job_simple([1], use_metahosts, priority=1)214        self._dispatcher._schedule_new_jobs()215        self._assert_job_scheduled_on(4, 1) # higher priority216        self._assert_job_scheduled_on(2, 2) # earlier job over later217        self._check_for_extra_schedulings()218    def _test_hosts_ready_helper(self, use_metahosts):219        """220        Only hosts that are status=Ready, unlocked and not invalid get221        scheduled.222        """223        self._create_job_simple([1], use_metahosts)224        self._do_query('UPDATE hosts SET status="Running" WHERE id=1')225        self._dispatcher._schedule_new_jobs()226        self._check_for_extra_schedulings()227        self._do_query('UPDATE hosts SET status="Ready", locked=1 '228                       'WHERE id=1')229        self._dispatcher._schedule_new_jobs()230        self._check_for_extra_schedulings()231        self._do_query('UPDATE hosts SET locked=0, invalid=1 '232                       'WHERE id=1')233        self._dispatcher._schedule_new_jobs()234        if not use_metahosts:235            self._assert_job_scheduled_on(1, 1)236        self._check_for_extra_schedulings()237    def _test_hosts_idle_helper(self, use_metahosts):238        'Only idle hosts get scheduled'239        self._create_job(hosts=[1], active=True)240        self._create_job_simple([1], use_metahosts)241        self._dispatcher._schedule_new_jobs()242        self._check_for_extra_schedulings()243    def _test_obey_ACLs_helper(self, use_metahosts):244        self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1')245        self._create_job_simple([1], use_metahosts)246        self._dispatcher._schedule_new_jobs()247        self._check_for_extra_schedulings()248    def test_basic_scheduling(self):249        self._test_basic_scheduling_helper(False)250    def test_priorities(self):251        self._test_priorities_helper(False)252    def test_hosts_ready(self):253        self._test_hosts_ready_helper(False)254    def test_hosts_idle(self):255        self._test_hosts_idle_helper(False)256    def test_obey_ACLs(self):257        self._test_obey_ACLs_helper(False)258    def test_one_time_hosts_ignore_ACLs(self):259        self._do_query('DELETE FROM acl_groups_hosts WHERE host_id=1')260        self._do_query('UPDATE hosts SET invalid=1 WHERE id=1')261        self._create_job_simple([1])262        self._dispatcher._schedule_new_jobs()263        self._assert_job_scheduled_on(1, 1)264        self._check_for_extra_schedulings()265    def test_non_metahost_on_invalid_host(self):266        """267        Non-metahost entries can get scheduled on invalid hosts (this is how268        one-time hosts work).269        """270        self._do_query('UPDATE hosts SET invalid=1')271        self._test_basic_scheduling_helper(False)272    def test_metahost_scheduling(self):273        """274        Basic metahost scheduling275        """276        self._test_basic_scheduling_helper(True)277    def test_metahost_priorities(self):278        self._test_priorities_helper(True)279    def test_metahost_hosts_ready(self):280        self._test_hosts_ready_helper(True)281    def test_metahost_hosts_idle(self):282        self._test_hosts_idle_helper(True)283    def test_metahost_obey_ACLs(self):284        self._test_obey_ACLs_helper(True)285    def _setup_test_only_if_needed_labels(self):286        # apply only_if_needed label3 to host1287        models.Host.smart_get('host1').labels.add(self.label3)288        return self._create_job_simple([1], use_metahost=True)289    def test_only_if_needed_labels_avoids_host(self):290        job = self._setup_test_only_if_needed_labels()291        # if the job doesn't depend on label3, there should be no scheduling292        self._dispatcher._schedule_new_jobs()293        self._check_for_extra_schedulings()294    def test_only_if_needed_labels_schedules(self):295        job = self._setup_test_only_if_needed_labels()296        job.dependency_labels.add(self.label3)297        self._dispatcher._schedule_new_jobs()298        self._assert_job_scheduled_on(1, 1)299        self._check_for_extra_schedulings()300    def test_only_if_needed_labels_via_metahost(self):301        job = self._setup_test_only_if_needed_labels()302        job.dependency_labels.add(self.label3)303        # should also work if the metahost is the only_if_needed label304        self._do_query('DELETE FROM jobs_dependency_labels')305        self._create_job(metahosts=[3])306        self._dispatcher._schedule_new_jobs()307        self._assert_job_scheduled_on(2, 1)308        self._check_for_extra_schedulings()309    def test_nonmetahost_over_metahost(self):310        """311        Non-metahost entries should take priority over metahost entries312        for the same host313        """314        self._create_job(metahosts=[1])315        self._create_job(hosts=[1])316        self._dispatcher._schedule_new_jobs()317        self._assert_job_scheduled_on(2, 1)318        self._check_for_extra_schedulings()319    def test_metahosts_obey_blocks(self):320        """321        Metahosts can't get scheduled on hosts already scheduled for322        that job.323        """324        self._create_job(metahosts=[1], hosts=[1])325        # make the nonmetahost entry complete, so the metahost can try326        # to get scheduled327        self._update_hqe(set='complete = 1', where='host_id=1')328        self._dispatcher._schedule_new_jobs()329        self._check_for_extra_schedulings()330    # TODO(gps): These should probably live in their own TestCase class331    # specific to testing HostScheduler methods directly.  It was convenient332    # to put it here for now to share existing test environment setup code.333    def test_HostScheduler_check_atomic_group_labels(self):334        normal_job = self._create_job(metahosts=[0])335        atomic_job = self._create_job(atomic_group=1)336        # Indirectly initialize the internal state of the host scheduler.337        self._dispatcher._refresh_pending_queue_entries()338        atomic_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' %339                                                     atomic_job.id).next()340        normal_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' %341                                                     normal_job.id).next()342        host_scheduler = self._dispatcher._host_scheduler343        self.assertTrue(host_scheduler._check_atomic_group_labels(344                [self.label4.id], atomic_hqe))345        self.assertFalse(host_scheduler._check_atomic_group_labels(346                [self.label4.id], normal_hqe))347        self.assertFalse(host_scheduler._check_atomic_group_labels(348                [self.label5.id, self.label6.id, self.label7.id], normal_hqe))349        self.assertTrue(host_scheduler._check_atomic_group_labels(350                [self.label4.id, self.label6.id], atomic_hqe))351        self.assertTrue(host_scheduler._check_atomic_group_labels(352                        [self.label4.id, self.label5.id],353                        atomic_hqe))354    def test_HostScheduler_get_host_atomic_group_id(self):355        job = self._create_job(metahosts=[self.label6.id])356        queue_entry = monitor_db.HostQueueEntry.fetch(357                where='job_id=%d' % job.id).next()358        # Indirectly initialize the internal state of the host scheduler.359        self._dispatcher._refresh_pending_queue_entries()360        # Test the host scheduler361        host_scheduler = self._dispatcher._host_scheduler362        # Two labels each in a different atomic group.  This should log an363        # error and continue.364        orig_logging_error = logging.error365        def mock_logging_error(message, *args):366            mock_logging_error._num_calls += 1367            # Test the logging call itself, we just wrapped it to count it.368            orig_logging_error(message, *args)369        mock_logging_error._num_calls = 0370        self.god.stub_with(logging, 'error', mock_logging_error)371        self.assertNotEquals(None, host_scheduler._get_host_atomic_group_id(372                [self.label4.id, self.label8.id], queue_entry))373        self.assertTrue(mock_logging_error._num_calls > 0)374        self.god.unstub(logging, 'error')375        # Two labels both in the same atomic group, this should not raise an376        # error, it will merely cause the job to schedule on the intersection.377        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(378                [self.label4.id, self.label5.id]))379        self.assertEquals(None, host_scheduler._get_host_atomic_group_id([]))380        self.assertEquals(None, host_scheduler._get_host_atomic_group_id(381                [self.label3.id, self.label7.id, self.label6.id]))382        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(383                [self.label4.id, self.label7.id, self.label6.id]))384        self.assertEquals(1, host_scheduler._get_host_atomic_group_id(385                [self.label7.id, self.label5.id]))386    def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):387        # Create a job scheduled to run on label6.388        self._create_job(metahosts=[self.label6.id])389        self._dispatcher._schedule_new_jobs()390        # label6 only has hosts that are in atomic groups associated with it,391        # there should be no scheduling.392        self._check_for_extra_schedulings()393    def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):394        # Create a job scheduled to run on label5.  This is an atomic group395        # label but this job does not request atomic group scheduling.396        self._create_job(metahosts=[self.label5.id])397        self._dispatcher._schedule_new_jobs()398        # label6 only has hosts that are in atomic groups associated with it,399        # there should be no scheduling.400        self._check_for_extra_schedulings()401    def test_atomic_group_scheduling_basics(self):402        # Create jobs scheduled to run on an atomic group.403        job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],404                         atomic_group=1)405        job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],406                         atomic_group=1)407        self._dispatcher._schedule_new_jobs()408        # atomic_group.max_number_of_machines was 2 so we should run on 2.409        self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)410        self._assert_job_scheduled_on(job_b.id, 8)  # label5411        self._assert_job_scheduled_on(job_b.id, 9)  # label5412        self._check_for_extra_schedulings()413        # The three host label4 atomic group still has one host available.414        # That means a job with a synch_count of 1 asking to be scheduled on415        # the atomic group can still use the final machine.416        #417        # This may seem like a somewhat odd use case.  It allows the use of an418        # atomic group as a set of machines to run smaller jobs within (a set419        # of hosts configured for use in network tests with eachother perhaps?)420        onehost_job = self._create_job(atomic_group=1)421        self._dispatcher._schedule_new_jobs()422        self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)423        self._check_for_extra_schedulings()424        # No more atomic groups have hosts available, no more jobs should425        # be scheduled.426        self._create_job(atomic_group=1)427        self._dispatcher._schedule_new_jobs()428        self._check_for_extra_schedulings()429    def test_atomic_group_scheduling_obeys_acls(self):430        # Request scheduling on a specific atomic label but be denied by ACLs.431        self._do_query('DELETE FROM acl_groups_hosts WHERE host_id in (8,9)')432        job = self._create_job(metahosts=[self.label5.id], atomic_group=1)433        self._dispatcher._schedule_new_jobs()434        self._check_for_extra_schedulings()435    def test_atomic_group_scheduling_dependency_label_exclude(self):436        # A dependency label that matches no hosts in the atomic group.437        job_a = self._create_job(atomic_group=1)438        job_a.dependency_labels.add(self.label3)439        self._dispatcher._schedule_new_jobs()440        self._check_for_extra_schedulings()441    def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):442        # A metahost and dependency label that excludes too many hosts.443        job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],444                                 atomic_group=1)445        job_b.dependency_labels.add(self.label7)446        self._dispatcher._schedule_new_jobs()447        self._check_for_extra_schedulings()448    def test_atomic_group_scheduling_dependency_label_match(self):449        # A dependency label that exists on enough atomic group hosts in only450        # one of the two atomic group labels.451        job_c = self._create_job(synchronous=True, atomic_group=1)452        job_c.dependency_labels.add(self.label7)453        self._dispatcher._schedule_new_jobs()454        self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)455        self._check_for_extra_schedulings()456    def test_atomic_group_scheduling_no_metahost(self):457        # Force it to schedule on the other group for a reliable test.458        self._do_query('UPDATE hosts SET invalid=1 WHERE id=9')459        # An atomic job without a metahost.460        job = self._create_job(synchronous=True, atomic_group=1)461        self._dispatcher._schedule_new_jobs()462        self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)463        self._check_for_extra_schedulings()464    def test_atomic_group_scheduling_partial_group(self):465        # Make one host in labels[3] unavailable so that there are only two466        # hosts left in the group.467        self._do_query('UPDATE hosts SET status="Repair Failed" WHERE id=5')468        job = self._create_job(synchronous=True, metahosts=[self.label4.id],469                         atomic_group=1)470        self._dispatcher._schedule_new_jobs()471        # Verify that it was scheduled on the 2 ready hosts in that group.472        self._assert_job_scheduled_on(job.id, 6)473        self._assert_job_scheduled_on(job.id, 7)474        self._check_for_extra_schedulings()475    def test_atomic_group_scheduling_not_enough_available(self):476        # Mark some hosts in each atomic group label as not usable.477        # One host running, another invalid in the first group label.478        self._do_query('UPDATE hosts SET status="Running" WHERE id=5')479        self._do_query('UPDATE hosts SET invalid=1 WHERE id=6')480        # One host invalid in the second group label.481        self._do_query('UPDATE hosts SET invalid=1 WHERE id=9')482        # Nothing to schedule when no group label has enough (2) good hosts..483        self._create_job(atomic_group=1, synchronous=True)484        self._dispatcher._schedule_new_jobs()485        # There are not enough hosts in either atomic group,486        # No more scheduling should occur.487        self._check_for_extra_schedulings()488        # Now create an atomic job that has a synch count of 1.  It should489        # schedule on exactly one of the hosts.490        onehost_job = self._create_job(atomic_group=1)491        self._dispatcher._schedule_new_jobs()492        self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)493    def test_atomic_group_scheduling_no_valid_hosts(self):494        self._do_query('UPDATE hosts SET invalid=1 WHERE id in (8,9)')495        self._create_job(synchronous=True, metahosts=[self.label5.id],496                         atomic_group=1)497        self._dispatcher._schedule_new_jobs()498        # no hosts in the selected group and label are valid.  no schedulings.499        self._check_for_extra_schedulings()500    def test_atomic_group_scheduling_metahost_works(self):501        # Test that atomic group scheduling also obeys metahosts.502        self._create_job(metahosts=[0], atomic_group=1)503        self._dispatcher._schedule_new_jobs()504        # There are no atomic group hosts that also have that metahost.505        self._check_for_extra_schedulings()506        job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)507        self._dispatcher._schedule_new_jobs()508        self._assert_job_scheduled_on(job_b.id, 8)509        self._assert_job_scheduled_on(job_b.id, 9)510        self._check_for_extra_schedulings()511    def test_atomic_group_skips_ineligible_hosts(self):512        # Test hosts marked ineligible for this job are not eligible.513        # How would this ever happen anyways?514        job = self._create_job(metahosts=[self.label4.id], atomic_group=1)515        models.IneligibleHostQueue.objects.create(job=job, host_id=5)516        models.IneligibleHostQueue.objects.create(job=job, host_id=6)517        models.IneligibleHostQueue.objects.create(job=job, host_id=7)518        self._dispatcher._schedule_new_jobs()519        # No scheduling should occur as all desired hosts were ineligible.520        self._check_for_extra_schedulings()521    def test_atomic_group_scheduling_fail(self):522        # If synch_count is > the atomic group number of machines, the job523        # should be aborted immediately.524        model_job = self._create_job(synchronous=True, atomic_group=1)525        model_job.synch_count = 4526        model_job.save()527        job = monitor_db.Job(id=model_job.id)528        self._dispatcher._schedule_new_jobs()529        self._check_for_extra_schedulings()530        queue_entries = job.get_host_queue_entries()531        self.assertEqual(1, len(queue_entries))532        self.assertEqual(queue_entries[0].status,533                         models.HostQueueEntry.Status.ABORTED)534    def test_atomic_group_no_labels_no_scheduling(self):535        # Never schedule on atomic groups marked invalid.536        job = self._create_job(metahosts=[self.label5.id], synchronous=True,537                               atomic_group=1)538        # Deleting an atomic group via the frontend marks it invalid and539        # removes all label references to the group.  The job now references540        # an invalid atomic group with no labels associated with it.541        self.label5.atomic_group.invalid = True542        self.label5.atomic_group.save()543        self.label5.atomic_group = None544        self.label5.save()545        self._dispatcher._schedule_new_jobs()546        self._check_for_extra_schedulings()547    def test_schedule_directly_on_atomic_group_host_fail(self):548        # Scheduling a job directly on hosts in an atomic group must549        # fail to avoid users inadvertently holding up the use of an550        # entire atomic group by using the machines individually.551        job = self._create_job(hosts=[5])552        self._dispatcher._schedule_new_jobs()553        self._check_for_extra_schedulings()554    def test_schedule_directly_on_atomic_group_host(self):555        # Scheduling a job directly on one host in an atomic group will556        # work when the atomic group is listed on the HQE in addition557        # to the host (assuming the sync count is 1).558        job = self._create_job(hosts=[5], atomic_group=1)559        self._dispatcher._schedule_new_jobs()560        self._assert_job_scheduled_on(job.id, 5)561        self._check_for_extra_schedulings()562    def test_schedule_directly_on_atomic_group_hosts_sync2(self):563        job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)564        self._dispatcher._schedule_new_jobs()565        self._assert_job_scheduled_on(job.id, 5)566        self._assert_job_scheduled_on(job.id, 8)567        self._check_for_extra_schedulings()568    def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):569        job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)570        self._dispatcher._schedule_new_jobs()571        self._check_for_extra_schedulings()572    def test_only_schedule_queued_entries(self):573        self._create_job(metahosts=[1])574        self._update_hqe(set='active=1, host_id=2')575        self._dispatcher._schedule_new_jobs()576        self._check_for_extra_schedulings()577    def test_no_ready_hosts(self):578        self._create_job(hosts=[1])579        self._do_query('UPDATE hosts SET status="Repair Failed"')580        self._dispatcher._schedule_new_jobs()581        self._check_for_extra_schedulings()582class DispatcherThrottlingTest(BaseSchedulerTest):583    """584    Test that the dispatcher throttles:585     * total number of running processes586     * number of processes started per cycle587    """588    _MAX_RUNNING = 3589    _MAX_STARTED = 2590    def setUp(self):591        super(DispatcherThrottlingTest, self).setUp()592        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING593        scheduler_config.config.max_processes_started_per_cycle = (594            self._MAX_STARTED)...site_monitor_db.py
Source:site_monitor_db.py  
...125        if self._tick_start:126            self._gauge.send('_schedule_special_tasks',127                             time.time() - self._tick_start)128    @_timer.decorate129    def _schedule_new_jobs(self):130        super(SiteDispatcher, self)._schedule_new_jobs()131        if self._tick_start:132            self._gauge.send('_schedule_new_jobs',133                             time.time() - self._tick_start)134    @_timer.decorate135    def _handle_agents(self):136        super(SiteDispatcher, self)._handle_agents()137        if self._tick_start:138            self._gauge.send('_handle_agents', time.time() - self._tick_start)139    def _reverify_hosts_where(self, where,140                              print_message='Reverifying host %s'):141        """142        This is an altered version of _reverify_hosts_where the class to143        models.SpecialTask.objects.create passes in an argument for144        requested_by, in order to allow the Reset task to be created...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
