Best Python code snippet using autotest_python
host.py
Source:host.py  
...38        self.topic_parse_info = topic_common.item_parse_info(39            attribute_name='hosts',40            filename_option='mlist',41            use_leftover=True)42    def _parse_lock_options(self, options):43        if options.lock and options.unlock:44            self.invalid_syntax('Only specify one of '45                                '--lock and --unlock.')46        if options.lock:47            self.data['locked'] = True48            self.messages.append('Locked host')49        elif options.unlock:50            self.data['locked'] = False51            self.messages.append('Unlocked host')52    def _cleanup_labels(self, labels, platform=None):53        """Removes the platform label from the overall labels"""54        if platform:55            return [label for label in labels56                    if label != platform]57        else:58            try:59                return [label for label in labels60                        if not label['platform']]61            except TypeError:62                # This is a hack - the server will soon63                # do this, so all this code should be removed.64                return labels65    def get_items(self):66        return self.hosts67class host_help(host):68    """Just here to get the atest logic working.69    Usage is set by its parent"""70    pass71class host_list(action_common.atest_list, host):72    """atest host list [--mlist <file>|<hosts>] [--label <label>]73       [--status <status1,status2>] [--acl <ACL>] [--user <user>]"""74    def __init__(self):75        super(host_list, self).__init__()76        self.parser.add_option('-b', '--label',77                               default='',78                               help='Only list hosts with all these labels '79                               '(comma separated)')80        self.parser.add_option('-s', '--status',81                               default='',82                               help='Only list hosts with any of these '83                               'statuses (comma separated)')84        self.parser.add_option('-a', '--acl',85                               default='',86                               help='Only list hosts within this ACL')87        self.parser.add_option('-u', '--user',88                               default='',89                               help='Only list hosts available to this user')90        self.parser.add_option('-N', '--hostnames-only', help='Only return '91                               'hostnames for the machines queried.',92                               action='store_true')93        self.parser.add_option('--locked',94                               default=False,95                               help='Only list locked hosts',96                               action='store_true')97        self.parser.add_option('--unlocked',98                               default=False,99                               help='Only list unlocked hosts',100                               action='store_true')101    def parse(self):102        """Consume the specific options"""103        label_info = topic_common.item_parse_info(attribute_name='labels',104                                                  inline_option='label')105        (options, leftover) = super(host_list, self).parse([label_info])106        self.status = options.status107        self.acl = options.acl108        self.user = options.user109        self.hostnames_only = options.hostnames_only110        if options.locked and options.unlocked:111            self.invalid_syntax('--locked and --unlocked are '112                                'mutually exclusive')113        self.locked = options.locked114        self.unlocked = options.unlocked115        return (options, leftover)116    def execute(self):117        filters = {}118        check_results = {}119        if self.hosts:120            filters['hostname__in'] = self.hosts121            check_results['hostname__in'] = 'hostname'122        if self.labels:123            if len(self.labels) == 1:124                # This is needed for labels with wildcards (x86*)125                filters['labels__name__in'] = self.labels126                check_results['labels__name__in'] = None127            else:128                filters['multiple_labels'] = self.labels129                check_results['multiple_labels'] = None130        if self.status:131            statuses = self.status.split(',')132            statuses = [status.strip() for status in statuses133                        if status.strip()]134            filters['status__in'] = statuses135            check_results['status__in'] = None136        if self.acl:137            filters['aclgroup__name'] = self.acl138            check_results['aclgroup__name'] = None139        if self.user:140            filters['aclgroup__users__login'] = self.user141            check_results['aclgroup__users__login'] = None142        if self.locked or self.unlocked:143            filters['locked'] = self.locked144            check_results['locked'] = None145        return super(host_list, self).execute(op='get_hosts',146                                              filters=filters,147                                              check_results=check_results)148    def output(self, results):149        if results:150            # Remove the platform from the labels.151            for result in results:152                result['labels'] = self._cleanup_labels(result['labels'],153                                                        result['platform'])154        if self.hostnames_only:155            self.print_list(results, key='hostname')156        else:157            super(host_list, self).output(results, keys=['hostname', 'status',158                                                         'locked', 'platform', 'labels'])159class host_stat(host):160    """atest host stat --mlist <file>|<hosts>"""161    usage_action = 'stat'162    def execute(self):163        results = []164        # Convert wildcards into real host stats.165        existing_hosts = []166        for host in self.hosts:167            if host.endswith('*'):168                stats = self.execute_rpc('get_hosts',169                                         hostname__startswith=host.rstrip('*'))170                if len(stats) == 0:171                    self.failure('No hosts matching %s' % host, item=host,172                                 what_failed='Failed to stat')173                    continue174            else:175                stats = self.execute_rpc('get_hosts', hostname=host)176                if len(stats) == 0:177                    self.failure('Unknown host %s' % host, item=host,178                                 what_failed='Failed to stat')179                    continue180            existing_hosts.extend(stats)181        for stat in existing_hosts:182            host = stat['hostname']183            # The host exists, these should succeed184            acls = self.execute_rpc('get_acl_groups', hosts__hostname=host)185            labels = self.execute_rpc('get_labels', host__hostname=host)186            results.append([[stat], acls, labels])187        return results188    def output(self, results):189        for stats, acls, labels in results:190            print '-' * 5191            self.print_fields(stats,192                              keys=['hostname', 'platform',193                                    'status', 'locked', 'locked_by',194                                    'lock_time', 'protection', ])195            self.print_by_ids(acls, 'ACLs', line_before=True)196            labels = self._cleanup_labels(labels)197            self.print_by_ids(labels, 'Labels', line_before=True)198class host_jobs(host):199    """atest host jobs [--max-query] --mlist <file>|<hosts>"""200    usage_action = 'jobs'201    def __init__(self):202        super(host_jobs, self).__init__()203        self.parser.add_option('-q', '--max-query',204                               help='Limits the number of results '205                               '(20 by default)',206                               type='int', default=20)207    def parse(self):208        """Consume the specific options"""209        (options, leftover) = super(host_jobs, self).parse()210        self.max_queries = options.max_query211        return (options, leftover)212    def execute(self):213        results = []214        real_hosts = []215        for host in self.hosts:216            if host.endswith('*'):217                stats = self.execute_rpc('get_hosts',218                                         hostname__startswith=host.rstrip('*'))219                if len(stats) == 0:220                    self.failure('No host matching %s' % host, item=host,221                                 what_failed='Failed to stat')222                [real_hosts.append(stat['hostname']) for stat in stats]223            else:224                real_hosts.append(host)225        for host in real_hosts:226            queue_entries = self.execute_rpc('get_host_queue_entries',227                                             host__hostname=host,228                                             query_limit=self.max_queries,229                                             sort_by=['-job__id'])230            jobs = []231            for entry in queue_entries:232                job = {'job_id': entry['job']['id'],233                       'job_owner': entry['job']['owner'],234                       'job_name': entry['job']['name'],235                       'status': entry['status']}236                jobs.append(job)237            results.append((host, jobs))238        return results239    def output(self, results):240        for host, jobs in results:241            print '-' * 5242            print 'Hostname: %s' % host243            self.print_table(jobs, keys_header=['job_id',244                                                'job_owner',245                                                'job_name',246                                                'status'])247class host_mod(host):248    """atest host mod --lock|--unlock|--protection249    --mlist <file>|<hosts>"""250    usage_action = 'mod'251    def __init__(self):252        """Add the options specific to the mod action"""253        self.data = {}254        self.messages = []255        super(host_mod, self).__init__()256        self.parser.add_option('-l', '--lock',257                               help='Lock hosts',258                               action='store_true')259        self.parser.add_option('-u', '--unlock',260                               help='Unlock hosts',261                               action='store_true')262        self.parser.add_option('-p', '--protection', type='choice',263                               help=('Set the protection level on a host.  '264                                     'Must be one of: %s' %265                                     ', '.join('"%s"' % p266                                               for p in self.protections)),267                               choices=self.protections)268    def parse(self):269        """Consume the specific options"""270        (options, leftover) = super(host_mod, self).parse()271        self._parse_lock_options(options)272        if options.protection:273            self.data['protection'] = options.protection274            self.messages.append('Protection set to "%s"' % options.protection)275        if len(self.data) == 0:276            self.invalid_syntax('No modification requested')277        return (options, leftover)278    def execute(self):279        successes = []280        for host in self.hosts:281            try:282                res = self.execute_rpc('modify_host', item=host,283                                       id=host, **self.data)284                # TODO: Make the AFE return True or False,285                # especially for lock286                successes.append(host)287            except topic_common.CliError, full_error:288                # Already logged by execute_rpc()289                pass290        return successes291    def output(self, hosts):292        for msg in self.messages:293            self.print_wrapped(msg, hosts)294class host_create(host):295    """atest host create [--lock|--unlock --platform <arch>296    --labels <labels>|--blist <label_file>297    --acls <acls>|--alist <acl_file>298    --protection <protection_type>299    --mlist <mach_file>] <hosts>"""300    usage_action = 'create'301    def __init__(self):302        self.messages = []303        super(host_create, self).__init__()304        self.parser.add_option('-l', '--lock',305                               help='Create the hosts as locked',306                               action='store_true', default=False)307        self.parser.add_option('-u', '--unlock',308                               help='Create the hosts as '309                               'unlocked (default)',310                               action='store_true')311        self.parser.add_option('-t', '--platform',312                               help='Sets the platform label')313        self.parser.add_option('-b', '--labels',314                               help='Comma separated list of labels')315        self.parser.add_option('-B', '--blist',316                               help='File listing the labels',317                               type='string',318                               metavar='LABEL_FLIST')319        self.parser.add_option('-a', '--acls',320                               help='Comma separated list of ACLs')321        self.parser.add_option('-A', '--alist',322                               help='File listing the acls',323                               type='string',324                               metavar='ACL_FLIST')325        self.parser.add_option('-p', '--protection', type='choice',326                               help=('Set the protection level on a host.  '327                                     'Must be one of: %s' %328                                     ', '.join('"%s"' % p329                                               for p in self.protections)),330                               choices=self.protections)331    def parse(self):332        label_info = topic_common.item_parse_info(attribute_name='labels',333                                                  inline_option='labels',334                                                  filename_option='blist')335        acl_info = topic_common.item_parse_info(attribute_name='acls',336                                                inline_option='acls',337                                                filename_option='alist')338        (options, leftover) = super(host_create, self).parse([label_info,339                                                              acl_info],340                                                             req_items='hosts')341        self._parse_lock_options(options)342        self.locked = options.lock343        self.platform = getattr(options, 'platform', None)344        if options.protection:345            self.data['protection'] = options.protection346        return (options, leftover)347    def _execute_add_one_host(self, host):348        # Always add the hosts as locked to avoid the host349        # being picked up by the scheduler before it's ACL'ed350        self.data['locked'] = True351        self.execute_rpc('add_host', hostname=host,352                         status="Ready", **self.data)353        # Now add the platform label354        labels = self.labels[:]355        if self.platform:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
