Best Python code snippet using autotest_python
job.py
Source:job.py  
...56                job_ids.append(job_id)57            else:58                job_names.append(job_id)59        return (job_ids, job_names)60    def execute_on_ids_and_names(self, op, filters={},61                                 check_results={'id__in': 'id',62                                                'name__in': 'id'},63                                 tag_id='id__in', tag_name='name__in'):64        if not self.jobs:65            # Want everything66            return super(job_list_stat, self).execute(op=op, filters=filters)67        all_jobs = []68        (job_ids, job_names) = self.__split_jobs_between_ids_names()69        for items, tag in [(job_ids, tag_id),70                          (job_names, tag_name)]:71            if items:72                new_filters = filters.copy()73                new_filters[tag] = items74                jobs = super(job_list_stat,75                             self).execute(op=op,76                                           filters=new_filters,77                                           check_results=check_results)78                all_jobs.extend(jobs)79        return all_jobs80class job_list(job_list_stat):81    """atest job list [<jobs>] [--all] [--running] [--user <username>]"""82    def __init__(self):83        super(job_list, self).__init__()84        self.parser.add_option('-a', '--all', help='List jobs for all '85                               'users.', action='store_true', default=False)86        self.parser.add_option('-r', '--running', help='List only running '87                               'jobs', action='store_true')88        self.parser.add_option('-u', '--user', help='List jobs for given '89                               'user', type='string')90    def parse(self):91        options, leftover = super(job_list, self).parse()92        self.all = options.all93        self.data['running'] = options.running94        if options.user:95            if options.all:96                self.invalid_syntax('Only specify --all or --user, not both.')97            else:98                self.data['owner'] = options.user99        elif not options.all and not self.jobs:100            self.data['owner'] = getpass.getuser()101        return options, leftover102    def execute(self):103        return self.execute_on_ids_and_names(op='get_jobs_summary',104                                             filters=self.data)105    def output(self, results):106        keys = ['id', 'owner', 'name', 'status_counts']107        if self.verbose:108            keys.extend(['priority', 'control_type', 'created_on'])109        self._convert_status(results)110        super(job_list, self).output(results, keys)111class job_stat(job_list_stat):112    """atest job stat <job>"""113    usage_action = 'stat'114    def __init__(self):115        super(job_stat, self).__init__()116        self.parser.add_option('-f', '--control-file',117                               help='Display the control file',118                               action='store_true', default=False)119        self.parser.add_option('-N', '--list-hosts',120                               help='Display only a list of hosts',121                               action='store_true')122        self.parser.add_option('-s', '--list-hosts-status',123                               help='Display only the hosts in these statuses '124                               'for a job.', action='store')125    def parse(self):126        status_list = topic_common.item_parse_info(127                attribute_name='status_list',128                inline_option='list_hosts_status')129        options, leftover = super(job_stat, self).parse([status_list],130                                                        req_items='jobs')131        if not self.jobs:132            self.invalid_syntax('Must specify at least one job.')133        self.show_control_file = options.control_file134        self.list_hosts = options.list_hosts135        if self.list_hosts and self.status_list:136            self.invalid_syntax('--list-hosts is implicit when using '137                                '--list-hosts-status.')138        if len(self.jobs) > 1 and (self.list_hosts or self.status_list):139            self.invalid_syntax('--list-hosts and --list-hosts-status should '140                                'only be used on a single job.')141        return options, leftover142    def _merge_results(self, summary, qes):143        hosts_status = {}144        for qe in qes:145            if qe['host']:146                job_id = qe['job']['id']147                hostname = qe['host']['hostname']148                hosts_status.setdefault(job_id,149                                        {}).setdefault(qe['status'],150                                                       []).append(hostname)151        for job in summary:152            job_id = job['id']153            if hosts_status.has_key(job_id):154                this_job = hosts_status[job_id]155                job['hosts'] = ' '.join(' '.join(host) for host in156                                        this_job.itervalues())157                host_per_status = ['%s="%s"' %(status, ' '.join(host))158                                   for status, host in this_job.iteritems()]159                job['hosts_status'] = ', '.join(host_per_status)160                if self.status_list:161                    statuses = set(s.lower() for s in self.status_list)162                    all_hosts = [s for s in host_per_status if s.split('=',163                                 1)[0].lower() in statuses]164                    job['hosts_selected_status'] = '\n'.join(all_hosts)165            else:166                job['hosts_status'] = ''167            if not job.get('hosts'):168                self.generic_error('Job has unassigned meta-hosts, '169                                   'try again shortly.')170        return summary171    def execute(self):172        summary = self.execute_on_ids_and_names(op='get_jobs_summary')173        # Get the real hostnames174        qes = self.execute_on_ids_and_names(op='get_host_queue_entries',175                                            check_results={},176                                            tag_id='job__in',177                                            tag_name='job__name__in')178        self._convert_status(summary)179        return self._merge_results(summary, qes)180    def output(self, results):181        if self.list_hosts:182            keys = ['hosts']183        elif self.status_list:184            keys = ['hosts_selected_status']185        elif not self.verbose:186            keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status']187        else:188            keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status',...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
