Best Python code snippet using autotest_python
frontend.py
Source:frontend.py  
...269    def get_host_diagnosis_interval(self, host_id, end_time, success):270        return self.run('get_host_diagnosis_interval',271                        host_id=host_id, end_time=end_time,272                        success=success)273    def create_job_by_test(self, tests, kernel=None, use_container=False,274                           kernel_cmdline=None, **dargs):275        """276        Given a test name, fetch the appropriate control file from the server277        and submit it.278        @param kernel: A comma separated list of kernel versions to boot.279        @param kernel_cmdline: The command line used to boot all kernels listed280                in the kernel parameter.281        Returns a list of job objects282        """283        assert ('hosts' in dargs or284                'atomic_group_name' in dargs and 'synch_count' in dargs)285        if kernel:286            kernel_list =  re.split('[\s,]+', kernel.strip())287            kernel_info = []288            for version in kernel_list:289                kernel_dict = {'version': version}290                if kernel_cmdline is not None:291                    kernel_dict['cmdline'] = kernel_cmdline292                kernel_info.append(kernel_dict)293        else:294            kernel_info = None295        control_file = self.generate_control_file(296                tests=tests, kernel=kernel_info, use_container=use_container)297        if control_file.is_server:298            dargs['control_type'] = control_data.CONTROL_TYPE_NAMES.SERVER299        else:300            dargs['control_type'] = control_data.CONTROL_TYPE_NAMES.CLIENT301        dargs['dependencies'] = dargs.get('dependencies', []) + \302                                control_file.dependencies303        dargs['control_file'] = control_file.control_file304        if not dargs.get('synch_count', None):305            dargs['synch_count'] = control_file.synch_count306        if 'hosts' in dargs and len(dargs['hosts']) < dargs['synch_count']:307            # will not be able to satisfy this request308            return None309        return self.create_job(**dargs)310    def create_job(self, control_file, name=' ', priority='Medium',311                control_type=control_data.CONTROL_TYPE_NAMES.CLIENT, **dargs):312        id = self.run('create_job', name=name, priority=priority,313                 control_file=control_file, control_type=control_type, **dargs)314        return self.get_jobs(id=id)[0]315    def run_test_suites(self, pairings, kernel, kernel_label=None,316                        priority='Medium', wait=True, poll_interval=10,317                        email_from=None, email_to=None, timeout_mins=10080,318                        max_runtime_mins=10080, kernel_cmdline=None):319        """320        Run a list of test suites on a particular kernel.321        Poll for them to complete, and return whether they worked or not.322        @param pairings: List of MachineTestPairing objects to invoke.323        @param kernel: Name of the kernel to run.324        @param kernel_label: Label (string) of the kernel to run such as325                    '<kernel-version> : <config> : <date>'326                    If any pairing object has its job_label attribute set it327                    will override this value for that particular job.328        @param kernel_cmdline: The command line to boot the kernel(s) with.329        @param wait: boolean - Wait for the results to come back?330        @param poll_interval: Interval between polling for job results (in mins)331        @param email_from: Send notification email upon completion from here.332        @param email_from: Send notification email upon completion to here.333        """334        jobs = []335        for pairing in pairings:336            try:337                new_job = self.invoke_test(pairing, kernel, kernel_label,338                                           priority, timeout_mins=timeout_mins,339                                           kernel_cmdline=kernel_cmdline,340                                           max_runtime_mins=max_runtime_mins)341                if not new_job:342                    continue343                jobs.append(new_job)344            except Exception, e:345                traceback.print_exc()346        if not wait or not jobs:347            return348        tko = TKO()349        while True:350            time.sleep(60 * poll_interval)351            result = self.poll_all_jobs(tko, jobs, email_from, email_to)352            if result is not None:353                return result354    def result_notify(self, job, email_from, email_to):355        """356        Notify about the result of a job. Will always print, if email data357        is provided, will send email for it as well.358            job: job object to notify about359            email_from: send notification email upon completion from here360            email_from: send notification email upon completion to here361        """362        if job.result == True:363            subject = 'Testing PASSED: '364        else:365            subject = 'Testing FAILED: '366        subject += '%s : %s\n' % (job.name, job.id)367        text = []368        for platform in job.results_platform_map:369            for status in job.results_platform_map[platform]:370                if status == 'Total':371                    continue372                for host in job.results_platform_map[platform][status]:373                    text.append('%20s %10s %10s' % (platform, status, host))374                    if status == 'Failed':375                        for test_status in job.test_status[host].fail:376                            text.append('(%s, %s) : %s' % \377                                        (host, test_status.test_name,378                                         test_status.reason))379                        text.append('')380        base_url = 'http://' + self.server381        params = ('columns=test',382                  'rows=machine_group',383                  "condition=tag~'%s-%%25'" % job.id,384                  'title=Report')385        query_string = '&'.join(params)386        url = '%s/tko/compose_query.cgi?%s' % (base_url, query_string)387        text.append(url + '\n')388        url = '%s/afe/#tab_id=view_job&object_id=%s' % (base_url, job.id)389        text.append(url + '\n')390        body = '\n'.join(text)391        print '---------------------------------------------------'392        print 'Subject: ', subject393        print body394        print '---------------------------------------------------'395        if email_from and email_to:396            print 'Sending email ...'397            utils.send_email(email_from, email_to, subject, body)398        print399    def print_job_result(self, job):400        """401        Print the result of a single job.402            job: a job object403        """404        if job.result is None:405            print 'PENDING',406        elif job.result == True:407            print 'PASSED',408        elif job.result == False:409            print 'FAILED',410        elif job.result == "Abort":411            print 'ABORT',412        print ' %s : %s' % (job.id, job.name)413    def poll_all_jobs(self, tko, jobs, email_from=None, email_to=None):414        """415        Poll all jobs in a list.416            jobs: list of job objects to poll417            email_from: send notification email upon completion from here418            email_from: send notification email upon completion to here419        Returns:420            a) All complete successfully (return True)421            b) One or more has failed (return False)422            c) Cannot tell yet (return None)423        """424        results = []425        for job in jobs:426            if getattr(job, 'result', None) is None:427                job.result = self.poll_job_results(tko, job)428                if job.result is not None:429                    self.result_notify(job, email_from, email_to)430            results.append(job.result)431            self.print_job_result(job)432        if None in results:433            return None434        elif False in results or "Abort" in results:435            return False436        else:437            return True438    def _included_platform(self, host, platforms):439        """440        See if host's platforms matches any of the patterns in the included441        platforms list.442        """443        if not platforms:444            return True        # No filtering of platforms445        for platform in platforms:446            if re.search(platform, host.platform):447                return True448        return False449    def invoke_test(self, pairing, kernel, kernel_label, priority='Medium',450                    kernel_cmdline=None, **dargs):451        """452        Given a pairing of a control file to a machine label, find all machines453        with that label, and submit that control file to them.454        @param kernel_label: Label (string) of the kernel to run such as455                '<kernel-version> : <config> : <date>'456                If any pairing object has its job_label attribute set it457                will override this value for that particular job.458        @returns A list of job objects.459        """460        # The pairing can override the job label.461        if pairing.job_label:462            kernel_label = pairing.job_label463        job_name = '%s : %s' % (pairing.machine_label, kernel_label)464        hosts = self.get_hosts(multiple_labels=[pairing.machine_label])465        platforms = pairing.platforms466        hosts = [h for h in hosts if self._included_platform(h, platforms)]467        dead_statuses = self.host_statuses(live=False)468        host_list = [h.hostname for h in hosts if h.status not in dead_statuses]469        print 'HOSTS: %s' % host_list470        if pairing.atomic_group_sched:471            dargs['synch_count'] = pairing.synch_count472            dargs['atomic_group_name'] = pairing.machine_label473        else:474            dargs['hosts'] = host_list475        new_job = self.create_job_by_test(name=job_name,476                                          dependencies=[pairing.machine_label],477                                          tests=[pairing.control_file],478                                          priority=priority,479                                          kernel=kernel,480                                          kernel_cmdline=kernel_cmdline,481                                          use_container=pairing.container,482                                          **dargs)483        if new_job:484            if pairing.testname:485                new_job.testname = pairing.testname486            print 'Invoked test %s : %s' % (new_job.id, job_name)487        return new_job488    def _job_test_results(self, tko, job, debug, tests=[]):489        """...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
