Best Python code snippet using molecule_python
inventory.py
Source:inventory.py  
...73        self.ensure_required_groups(ROLES)74        if changed_hosts:75            changed_hosts = self.range2ips(changed_hosts)76            self.hosts = self.build_hostnames(changed_hosts)77            self.purge_invalid_hosts(self.hosts.keys(), PROTECTED_NAMES)78            self.set_all(self.hosts)79            self.set_k8s_cluster()80            etcd_hosts_count = 3 if len(self.hosts.keys()) >= 3 else 181            self.set_etcd(list(self.hosts.keys())[:etcd_hosts_count])82            if len(self.hosts) >= SCALE_THRESHOLD:83                self.set_kube_master(list(self.hosts.keys())[etcd_hosts_count:5])84            else:85                self.set_kube_master(list(self.hosts.keys())[:2])86            self.set_kube_node(self.hosts.keys())87            if len(self.hosts) >= SCALE_THRESHOLD:88                self.set_calico_rr(list(self.hosts.keys())[:etcd_hosts_count])89        else:  # Show help if no options90            self.show_help()91            sys.exit(0)92        self.write_config(self.config_file)93    def write_config(self, config_file):94        if config_file:95            with open(self.config_file, 'w') as f:96                yaml.dump(self.yaml_config, f)97        else:98            print("WARNING: Unable to save config. 
Make sure you set "99                  "CONFIG_FILE env var.")100    def debug(self, msg):101        if DEBUG:102            print("DEBUG: {0}".format(msg))103    def get_ip_from_opts(self, optstring):104        if 'ip' in optstring:105            return optstring['ip']106        else:107            raise ValueError("IP parameter not found in options")108    def ensure_required_groups(self, groups):109        for group in groups:110            if group == 'all':111                self.debug("Adding group {0}".format(group))112                if group not in self.yaml_config:113                    all_dict = OrderedDict([('hosts', OrderedDict({})),114                                            ('children', OrderedDict({}))])115                    self.yaml_config = {'all': all_dict}116            else:117                self.debug("Adding group {0}".format(group))118                if group not in self.yaml_config['all']['children']:119                    self.yaml_config['all']['children'][group] = {'hosts': {}}120    def get_host_id(self, host):121        '''Returns integer host ID (without padding) from a given hostname.'''122        try:123            short_hostname = host.split('.')[0]124            return int(re.findall("\\d+$", short_hostname)[-1])125        except IndexError:126            raise ValueError("Host name must end in an integer")127    def build_hostnames(self, changed_hosts):128        existing_hosts = OrderedDict()129        highest_host_id = 0130        try:131            for host in self.yaml_config['all']['hosts']:132                existing_hosts[host] = self.yaml_config['all']['hosts'][host]133                host_id = self.get_host_id(host)134                if host_id > highest_host_id:135                    highest_host_id = host_id136        except Exception:137            pass138        # FIXME(mattymo): Fix condition where delete then add reuses highest id139        next_host_id = highest_host_id + 1140        all_hosts = 
existing_hosts.copy()141        for host in changed_hosts:142            if host[0] == "-":143                realhost = host[1:]144                if self.exists_hostname(all_hosts, realhost):145                    self.debug("Marked {0} for deletion.".format(realhost))146                    all_hosts.pop(realhost)147                elif self.exists_ip(all_hosts, realhost):148                    self.debug("Marked {0} for deletion.".format(realhost))149                    self.delete_host_by_ip(all_hosts, realhost)150            elif host[0].isdigit():151                if ',' in host:152                    ip, access_ip = host.split(',')153                else:154                    ip = host155                    access_ip = host156                if self.exists_hostname(all_hosts, host):157                    self.debug("Skipping existing host {0}.".format(host))158                    continue159                elif self.exists_ip(all_hosts, ip):160                    self.debug("Skipping existing host {0}.".format(ip))161                    continue162                next_host = "{0}{1}".format(HOST_PREFIX, next_host_id)163                next_host_id += 1164                all_hosts[next_host] = {'ansible_host': access_ip,165                                        'ip': ip,166                                        'access_ip': access_ip}167            elif host[0].isalpha():168                raise Exception("Adding hosts by hostname is not supported.")169        return all_hosts170    def range2ips(self, hosts):171        reworked_hosts = []172        def ips(start_address, end_address):173            try:174                # Python 3.x175                start = int(ip_address(start_address))176                end   = int(ip_address(end_address))177            except:178                # Python 2.7179                start = int(ip_address(unicode(start_address)))180                end   = int(ip_address(unicode(end_address)))181            return 
[ip_address(ip).exploded for ip in range(start, end + 1)]182        for host in hosts:183            if '-' in host and not host.startswith('-'):184                start, end = host.strip().split('-')185                try:186                    reworked_hosts.extend(ips(start, end))187                except ValueError:188                    raise Exception("Range of ip_addresses isn't valid")189            else:190                reworked_hosts.append(host)191        return reworked_hosts192    def exists_hostname(self, existing_hosts, hostname):193        return hostname in existing_hosts.keys()194    def exists_ip(self, existing_hosts, ip):195        for host_opts in existing_hosts.values():196            if ip == self.get_ip_from_opts(host_opts):197                return True198        return False199    def delete_host_by_ip(self, existing_hosts, ip):200        for hostname, host_opts in existing_hosts.items():201            if ip == self.get_ip_from_opts(host_opts):202                del existing_hosts[hostname]203                return204        raise ValueError("Unable to find host by IP: {0}".format(ip))205    def purge_invalid_hosts(self, hostnames, protected_names=[]):206        for role in self.yaml_config['all']['children']:207            if role != 'k8s-cluster' and self.yaml_config['all']['children'][role]['hosts']:  # noqa208                all_hosts = self.yaml_config['all']['children'][role]['hosts'].copy()  # noqa209                for host in all_hosts.keys():210                    if host not in hostnames and host not in protected_names:211                        self.debug(212                            "Host {0} removed from role {1}".format(host, role))  # noqa213                        del self.yaml_config['all']['children'][role]['hosts'][host]  # noqa214        # purge from all215        if self.yaml_config['all']['hosts']:216            all_hosts = self.yaml_config['all']['hosts'].copy()217            for host in all_hosts.keys():218     
           if host not in hostnames and host not in protected_names:219                    self.debug("Host {0} removed from role all".format(host))...test_inventory.py
Source:test_inventory.py  
...172                       'ip': '10.90.0.3',173                       'access_ip': '10.90.0.3'})])174        self.assertRaisesRegexp(ValueError, "Unable to find host",175                                self.inv.delete_host_by_ip, existing_hosts, ip)176    def test_purge_invalid_hosts(self):177        proper_hostnames = ['node1', 'node2']178        bad_host = 'doesnotbelong2'179        existing_hosts = OrderedDict([180            ('node1', {'ansible_host': '10.90.0.2',181                       'ip': '10.90.0.2',182                       'access_ip': '10.90.0.2'}),183            ('node2', {'ansible_host': '10.90.0.3',184                       'ip': '10.90.0.3',185                       'access_ip': '10.90.0.3'}),186            ('doesnotbelong2', {'whateveropts=ilike'})])187        self.inv.yaml_config['all']['hosts'] = existing_hosts188        self.inv.purge_invalid_hosts(proper_hostnames)189        self.assertTrue(190            bad_host not in self.inv.yaml_config['all']['hosts'].keys())191    def test_add_host_to_group(self):192        group = 'etcd'193        host = 'node1'194        opts = {'ip': '10.90.0.2'}195        self.inv.add_host_to_group(group, host, opts)196        self.assertEqual(197            self.inv.yaml_config['all']['children'][group]['hosts'].get(host),198            None)199    def test_set_kube_master(self):200        group = 'kube-master'201        host = 'node1'202        self.inv.set_kube_master([host])...state.py
Source:state.py  
...176        logger.warning((177            'Use of `State.limit` is deprecated, '178            'please use normal `if` statements instead.'179        ))180        return self.hosts(hosts)181    @contextmanager182    def hosts(self, hosts):183        logger.warning((184            'Use of `State.hosts` is deprecated, '185            'please use normal `if` statements instead.'186        ))187        hosts = ensure_host_list(hosts, inventory=self.inventory)188        # Store the previous value189        old_limit_hosts = self.limit_hosts190        # Limit the new hosts to a subset of the old hosts if they existed191        if old_limit_hosts is not None:192            hosts = [193                host for host in hosts194                if host in old_limit_hosts195            ]196        # Set the new value197        self.limit_hosts = hosts198        logger.debug('Setting limit to hosts: {0}'.format(hosts))199        yield200        # Restore the old value201        self.limit_hosts = old_limit_hosts202        logger.debug('Reset limit to hosts: {0}'.format(old_limit_hosts))203    @contextmanager204    def when(self, predicate):205        logger.warning((206            'Use of `State.when` is deprecated, '207            'please use normal `if` statements instead.'208        ))209        # Truth-y? 
Just yield/end, nothing happens here!210        if predicate:211            yield212            return213        # Otherwise limit any operations within to match no hosts214        with self.hosts([]):215            yield216    @contextmanager217    def deploy(self, name, kwargs, data, line_number, in_deploy=True):218        '''219        Wraps a group of operations as a deploy, this should not be used220        directly, instead use ``pyinfra.api.deploy.deploy``.221        '''222        # Handle nested deploy names223        if self.deploy_name:224            name = _make_name(self.deploy_name, name)225        # Store the previous values226        old_in_deploy = self.in_deploy227        old_deploy_name = self.deploy_name228        old_deploy_kwargs = self.deploy_kwargs229        old_deploy_data = self.deploy_data230        old_deploy_line_numbers = self.deploy_line_numbers231        self.in_deploy = in_deploy232        # Limit the new hosts to a subset of the old hosts if they existed233        if (234            old_deploy_kwargs235            and old_deploy_kwargs.get('hosts') is not None236        ):237            # If we have hosts - subset them based on the old hosts238            if 'hosts' in kwargs:239                kwargs['hosts'] = [240                    host for host in kwargs['hosts']241                    if host in old_deploy_kwargs['hosts']242                ]243            # Otherwise simply carry the previous hosts244            else:245                kwargs['hosts'] = old_deploy_kwargs['hosts']246        # Make new line numbers - note convert from and back to tuple to avoid247        # keeping deploy_line_numbers mutable.248        new_line_numbers = list(self.deploy_line_numbers or [])249        new_line_numbers.append(line_number)250        new_line_numbers = tuple(new_line_numbers)251        # Set the new values252        self.deploy_name = name253        self.deploy_kwargs = kwargs254        self.deploy_data = data255        
self.deploy_line_numbers = new_line_numbers256        logger.debug('Starting deploy {0} (args={1}, data={2})'.format(257            name, kwargs, data,258        ))259        yield260        # Restore the previous values261        self.in_deploy = old_in_deploy262        self.deploy_name = old_deploy_name263        self.deploy_kwargs = old_deploy_kwargs264        self.deploy_data = old_deploy_data265        self.deploy_line_numbers = old_deploy_line_numbers266        logger.debug('Reset deploy to {0} (args={1}, data={2})'.format(267            old_deploy_name, old_deploy_kwargs, old_deploy_data,268        ))269    @contextmanager270    def preserve_loop_order(self, items):271        frameinfo = get_caller_frameinfo(frame_offset=1)  # escape contextlib272        self.loop_line = frameinfo.lineno273        def item_generator():274            for i, item in enumerate(items, 1):275                self.loop_counter = i276                yield item277        yield item_generator278        self.loop_counter = None279        self.loop_line = None280    def get_op_order(self):281        line_numbers_to_hash = self.op_line_numbers_to_hash282        sorted_line_numbers = sorted(list(line_numbers_to_hash.keys()))283        return [284            line_numbers_to_hash[numbers]285            for numbers in sorted_line_numbers286        ]287    def activate_host(self, host):288        '''289        Flag a host as active.290        '''291        logger.debug('Activating host: {0}'.format(host))292        # Add to *both* activated and active - active will reduce as hosts fail293        # but connected will not, enabling us to track failed %.294        self.activated_hosts.add(host)295        self.active_hosts.add(host)296    # def ready_host(self, host):297    #     '''298    #     Flag a host as ready, after which facts will not be gathered for it.299    #     '''300    #     logger.debug('Readying host: {0}'.format(host))301    #     self.ready_hosts.add(host)302    def 
fail_hosts(self, hosts_to_fail, activated_count=None):303        '''304        Flag a ``set`` of hosts as failed, error for ``config.FAIL_PERCENT``.305        '''306        if not hosts_to_fail:307            return308        activated_count = activated_count or len(self.activated_hosts)309        logger.debug('Failing hosts: {0}'.format(', '.join(310            (host.name for host in hosts_to_fail),311        )))312        self.failed_hosts.update(hosts_to_fail)313        self.active_hosts -= hosts_to_fail314        # Check we're not above the fail percent315        active_hosts = self.active_hosts316        # No hosts left!...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
