Best Python code snippet using autotest_python
frontend.py
Source:frontend.py  
...386        """387        results = []388        for job in jobs:389            if getattr(job, 'result', None) is None:390                job.result = self.poll_job_results(tko, job)391                if job.result is not None:392                    self.result_notify(job, email_from, email_to)393            results.append(job.result)394            self.print_job_result(job)395        if None in results:396            return None397        elif False in results or "Abort" in results:398            return False399        else:400            return True401    def _included_platform(self, host, platforms):402        """403        See if host's platforms matches any of the patterns in the included404        platforms list.405        """406        if not platforms:407            return True        # No filtering of platforms408        for platform in platforms:409            if re.search(platform, host.platform):410                return True411        return False412    def invoke_test(self, pairing, kernel, kernel_label, priority='Medium',413                    kernel_cmdline=None, **dargs):414        """415        Given a pairing of a control file to a machine label, find all machines416        with that label, and submit that control file to them.417        :param kernel_label: Label (string) of the kernel to run such as418                '<kernel-version> : <config> : <date>'419                If any pairing object has its job_label attribute set it420                will override this value for that particular job.421        :return: A list of job objects.422        """423        # The pairing can override the job label.424        if pairing.job_label:425            kernel_label = pairing.job_label426        job_name = '%s : %s' % (pairing.machine_label, kernel_label)427        hosts = self.get_hosts(multiple_labels=[pairing.machine_label])428        platforms = pairing.platforms429        hosts = [h for h in hosts if self._included_platform(h, platforms)]430        dead_statuses = self.host_statuses(live=False)431        host_list = [h.hostname for h in hosts if h.status not in dead_statuses]432        print 'HOSTS: %s' % host_list433        if pairing.atomic_group_sched:434            dargs['synch_count'] = pairing.synch_count435            dargs['atomic_group_name'] = pairing.machine_label436        else:437            dargs['hosts'] = host_list438        new_job = self.create_job_by_test(name=job_name,439                                          dependencies=[pairing.machine_label],440                                          tests=[pairing.control_file],441                                          priority=priority,442                                          kernel=kernel,443                                          kernel_cmdline=kernel_cmdline,444                                          use_container=pairing.container,445                                          **dargs)446        if new_job:447            if pairing.testname:448                new_job.testname = pairing.testname449            print 'Invoked test %s : %s' % (new_job.id, job_name)450        return new_job451    def _job_test_results(self, tko, job, debug, tests=[]):452        """453        Retrieve test results for a job454        """455        job.test_status = {}456        try:457            test_statuses = tko.get_status_counts(job=job.id)458        except Exception:459            print "Ignoring exception on poll job; RPC interface is flaky"460            traceback.print_exc()461            return462        for test_status in test_statuses:463            # SERVER_JOB is buggy, and often gives false failures. Ignore it.464            if test_status.test_name == 'SERVER_JOB':465                continue466            # if tests is not empty, restrict list of test_statuses to tests467            if tests and test_status.test_name not in tests:468                continue469            if debug:470                print test_status471            hostname = test_status.hostname472            if hostname not in job.test_status:473                job.test_status[hostname] = TestResults()474            job.test_status[hostname].add(test_status)475    def _job_results_platform_map(self, job, debug):476        # Figure out which hosts passed / failed / aborted in a job477        # Creates a 2-dimensional hash, stored as job.results_platform_map478        #     1st index - platform type (string)479        #     2nd index - Status (string)480        #         'Completed' / 'Failed' / 'Aborted'481        #     Data indexed by this hash is a list of hostnames (text strings)482        job.results_platform_map = {}483        try:484            job_statuses = self.get_host_queue_entries(job=job.id)485        except Exception:486            print "Ignoring exception on poll job; RPC interface is flaky"487            traceback.print_exc()488            return None489        platform_map = {}490        job.job_status = {}491        job.metahost_index = {}492        for job_status in job_statuses:493            # This is basically "for each host / metahost in the job"494            if job_status.host:495                hostname = job_status.host.hostname496            else:              # This is a metahost497                metahost = job_status.meta_host498                index = job.metahost_index.get(metahost, 1)499                job.metahost_index[metahost] = index + 1500                hostname = '%s.%s' % (metahost, index)501            job.job_status[hostname] = job_status.status502            status = job_status.status503            # Skip hosts that failed verify or repair:504            # that's a machine failure, not a job failure505            if hostname in job.test_status:506                verify_failed = False507                for failure in job.test_status[hostname].fail:508                    if (failure.test_name == 'verify' or509                            failure.test_name == 'repair'):510                        verify_failed = True511                        break512                if verify_failed:513                    continue514            if hostname in job.test_status and job.test_status[hostname].fail:515                # If the any tests failed in the job, we want to mark the516                # job result as failed, overriding the default job status.517                if status != "Aborted":         # except if it's an aborted job518                    status = 'Failed'519            if job_status.host:520                platform = job_status.host.platform521            else:              # This is a metahost522                platform = job_status.meta_host523            if platform not in platform_map:524                platform_map[platform] = {'Total': [hostname]}525            else:526                platform_map[platform]['Total'].append(hostname)527            new_host_list = platform_map[platform].get(status, []) + [hostname]528            platform_map[platform][status] = new_host_list529        job.results_platform_map = platform_map530    def set_platform_results(self, test_job, platform, result):531        """532        Result must be None, 'FAIL', 'WARN' or 'GOOD'533        """534        if test_job.platform_results[platform] is not None:535            # We're already done, and results recorded. This can't change later.536            return537        test_job.platform_results[platform] = result538        # Note that self.job refers to the metajob we're IN, not the job539        # that we're excuting from here.540        testname = '%s.%s' % (test_job.testname, platform)541        if self.job:542            self.job.record(result, None, testname, status='')543    def poll_job_results(self, tko, job, debug=False):544        """545        Analyse all job results by platform, return:546            False: if any platform has more than one failure547            None:  if any platform has more than one machine not yet Good.548            True:  if all platforms have at least all-but-one machines Good.549        """550        self._job_test_results(tko, job, debug)551        if job.test_status == {}:552            return None553        self._job_results_platform_map(job, debug)554        good_platforms = []555        failed_platforms = []556        aborted_platforms = []557        unknown_platforms = []...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
