Best Python code snippet using autotest_python
rdb_integration_tests.py
Source:rdb_integration_tests.py  
...101        job = self.create_job(deps=set(['a']))102        host = self.db_helper.create_host('h1', deps=set(['a']))103        host.leased = 1104        host.save()105        queue_entries = self._dispatcher._refresh_pending_queue_entries()106        hosts = list(rdb_lib.acquire_hosts(queue_entries))107        self.assertTrue(len(hosts) == 1 and hosts[0] is None)108    def testAcquireLeasedHostRace(self):109        """Test behaviour when hosts are leased just before acquisition.110        If a fraction of the hosts somehow get leased between finding and111        acquisition, the rdb should just return the remaining hosts for the112        request to use.113        @raises AssertionError: If both the requests get a host successfully,114            since one host gets leased before the final attempt to lease both.115        """116        j1 = self.create_job(deps=set(['a']))117        j2 = self.create_job(deps=set(['a']))118        hosts = [self.db_helper.create_host('h1', deps=set(['a'])),119                 self.db_helper.create_host('h2', deps=set(['a']))]120        @rdb_hosts.return_rdb_host121        def local_find_hosts(host_query_manger, deps, acls):122            """Return a predetermined list of hosts, one of which is leased."""123            h1 = models.Host.objects.get(hostname='h1')124            h1.leased = 1125            h1.save()126            h2 = models.Host.objects.get(hostname='h2')127            return [h1, h2]128        self.god.stub_with(rdb.AvailableHostQueryManager, 'find_hosts',129                           local_find_hosts)130        queue_entries = self._dispatcher._refresh_pending_queue_entries()131        hosts = list(rdb_lib.acquire_hosts(queue_entries))132        self.assertTrue(len(hosts) == 2 and None in hosts)133        self.check_hosts(iter(hosts))134    def testHostReleaseStates(self):135        """Test that we will only release an unused host if it is in Ready.136        @raises AssertionError: If the host gets released in any other state.137        """138        host = self.db_helper.create_host('h1', deps=set(['x']))139        for state in rdb_model_extensions.AbstractHostModel.Status.names:140            host.status = state141            host.leased = 1142            host.save()143            self._release_unused_hosts()144            host = models.Host.objects.get(hostname='h1')145            self.assertTrue(host.leased == (state != 'Ready'))146    def testHostReleseHQE(self):147        """Test that we will not release a ready host if it's being used.148        @raises AssertionError: If the host is released even though it has149            been assigned to an active hqe.150        """151        # Create a host and lease it out in Ready.152        host = self.db_helper.create_host('h1', deps=set(['x']))153        host.status = 'Ready'154        host.leased = 1155        host.save()156        # Create a job and give its hqe the leased host.157        job = self.create_job(deps=set(['x']))158        self.db_helper.add_host_to_job(host, job.id)159        hqe = models.HostQueueEntry.objects.get(job_id=job.id)160        # Activate the hqe by setting its state.161        hqe.status = host_queue_entry_states.ACTIVE_STATUSES[0]162        hqe.save()163        # Make sure the hqes host isn't released, even if its in ready.164        self._release_unused_hosts()165        host = models.Host.objects.get(hostname='h1')166        self.assertTrue(host.leased == 1)167    def testBasicDepsAcls(self):168        """Test a basic deps/acls request.169        Make sure that a basic request with deps and acls, finds a host from170        the ready pool that has matching labels and is in a matching aclgroups.171        @raises AssertionError: If the request doesn't find a host, since the172            we insert a matching host in the ready pool.173        """174        deps = set(['a', 'b'])175        acls = set(['a', 'b'])176        self.db_helper.create_host('h1', deps=deps, acls=acls)177        job = self.create_job(user='autotest_system', deps=deps, acls=acls)178        queue_entries = self._dispatcher._refresh_pending_queue_entries()179        matching_host  = rdb_lib.acquire_hosts(queue_entries).next()180        self.check_host_assignment(job.id, matching_host.id)181        self.assertTrue(matching_host.leased == 1)182    def testPreferredDeps(self):183        """Test that perferred deps is respected.184        If multiple hosts satisfied a job's deps, the one with preferred185        label will be assigned to the job.186        @raises AssertionError: If a host without a preferred label is187                                assigned to the job instead of one with188                                a preferred label.189        """190        lumpy_deps = set(['board:lumpy'])191        stumpy_deps = set(['board:stumpy'])192        stumpy_deps_with_crosversion = set(193                ['board:stumpy', 'cros-version:lumpy-release/R41-6323.0.0'])194        acls = set(['a', 'b'])195        # Hosts lumpy1 and lumpy2 are created as a control group,196        # which ensures that if no preferred label is used, the host197        # with a smaller id will be chosen first. We need to make sure198        # stumpy2 was chosen because it has a cros-version label, but not199        # because of other randomness.200        self.db_helper.create_host('lumpy1', deps=lumpy_deps, acls=acls)201        self.db_helper.create_host('lumpy2', deps=lumpy_deps, acls=acls)202        self.db_helper.create_host('stumpy1', deps=stumpy_deps, acls=acls)203        self.db_helper.create_host(204                    'stumpy2', deps=stumpy_deps_with_crosversion , acls=acls)205        job_1 = self.create_job(user='autotest_system',206                              deps=lumpy_deps, acls=acls)207        job_2 = self.create_job(user='autotest_system',208                              deps=stumpy_deps_with_crosversion, acls=acls)209        queue_entries = self._dispatcher._refresh_pending_queue_entries()210        matching_hosts  = list(rdb_lib.acquire_hosts(queue_entries))211        assignment = {}212        import logging213        for job, host in zip(queue_entries, matching_hosts):214            self.check_host_assignment(job.id, host.id)215            assignment[job.id] = host.hostname216        self.assertEqual(assignment[job_1.id], 'lumpy1')217        self.assertEqual(assignment[job_2.id], 'stumpy2')218    def testBadDeps(self):219        """Test that we find no hosts when only acls match.220        @raises AssertionError: If the request finds a host, since the only221            host in the ready pool will not have matching deps.222        """223        host_labels = set(['a'])224        job_deps = set(['b'])225        acls = set(['a', 'b'])226        self.db_helper.create_host('h1', deps=host_labels, acls=acls)227        job = self.create_job(user='autotest_system', deps=job_deps, acls=acls)228        queue_entries = self._dispatcher._refresh_pending_queue_entries()229        matching_host  = rdb_lib.acquire_hosts(queue_entries).next()230        self.assert_(not matching_host)231    def testBadAcls(self):232        """Test that we find no hosts when only deps match.233        @raises AssertionError: If the request finds a host, since the only234            host in the ready pool will not have matching acls.235        """236        deps = set(['a'])237        host_acls = set(['a'])238        job_acls = set(['b'])239        self.db_helper.create_host('h1', deps=deps, acls=host_acls)240        # Create the job as a new user who is only in the 'b' and 'Everyone'241        # aclgroups. Though there are several hosts in the Everyone group, the242        # 1 host that has the 'a' dep isn't.243        job = self.create_job(user='new_user', deps=deps, acls=job_acls)244        queue_entries = self._dispatcher._refresh_pending_queue_entries()245        matching_host  = rdb_lib.acquire_hosts(queue_entries).next()246        self.assert_(not matching_host)247    def testBasicPriority(self):248        """Test that priority inversion doesn't happen.249        Schedule 2 jobs with the same deps, acls and user, but different250        priorities, and confirm that the higher priority request gets the host.251        This confirmation happens through the AssignmentValidator.252        @raises AssertionError: If the un important request gets host h1 instead253            of the important request.254        """255        deps = set(['a', 'b'])256        acls = set(['a', 'b'])257        self.db_helper.create_host('h1', deps=deps, acls=acls)258        important_job = self.create_job(user='autotest_system',259                deps=deps, acls=acls, priority=2)260        un_important_job = self.create_job(user='autotest_system',261                deps=deps, acls=acls, priority=0)262        queue_entries = self._dispatcher._refresh_pending_queue_entries()263        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',264                AssignmentValidator.priority_checking_response_handler)265        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))266    def testPriorityLevels(self):267        """Test that priority inversion doesn't happen.268        Increases a job's priority and makes several requests for hosts,269        checking that priority inversion doesn't happen.270        @raises AssertionError: If the unimportant job gets h1 while it is271            still unimportant, or doesn't get h1 while after it becomes the272            most important job.273        """274        deps = set(['a', 'b'])275        acls = set(['a', 'b'])276        self.db_helper.create_host('h1', deps=deps, acls=acls)277        # Create jobs that will bucket differently and confirm that jobs in an278        # earlier bucket get a host.279        first_job = self.create_job(user='autotest_system', deps=deps, acls=acls)280        important_job = self.create_job(user='autotest_system', deps=deps,281                acls=acls, priority=2)282        deps.pop()283        unimportant_job = self.create_job(user='someother_system', deps=deps,284                acls=acls, priority=1)285        queue_entries = self._dispatcher._refresh_pending_queue_entries()286        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',287                AssignmentValidator.priority_checking_response_handler)288        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))289        # Elevate the priority of the unimportant job, so we now have290        # 2 jobs at the same priority.291        self.db_helper.increment_priority(job_id=unimportant_job.id)292        queue_entries = self._dispatcher._refresh_pending_queue_entries()293        self._release_unused_hosts()294        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))295        # Prioritize the first job, and confirm that it gets the host over the296        # jobs that got it the last time.297        self.db_helper.increment_priority(job_id=unimportant_job.id)298        queue_entries = self._dispatcher._refresh_pending_queue_entries()299        self._release_unused_hosts()300        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))301    def testFrontendJobScheduling(self):302        """Test that basic frontend job scheduling.303        @raises AssertionError: If the received and requested host don't match,304            or the mis-matching host is returned instead.305        """306        deps = set(['x', 'y'])307        acls = set(['a', 'b'])308        # Create 2 frontend jobs and only one matching host.309        matching_job = self.create_job(acls=acls, deps=deps)310        matching_host = self.db_helper.create_host('h1', acls=acls, deps=deps)311        mis_matching_job = self.create_job(acls=acls, deps=deps)312        mis_matching_host = self.db_helper.create_host(313                'h2', acls=acls, deps=deps.pop())314        self.db_helper.add_host_to_job(matching_host, matching_job.id)315        self.db_helper.add_host_to_job(mis_matching_host, mis_matching_job.id)316        # Check that only the matching host is returned, and that we get 'None'317        # for the second request.318        queue_entries = self._dispatcher._refresh_pending_queue_entries()319        hosts = list(rdb_lib.acquire_hosts(queue_entries))320        self.assertTrue(len(hosts) == 2 and None in hosts)321        returned_host = [host for host in hosts if host].pop()322        self.assertTrue(matching_host.id == returned_host.id)323    def testFrontendJobPriority(self):324        """Test that frontend job scheduling doesn't ignore priorities.325        @raises ValueError: If the priorities of frontend jobs are ignored.326        """327        board = 'x'328        high_priority = self.create_job(priority=2, deps=set([board]))329        low_priority = self.create_job(priority=1, deps=set([board]))330        host = self.db_helper.create_host('h1', deps=set([board]))331        self.db_helper.add_host_to_job(host, low_priority.id)332        self.db_helper.add_host_to_job(host, high_priority.id)333        queue_entries = self._dispatcher._refresh_pending_queue_entries()334        def local_response_handler(request_manager):335            """Confirms that a higher priority frontend job gets a host.336            @raises ValueError: If priority inversion happens and the job337                with priority 1 gets the host instead.338            """339            result = request_manager.api_call(request_manager.request_queue)340            if not result:341                raise ValueError('Excepted the high priority request to '342                                 'get a host, but the result is empty.')343            for request, hosts in result.iteritems():344                if request.priority == 1:345                    raise ValueError('Priority of frontend job ignored.')346                if len(hosts) > 1:347                    raise ValueError('Multiple hosts returned against one '348                                     'frontend job scheduling request.')349                yield hosts[0]350        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',351                           local_response_handler)352        self.check_hosts(rdb_lib.acquire_hosts(queue_entries))353    def testSuiteOrderedHostAcquisition(self):354        """Test that older suite jobs acquire hosts first.355        Make sure older suite jobs get hosts first, but not at the expense of356        higher priority jobs.357        @raises ValueError: If unexpected acquisitions occur, eg:358            suite_job_2 acquires the last 2 hosts instead of suite_job_1.359            isolated_important_job doesn't get any hosts.360            Any job acquires more hosts than necessary.361        """362        board = 'x'363        # Create 2 suites such that the later suite has an ordering of deps364        # that places it ahead of the earlier suite, if parent_job_id is365        # ignored.366        suite_without_dep = self.create_suite(num=2, priority=0, board=board)367        suite_with_dep = self.create_suite(num=1, priority=0, board=board)368        self.db_helper.add_deps_to_job(suite_with_dep[0], dep_names=list('y'))369        # Create an important job that should be ahead of the first suite,370        # because priority trumps parent_job_id and time of creation.371        isolated_important_job = self.create_job(priority=3, deps=set([board]))372        # Create 3 hosts, all with the deps to satisfy the last suite.373        for i in range(0, 3):374            self.db_helper.create_host('h%s' % i, deps=set([board, 'y']))375        queue_entries = self._dispatcher._refresh_pending_queue_entries()376        def local_response_handler(request_manager):377            """Reorder requests and check host acquisition.378            @raises ValueError: If unexpected/no acquisitions occur.379            """380            if any([request for request in request_manager.request_queue381                    if request.parent_job_id is None]):382                raise ValueError('Parent_job_id can never be None.')383            # This will result in the ordering:384            # [suite_2_1, suite_1_*, suite_1_*, isolated_important_job]385            # The priority scheduling order should be:386            # [isolated_important_job, suite_1_*, suite_1_*, suite_2_1]387            # Since:388            #   a. the isolated_important_job is the most important.389            #   b. suite_1 was created before suite_2, regardless of deps390            disorderly_queue = sorted(request_manager.request_queue,391                    key=lambda r: -r.parent_job_id)392            request_manager.request_queue = disorderly_queue393            result = request_manager.api_call(request_manager.request_queue)394            if not result:395                raise ValueError('Expected results but got none.')396            # Verify that the isolated_important_job got a host, and that the397            # first suite got both remaining free hosts.398            for request, hosts in result.iteritems():399                if request.parent_job_id == 0:400                    if len(hosts) > 1:401                        raise ValueError('First job acquired more hosts than '402                                'necessary. Response map: %s' % result)403                    continue404                if request.parent_job_id == 1:405                    if len(hosts) < 2:406                        raise ValueError('First suite job requests were not '407                                'satisfied. Response_map: %s' % result)408                    continue409                # The second suite job got hosts instead of one of410                # the others. Eitherway this is a failure.411                raise ValueError('Unexpected host acquisition '412                        'Response map: %s' % result)413            yield None414        self.god.stub_with(rdb_requests.BaseHostRequestManager, 'response',415                           local_response_handler)416        list(rdb_lib.acquire_hosts(queue_entries))417    def testConfigurations(self):418        """Test that configurations don't matter.419        @raises AssertionError: If the request doesn't find a host,420                 this will happen if configurations are not stripped out.421        """422        self.god.stub_with(provision.Cleanup,423                           '_actions',424                           {'action': 'fakeTest'})425        job_labels = set(['action', 'a'])426        host_deps = set(['a'])427        db_host = self.db_helper.create_host('h1', deps=host_deps)428        self.create_job(user='autotest_system', deps=job_labels)429        queue_entries = self._dispatcher._refresh_pending_queue_entries()430        matching_host = rdb_lib.acquire_hosts(queue_entries).next()431        self.assert_(matching_host.id == db_host.id)432class RDBMinDutTest(433        rdb_testing_utils.AbstractBaseRDBTester, unittest.TestCase):434    """Test AvailableHostRequestHandler"""435    _config_section = 'AUTOTEST_WEB'436    def min_dut_test_helper(self, num_hosts, suite_settings):437        """A helper function to test min_dut logic.438        @param num_hosts: Total number of hosts to create.439        @param suite_settings: A dictionary specify how suites would be created440                               and verified.441                E.g.  {'priority': 10, 'num_jobs': 3,442                       'min_duts':2, 'expected_aquired': 1}443                       With this setting, will create a suite that has 3444                       child jobs, with priority 10 and min_duts 2.445                       The suite is expected to get 1 dut.446        """447        acls = set(['fake_acl'])448        hosts = []449        for i in range (0, num_hosts):450            hosts.append(self.db_helper.create_host(451                'h%d' % i, deps=set(['board:lumpy']), acls=acls))452        suites = {}453        suite_min_duts = {}454        for setting in suite_settings:455            s = self.create_suite(num=setting['num_jobs'],456                                  priority=setting['priority'],457                                  board='board:lumpy', acls=acls)458            # Empty list will be used to store acquired hosts.459            suites[s['parent_job'].id] = (setting, [])460            suite_min_duts[s['parent_job'].id] = setting['min_duts']461        queue_entries = self._dispatcher._refresh_pending_queue_entries()462        matching_hosts = rdb_lib.acquire_hosts(queue_entries, suite_min_duts)463        for host, queue_entry in zip(matching_hosts, queue_entries):464            if host:465                suites[queue_entry.job.parent_job_id][1].append(host)466        for setting, hosts in suites.itervalues():467            self.assertEqual(len(hosts),setting['expected_aquired'])468    def testHighPriorityTakeAll(self):469        """Min duts not satisfied."""470        num_hosts = 1471        suite1 = {'priority':20, 'num_jobs': 3, 'min_duts': 2,472                  'expected_aquired': 1}473        suite2 = {'priority':10, 'num_jobs': 7, 'min_duts': 5,474                  'expected_aquired': 0}475        self.min_dut_test_helper(num_hosts, [suite1, suite2])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
