Best Python code snippet using pytest-play_python
connection.py
Source:connection.py  
...66        if share_proto == 'CIFS':67            vdm_name = self._get_share_server_name(share_server)68            server_name = vdm_name69            # Check if CIFS server exists.70            status, server = self._get_context('CIFSServer').get(server_name,71                                                                 vdm_name)72            if status != constants.STATUS_OK:73                message = (_("CIFS server %s not found.") % server_name)74                LOG.error(message)75                raise exception.EMCVnxXMLAPIError(err=message)76        self._allocate_container(share_name, size, share_server, pool_name)77        if share_proto == 'NFS':78            location = self._create_nfs_share(share_name, share_server)79        elif share_proto == 'CIFS':80            location = self._create_cifs_share(share_name, share_server)81        return location82    def _share_server_validation(self, share_server):83        """Validate the share server."""84        if not share_server:85            msg = _('Share server not provided')86            raise exception.InvalidInput(reason=msg)87        backend_details = share_server.get('backend_details')88        vdm = backend_details.get(89            'share_server_name') if backend_details else None90        if vdm is None:91            message = _("No share server found.")92            LOG.error(message)93            raise exception.EMCVnxXMLAPIError(err=message)94    def _allocate_container(self, share_name, size, share_server, pool_name):95        """Allocate file system for share."""96        vdm_name = self._get_share_server_name(share_server)97        self._get_context('FileSystem').create(98            share_name, size, pool_name, vdm_name)99    def _allocate_container_from_snapshot(self, share, snapshot, share_server,100                                          pool_name):101        """Allocate file system from snapshot."""102        vdm_name = self._get_share_server_name(share_server)103        interconn_id = self._get_context('Mover').get_interconnect_id(104            self.mover_name, self.mover_name)105        self._get_context('FileSystem').create_from_snapshot(106            share['id'], snapshot['id'], snapshot['share_id'],107            pool_name, vdm_name, interconn_id)108        nwe_size = share['size'] * units.Ki109        self._get_context('FileSystem').extend(share['id'], pool_name,110                                               nwe_size)111    @vnx_utils.log_enter_exit112    def _create_cifs_share(self, share_name, share_server):113        """Create CIFS share."""114        vdm_name = self._get_share_server_name(share_server)115        server_name = vdm_name116        # Get available CIFS Server and interface (one CIFS server per VDM)117        status, server = self._get_context('CIFSServer').get(server_name,118                                                             vdm_name)119        if 'interfaces' not in server or len(server['interfaces']) == 0:120            message = (_("CIFS server %s doesn't have interface, "121                         "so the share is inaccessible.")122                       % server['compName'])123            LOG.error(message)124            raise exception.EMCVnxXMLAPIError(err=message)125        interface = server['interfaces'][0]126        self._get_context('CIFSShare').create(share_name, server['name'],127                                              vdm_name)128        self._get_context('CIFSShare').disable_share_access(share_name,129                                                            vdm_name)130        location = (r'\\%(interface)s\%(name)s' %131                    {'interface': interface, 'name': share_name})132        return location133    @vnx_utils.log_enter_exit134    def _create_nfs_share(self, share_name, share_server):135        """Create NFS share."""136        vdm_name = self._get_share_server_name(share_server)137        self._get_context('NFSShare').create(share_name, vdm_name)138        return ('%(nfs_if)s:/%(share_name)s'139                % {'nfs_if': share_server['backend_details']['nfs_if'],140                   'share_name': share_name})141    def create_share_from_snapshot(self, context, share, snapshot,142                                   share_server=None):143        """Create a share from a snapshot - clone a snapshot."""144        share_name = share['id']145        share_proto = share['share_proto']146        # Validate the share protocol147        if share_proto.upper() not in ('NFS', 'CIFS'):148            raise exception.InvalidShare(149                reason=(_('Invalid NAS protocol supplied: %s.')150                        % share_proto))151        # Get the pool name from share host field152        pool_name = share_utils.extract_host(share['host'], level='pool')153        if not pool_name:154            message = (_("Pool is not available in the share host %s.") %155                       share['host'])156            raise exception.InvalidHost(reason=message)157        self._share_server_validation(share_server)158        self._allocate_container_from_snapshot(159            share, snapshot, share_server, pool_name)160        if share_proto == 'NFS':161            self._create_nfs_share(share_name, share_server)162            location = ('%(nfs_if)s:/%(share_name)s'163                        % {'nfs_if': share_server['backend_details']['nfs_if'],164                           'share_name': share_name})165        elif share_proto == 'CIFS':166            location = self._create_cifs_share(share_name, share_server)167        return location168    def create_snapshot(self, context, snapshot, share_server=None):169        """Create snapshot from share."""170        share_name = snapshot['share_id']171        status, filesystem = self._get_context('FileSystem').get(share_name)172        if status != constants.STATUS_OK:173                message = (_("File System %s not found.") % share_name)174                LOG.error(message)175                raise exception.EMCVnxXMLAPIError(err=message)176        pool_id = filesystem['pools_id'][0]177        self._get_context('Snapshot').create(snapshot['id'],178                                             snapshot['share_id'],179                                             pool_id)180    def delete_share(self, context, share, share_server=None):181        """Delete a share."""182        if share_server is None:183            LOG.warning(_LW("Driver does not support share deletion without "184                            "share network specified. Return directly because "185                            "there is nothing to clean."))186            return187        share_proto = share['share_proto']188        if share_proto == 'NFS':189            self._delete_nfs_share(share, share_server)190        elif share_proto == 'CIFS':191            self._delete_cifs_share(share, share_server)192        else:193            raise exception.InvalidShare(194                reason='Unsupported share type')195    @vnx_utils.log_enter_exit196    def _delete_cifs_share(self, share, share_server):197        """Delete CIFS share."""198        vdm_name = self._get_share_server_name(share_server)199        name = share['id']200        self._get_context('CIFSShare').delete(name, vdm_name)201        self._deallocate_container(name, vdm_name)202    @vnx_utils.log_enter_exit203    def _delete_nfs_share(self, share, share_server):204        """Delete NFS share."""205        vdm_name = self._get_share_server_name(share_server)206        name = share['id']207        self._get_context('NFSShare').delete(name, vdm_name)208        self._deallocate_container(name, vdm_name)209    @vnx_utils.log_enter_exit210    def _deallocate_container(self, share_name, vdm_name):211        """Delete underneath objects of the share."""212        path = '/' + share_name213        try:214            # Delete mount point215            self._get_context('MountPoint').delete(path, vdm_name)216        except Exception:217            LOG.debug("Skip the failure of mount point %s deletion.", path)218        try:219            # Delete file system220            self._get_context('FileSystem').delete(share_name)221        except Exception:222            LOG.debug("Skip the failure of file system %s deletion.",223                      share_name)224    def delete_snapshot(self, context, snapshot, share_server=None):225        """Delete a snapshot."""226        self._get_context('Snapshot').delete(snapshot['id'])227    def ensure_share(self, context, share, share_server=None):228        """Ensure that the share is exported."""229    def extend_share(self, share, new_size, share_server=None):230        # Get the pool name from share host field231        pool_name = share_utils.extract_host(share['host'], level='pool')232        if not pool_name:233            message = (_("Pool is not available in the share host %s.") %234                       share['host'])235            raise exception.InvalidHost(reason=message)236        share_name = share['id']237        self._get_context('FileSystem').extend(238            share_name, pool_name, new_size * units.Ki)239    def allow_access(self, context, share, access, share_server=None):240        """Allow access to a share."""241        access_level = access['access_level']242        if access_level not in const.ACCESS_LEVELS:243            raise exception.InvalidShareAccessLevel(level=access_level)244        share_proto = share['share_proto']245        if share_proto == 'NFS':246            self._nfs_allow_access(context, share, access, share_server)247        elif share_proto == 'CIFS':248            self._cifs_allow_access(context, share, access, share_server)249        else:250            raise exception.InvalidShare(251                reason=(_('Invalid NAS protocol supplied: %s.')252                        % share_proto))253    @vnx_utils.log_enter_exit254    def _cifs_allow_access(self, context, share, access, share_server):255        """Allow access to CIFS share."""256        vdm_name = self._get_share_server_name(share_server)257        share_name = share['id']258        if access['access_type'] != 'user':259            reason = _('Only user access type allowed for CIFS share')260            raise exception.InvalidShareAccess(reason=reason)261        user_name = access['access_to']262        access_level = access['access_level']263        if access_level == const.ACCESS_LEVEL_RW:264            cifs_access = constants.CIFS_ACL_FULLCONTROL265        else:266            cifs_access = constants.CIFS_ACL_READ267        # Check if CIFS server exists.268        server_name = vdm_name269        status, server = self._get_context('CIFSServer').get(server_name,270                                                             vdm_name)271        if status != constants.STATUS_OK:272                message = (_("CIFS server %s not found.") % server_name)273                LOG.error(message)274                raise exception.EMCVnxXMLAPIError(err=message)275        self._get_context('CIFSShare').allow_share_access(276            vdm_name,277            share_name,278            user_name,279            server['domain'],280            access=cifs_access)281    @vnx_utils.log_enter_exit282    def _nfs_allow_access(self, context, share, access, share_server):283        """Allow access to NFS share."""284        vdm_name = self._get_share_server_name(share_server)285        access_type = access['access_type']286        if access_type != 'ip':287            reason = _('Only ip access type allowed.')288            raise exception.InvalidShareAccess(reason=reason)289        host_ip = access['access_to']290        access_level = access['access_level']291        self._get_context('NFSShare').allow_share_access(292            share['id'], host_ip, vdm_name, access_level)293    def deny_access(self, context, share, access, share_server=None):294        """Deny access to a share."""295        share_proto = share['share_proto']296        if share_proto == 'NFS':297            self._nfs_deny_access(share, access, share_server)298        elif share_proto == 'CIFS':299            self._cifs_deny_access(share, access, share_server)300        else:301            raise exception.InvalidShare(302                reason=_('Unsupported share type'))303    @vnx_utils.log_enter_exit304    def _cifs_deny_access(self, share, access, share_server):305        """Deny access to CIFS share."""306        vdm_name = self._get_share_server_name(share_server)307        share_name = share['id']308        if access['access_type'] != 'user':309            reason = _('Only user access type allowed for CIFS share')310            raise exception.InvalidShareAccess(reason=reason)311        user_name = access['access_to']312        access_level = access['access_level']313        if access_level == const.ACCESS_LEVEL_RW:314            cifs_access = constants.CIFS_ACL_FULLCONTROL315        else:316            cifs_access = constants.CIFS_ACL_READ317        # Check if CIFS server exists.318        server_name = vdm_name319        status, server = self._get_context('CIFSServer').get(server_name,320                                                             vdm_name)321        if status != constants.STATUS_OK:322                message = (_("CIFS server %s not found.") % server_name)323                LOG.error(message)324                raise exception.EMCVnxXMLAPIError(err=message)325        self._get_context('CIFSShare').deny_share_access(326            vdm_name,327            share_name,328            user_name,329            server['domain'],330            access=cifs_access)331    @vnx_utils.log_enter_exit332    def _nfs_deny_access(self, share, access, share_server):333        """Deny access to NFS share."""334        vdm_name = self._get_share_server_name(share_server)335        access_type = access['access_type']336        if access_type != 'ip':337            reason = _('Only ip access type allowed.')338            raise exception.InvalidShareAccess(reason=reason)339        host_ip = access['access_to']340        self._get_context('NFSShare').deny_share_access(share['id'], host_ip,341                                                        vdm_name)342    def check_for_setup_error(self):343        """Check for setup error."""344        # To verify the input from Manila configuration345        status, out = self._get_context('Mover').get_ref(self.mover_name,346                                                         True)347        if constants.STATUS_ERROR == status:348            message = (_("Could not find Data Mover by name: %s.") %349                       self.mover_name)350            LOG.error(message)351            raise exception.InvalidParameterValue(err=message)352        self.pools = self._get_managed_storage_pools(self.pool_conf)353    def _get_managed_storage_pools(self, pools):354        matched_pools = set()355        if pools:356            # Get the real pools from the backend storage357            status, backend_pools = self._get_context('StoragePool').get_all()358            if status != constants.STATUS_OK:359                message = (_("Failed to get storage pool information. "360                             "Reason: %s") % backend_pools)361                LOG.error(message)362                raise exception.EMCVnxXMLAPIError(err=message)363            real_pools = set([item for item in backend_pools])364            conf_pools = set([item.strip() for item in pools.split(",")])365            for pool in real_pools:366                for matcher in conf_pools:367                    if fnmatch.fnmatchcase(pool, matcher):368                        matched_pools.add(pool)369            nonexistent_pools = real_pools.difference(matched_pools)370            if not matched_pools:371                msg = (_("All the specified storage pools to be managed "372                         "do not exist. Please check your configuration "373                         "emc_nas_pool_names in manila.conf. "374                         "The available pools in the backend are %s") %375                       ",".join(real_pools))376                raise exception.InvalidParameterValue(err=msg)377            if nonexistent_pools:378                LOG.warning(_LW("The following specified storage pools "379                                "do not exist: %(unexist)s. "380                                "This host will only manage the storage "381                                "pools: %(exist)s"),382                            {'unexist': ",".join(nonexistent_pools),383                             'exist': ",".join(matched_pools)})384            else:385                LOG.debug("Storage pools: %s will be managed.",386                          ",".join(matched_pools))387        else:388            LOG.debug("No storage pool is specified, so all pools "389                      "in storage system will be managed.")390        return matched_pools391    def connect(self, emc_share_driver, context):392        """Connect to VNX NAS server."""393        self.mover_name = (394            emc_share_driver.configuration.emc_nas_server_container)395        self.pool_conf = emc_share_driver.configuration.safe_get(396            'emc_nas_pool_names')397        self.reserved_percentage = emc_share_driver.configuration.safe_get(398            'reserved_share_percentage')399        if self.reserved_percentage is None:400            self.reserved_percentage = 0401        configuration = emc_share_driver.configuration402        self.manager = manager.StorageObjectManager(configuration)403    def update_share_stats(self, stats_dict):404        """Communicate with EMCNASClient to get the stats."""405        stats_dict['driver_version'] = VERSION406        self._get_context('Mover').get_ref(self.mover_name, True)407        stats_dict['pools'] = []408        status, pools = self._get_context('StoragePool').get_all()409        for name, pool in pools.items():410            if not self.pools or pool['name'] in self.pools:411                total_size = float(pool['total_size'])412                used_size = float(pool['used_size'])413                pool_stat = dict(414                    pool_name=pool['name'],415                    total_capacity_gb=total_size,416                    free_capacity_gb=total_size - used_size,417                    qos=False,418                    reserved_percentage=self.reserved_percentage,419                )420                stats_dict['pools'].append(pool_stat)421        if not stats_dict['pools']:422            message = _("Failed to update storage pool.")423            LOG.error(message)424            raise exception.EMCVnxXMLAPIError(err=message)425    def get_pool(self, share):426        """Get the pool name of the share."""427        share_name = share['id']428        status, filesystem = self._get_context('FileSystem').get(share_name)429        if status != constants.STATUS_OK:430            message = (_("File System %(name)s not found. "431                         "Reason: %(err)s") %432                       {'name': share_name, 'err': filesystem})433            LOG.error(message)434            raise exception.EMCVnxXMLAPIError(err=message)435        pool_id = filesystem['pools_id'][0]436        # Get the real pools from the backend storage437        status, backend_pools = self._get_context('StoragePool').get_all()438        if status != constants.STATUS_OK:439            message = (_("Failed to get storage pool information. "440                         "Reason: %s") % backend_pools)441            LOG.error(message)442            raise exception.EMCVnxXMLAPIError(err=message)443        for name, pool_info in backend_pools.items():444            if pool_info['id'] == pool_id:445                return name446        available_pools = [item for item in backend_pools]447        message = (_("No matched pool name for share: %(share)s. "448                     "Available pools: %(pools)s") %449                   {'share': share_name, 'pools': available_pools})450        raise exception.EMCVnxXMLAPIError(err=message)451    def get_network_allocations_number(self):452        """Returns number of network allocations for creating VIFs."""453        return constants.IP_ALLOCATIONS454    def setup_server(self, network_info, metadata=None):455        """Set up and configures share server with given network parameters."""456        # Only support single security service with type 'active_directory'457        vdm_name = network_info['server_id']458        vlan_id = network_info['segmentation_id']459        active_directory = None460        allocated_interfaces = []461        if network_info.get('security_services'):462            is_valid, active_directory = self._get_valid_security_service(463                network_info['security_services'])464            if not is_valid:465                raise exception.EMCVnxXMLAPIError(err=active_directory)466        try:467            if not self._vdm_exist(vdm_name):468                LOG.debug('Share server %s not found, creating '469                          'share server...', vdm_name)470                self._get_context('VDM').create(vdm_name, self.mover_name)471            netmask = utils.cidr_to_netmask(network_info['cidr'])472            devices = self._get_physical_devices(self.mover_name)473            for net_info in network_info['network_allocations']:474                random.shuffle(devices)475                interface = {476                    'name': net_info['id'][-12:],477                    'device_name': devices[0],478                    'ip': net_info['ip_address'],479                    'mover_name': self.mover_name,480                    'net_mask': netmask,481                    'vlan_id': vlan_id if vlan_id else -1,482                }483                self._get_context('MoverInterface').create(interface)484                allocated_interfaces.append(interface)485            cifs_interface = allocated_interfaces[0]486            nfs_interface = allocated_interfaces[1]487            if active_directory:488                self._configure_active_directory(489                    active_directory, vdm_name, cifs_interface)490            self._get_context('VDM').attach_nfs_interface(491                vdm_name, nfs_interface['name'])492            return {493                'share_server_name': vdm_name,494                'cifs_if': cifs_interface['ip'],495                'nfs_if': nfs_interface['ip'],496            }497        except Exception as ex:498            with excutils.save_and_reraise_exception():499                LOG.error(_LE('Could not setup server. Reason: %s.'), ex)500                server_details = self._construct_backend_details(501                    vdm_name, allocated_interfaces)502                self.teardown_server(503                    server_details, network_info['security_services'])504    def _construct_backend_details(self, vdm_name, interfaces):505        if_number = len(interfaces)506        cifs_if = interfaces[0]['ip'] if if_number > 0 else None507        nfs_if = interfaces[1]['ip'] if if_number > 1 else None508        return {509            'share_server_name': vdm_name,510            'cifs_if': cifs_if,511            'nfs_if': nfs_if,512        }513    @vnx_utils.log_enter_exit514    def _vdm_exist(self, name):515        status, out = self._get_context('VDM').get(name)516        if constants.STATUS_OK != status:517            return False518        return True519    def _get_physical_devices(self, mover_name):520        """Get a proper network device to create interface."""521        devices = self._get_context('Mover').get_physical_devices(mover_name)522        if not devices:523            message = (_("Could not get physical device port on mover %s.") %524                       self.mover_name)525            LOG.error(message)526            raise exception.EMCVnxXMLAPIError(err=message)527        return devices528    def _configure_active_directory(529            self, security_service, vdm_name, interface):530        domain = security_service['domain']531        server = security_service['dns_ip']532        self._get_context('DNSDomain').create(self.mover_name, domain, server)533        cifs_server_args = {534            'name': vdm_name,535            'interface_ip': interface['ip'],536            'domain_name': security_service['domain'],537            'user_name': security_service['user'],538            'password': security_service['password'],539            'mover_name': vdm_name,540            'is_vdm': True,541        }542        self._get_context('CIFSServer').create(cifs_server_args)543    def teardown_server(self, server_details, security_services=None):544        """Teardown share server."""545        if not server_details:546            LOG.debug('Server details are empty.')547            return548        vdm_name = server_details.get('share_server_name')549        if not vdm_name:550            LOG.debug('No share server found in server details.')551            return552        cifs_if = server_details.get('cifs_if')553        nfs_if = server_details.get('nfs_if')554        status, vdm = self._get_context('VDM').get(vdm_name)555        if constants.STATUS_OK != status:556            LOG.debug('Share server %s not found.', vdm_name)557            return558        interfaces = self._get_context('VDM').get_interfaces(vdm_name)559        for if_name in interfaces['nfs']:560            self._get_context('VDM').detach_nfs_interface(vdm_name, if_name)561        if security_services:562            # Only support single security service with type 'active_directory'563            is_valid, active_directory = self._get_valid_security_service(564                security_services)565            if is_valid:566                status, servers = self._get_context('CIFSServer').get_all(567                    vdm_name)568                if constants.STATUS_OK != status:569                    LOG.error(_LE('Could not find CIFS server by name: %s.'),570                              vdm_name)571                else:572                    cifs_servers = copy.deepcopy(servers)573                    for name, server in cifs_servers.items():574                        # Unjoin CIFS Server from domain575                        cifs_server_args = {576                            'name': server['name'],577                            'join_domain': False,578                            'user_name': active_directory['user'],579                            'password': active_directory['password'],580                            'mover_name': vdm_name,581                            'is_vdm': True,582                        }583                        try:584                            self._get_context('CIFSServer').modify(585                                cifs_server_args)586                        except exception.EMCVnxXMLAPIError as expt:587                            LOG.debug("Failed to modify CIFS server "588                                      "%(server)s. Reason: %(err)s.",589                                      {'server': server, 'err': expt})590                        self._get_context('CIFSServer').delete(name, vdm_name)591        # Delete interface from Data Mover592        if cifs_if:593            self._get_context('MoverInterface').delete(cifs_if,594                                                       self.mover_name)595        if nfs_if:596            self._get_context('MoverInterface').delete(nfs_if,597                                                       self.mover_name)598        # Delete Virtual Data Mover599        self._get_context('VDM').delete(vdm_name)600    def _get_valid_security_service(self, security_services):601        """Validate security services and return a supported security service.602        :param security_services:603        :returns: (<is_valid>, <data>) -- <is_valid> is true to indicate604            security_services includes zero or single security service for605            active directory. Otherwise, it would return false. <data> return606            error message when <is_valid> is false. Otherwise, it will607            return zero or single security service for active directory.608        """609        # Only support single security service with type 'active_directory'610        service_number = len(security_services)611        if (service_number > 1 or612                security_services[0]['type'] != 'active_directory'):613            return False, _("Unsupported security services. "614                            "Only support single security service and "615                            "only support type 'active_directory'")616        return True, security_services[0]617    def _get_share_server_name(self, share_server):618        try:619            return share_server['backend_details']['share_server_name']620        except Exception:621            LOG.debug("Didn't get share server name from share_server %s.",622                      share_server)623        return share_server['id']624    def _get_context(self, type):...test_cluster_context.py
Source:test_cluster_context.py  
...29class TestClusterContext(b.SaharaTestCase):30    def __init__(self, *args, **kwds):31        super(TestClusterContext, self).__init__(*args, **kwds)32        self.fake_np = np.NodeProcess('fake', 'foo', 'bar')33    def _get_context(self):34        i1 = tu.make_inst_dict('id_1', 'instance_1', MANAGEMENT_IP)35        i1['internal_ip'] = INTERNAL_IP36        master_proc = [37            yarn.RESOURCE_MANAGER.ui_name,38            yarn.NODE_MANAGER.ui_name,39            yarn.HISTORY_SERVER.ui_name,40            maprfs.CLDB.ui_name,41            maprfs.FILE_SERVER.ui_name,42            oozie.OOZIE.ui_name,43            management.ZOOKEEPER.ui_name,44        ]45        master_ng = tu.make_ng_dict('master', 'large', master_proc, 1, [i1])46        cluster_configs = {47            'Service': {48                'key': 'value',49                'Service Version': '1.1',50            },51            'Oozie': {52                'Oozie Version': '4.2.0',53            }54        }55        cluster = tu.create_cluster(56            name='test_cluster',57            tenant='large',58            plugin='mapr',59            version='5.2.0.mrv2',60            node_groups=[master_ng],61            cluster_configs=cluster_configs,62        )63        self.ng = cluster.node_groups[0]64        self.instance = self.ng.instances[0]65        return cc.Context(cluster, handler.VersionHandler())66    def test_get_oozie_server_uri(self):67        ctx = self._get_context()68        expected = 'http://%s:11000/oozie' % MANAGEMENT_IP69        self.assertEqual(expected, ctx.oozie_server_uri)70    def test_oozie_server(self):71        ctx = self._get_context()72        node_processes = ctx.oozie_server.node_group.node_processes73        self.assertIn(oozie.OOZIE.ui_name, node_processes)74    def test_oozie_http(self):75        ctx = self._get_context()76        expected = '%s:11000' % MANAGEMENT_IP77        self.assertEqual(expected, ctx.oozie_http)78    def test_configure_sh(self):79        ctx = self._get_context()80        conf_sh = ctx.configure_sh81        pattern = (r'^(\S+)\s+(-N (\S+))\s+(-C (\S+))\s+(-Z (\S+))\s+'82                   r'(-no-autostart)\s+(-f)\s+\s(-HS (\S+))')83        self.assertRegex(conf_sh, pattern)84        self.assertIn('/opt/mapr/server/configure.sh', conf_sh)85        self.assertIn('-C %s' % INTERNAL_IP, conf_sh)86        self.assertIn('-Z %s' % INTERNAL_IP, conf_sh)87        self.assertIn('-HS %s' % INTERNAL_IP, conf_sh)88        self.assertIn('-no-autostart', conf_sh)89        self.assertIn('-N ' + ctx.cluster.name, conf_sh)90    def test_get_cluster_config_value(self):91        ctx = self._get_context()92        conf = p.Config('key', 'Service', 'cluster')93        self.assertEqual('value', ctx._get_cluster_config_value(conf))94        not_set = p.Config('nonset', 'Service', 'cluster')95        self.assertIsNone(ctx._get_cluster_config_value(not_set))96    def test_get_instances(self):97        ctx = self._get_context()98        instances = ctx.get_instances()99        self.assertEqual(1, len(instances))100        rms1 = ctx.get_instances(yarn.RESOURCE_MANAGER)101        self.assertEqual(1, len(rms1))102        rms2 = ctx.get_instances(yarn.RESOURCE_MANAGER.ui_name)103        self.assertEqual(1, len(rms2))104        not_existing_1 = ctx.get_instances(self.fake_np)105        self.assertEqual(0, len(not_existing_1))106        not_existing_2 = ctx.get_instances(self.fake_np.ui_name)107        self.assertEqual(0, len(not_existing_2))108    def test_get_instance(self):109        ctx = self._get_context()110        instance_1 = ctx.get_instance(yarn.RESOURCE_MANAGER)111        self.assertIn(yarn.RESOURCE_MANAGER.ui_name,112                      instance_1.node_group.node_processes)113        instance_2 = ctx.get_instance(yarn.RESOURCE_MANAGER)114        self.assertIn(yarn.RESOURCE_MANAGER.ui_name,115                      instance_2.node_group.node_processes)116        self.assertIsNone(ctx.get_instance(self.fake_np))117    def test_get_instances_ip(self):118        ctx = self._get_context()119        ip_list_1 = ctx.get_instances_ip(yarn.RESOURCE_MANAGER)120        self.assertEqual(1, len(ip_list_1))121        self.assertIn(INTERNAL_IP, ip_list_1)122        ip_list_2 = ctx.get_instances_ip(yarn.RESOURCE_MANAGER.ui_name)123        self.assertEqual(1, len(ip_list_2))124        self.assertIn(INTERNAL_IP, ip_list_2)125        empty_list = ctx.get_instances_ip(self.fake_np)126        self.assertEqual(0, len(empty_list))127    def test_get_instance_ip(self):128        ctx = self._get_context()129        ip_1 = ctx.get_instance_ip(yarn.RESOURCE_MANAGER)130        self.assertEqual(INTERNAL_IP, ip_1)131        ip_2 = ctx.get_instance_ip(yarn.RESOURCE_MANAGER.ui_name)132        self.assertEqual(INTERNAL_IP, ip_2)133        none_ip = ctx.get_instance_ip(self.fake_np)134        self.assertIsNone(none_ip)135    def test_get_zookeeper_nodes_ip_with_port(self):136        ctx = self._get_context()137        expected = '%s:5181' % INTERNAL_IP138        actual = ctx.get_zookeeper_nodes_ip_with_port()139        self.assertEqual(expected, actual)140        management.ZK_CLIENT_PORT = '0000'141        expected = '%s:0000' % INTERNAL_IP142        actual = ctx.get_zookeeper_nodes_ip_with_port()143        self.assertEqual(expected, actual)144    def test_filter_instances(self):145        ctx = self._get_context()146        instances = ctx.get_instances()147        rsmngs = ctx.filter_instances(instances, yarn.RESOURCE_MANAGER)148        self.assertEqual(1, len(rsmngs))149        not_existing_i = ctx.filter_instances(instances, self.fake_np)150        self.assertEqual(0, len(not_existing_i))151    def test_check_for_process(self):152        ctx = self._get_context()153        instance = ctx.get_instance(yarn.RESOURCE_MANAGER)154        self.assertTrue(ctx.check_for_process(instance, yarn.RESOURCE_MANAGER))155        self.assertTrue(ctx.check_for_process(instance,156                                              yarn.RESOURCE_MANAGER.ui_name))157        self.assertFalse(ctx.check_for_process(instance, maprfs.NFS))158        self.assertFalse(ctx.check_for_process(instance, maprfs.NFS.ui_name))159    def test_get_chosen_service_version(self):160        ctx = self._get_context()161        version = ctx.get_chosen_service_version('Service')162        self.assertEqual('1.1', version)163    def test_get_cluster_services(self):164        ctx = self._get_context()165        actual_services = ctx.get_cluster_services()166        actual_services_names = map(lambda s: s.ui_name, actual_services)167        expected_services_names = [168            yarn.YARN().ui_name,169            management.Management().ui_name,170            maprfs.MapRFS().ui_name,171            oozie.Oozie().ui_name,172            swift.Swift().ui_name,173        ]174        self.assertEqual(sorted(actual_services_names),175                         sorted(expected_services_names))176    def test_get_service(self):177        ctx = self._get_context()178        service = ctx.get_service(yarn.HISTORY_SERVER)179        self.assertEqual(yarn.YARN().ui_name, service.ui_name)180        with testtools.ExpectedException(e.InvalidDataException):181            ctx.get_service(self.fake_np)182    def test_get_service_name_by_node_process(self):183        ctx = self._get_context()184        s_name_1 = ctx.get_service_name_by_node_process(yarn.RESOURCE_MANAGER)185        self.assertEqual(yarn.YARN().ui_name, s_name_1)186        s_name_2 = ctx.get_service_name_by_node_process(187            yarn.RESOURCE_MANAGER.ui_name)188        self.assertEqual(yarn.YARN().ui_name, s_name_2)189        not_existing_np = np.NodeProcess('not_existing', 'NotExisting', 'foo')190        self.assertIsNone(ctx.get_service_name_by_node_process(191            not_existing_np))192        self.assertIsNone(ctx.get_service_name_by_node_process(193            not_existing_np.ui_name))194    def test_get_instances_count(self):195        ctx = self._get_context()196        self.assertEqual(1, ctx.get_instances_count())197        self.assertEqual(1, ctx.get_instances_count(yarn.RESOURCE_MANAGER))198        self.assertEqual(1, ctx.get_instances_count(199            yarn.RESOURCE_MANAGER.ui_name))200        self.assertEqual(0, ctx.get_instances_count(self.fake_np))201        self.assertEqual(0, ctx.get_instances_count(202            self.fake_np.ui_name))203    def test_get_node_groups(self):204        ctx = self._get_context()205        all_ngs = ctx.get_node_groups()206        self.assertEqual(1, len(all_ngs))207        self.assertEqual([self.ng], all_ngs)208        rm_ngs_1 = ctx.get_node_groups(yarn.RESOURCE_MANAGER)209        self.assertEqual(1, len(rm_ngs_1))210        self.assertEqual([self.ng], rm_ngs_1)211        rm_ngs_2 = ctx.get_node_groups(yarn.RESOURCE_MANAGER.ui_name)212        self.assertEqual(1, len(rm_ngs_2))213        self.assertEqual([self.ng], rm_ngs_2)214        empty_ngs = ctx.get_node_groups(self.fake_np)215        self.assertEqual(0, len(empty_ngs))216    def test_get_cldb_nodes_ip(self):217        ctx = self._get_context()218        cldb_list_1 = ctx.get_cldb_nodes_ip()219        self.assertEqual(1, len(cldb_list_1.split(',')))220        self.assertIn(INTERNAL_IP, cldb_list_1)221        cldb_list_2 = ctx.get_cldb_nodes_ip()222        self.assertEqual(1, len(cldb_list_2.split(',')))223        self.assertIn(INTERNAL_IP, cldb_list_2)224        sep = ':'225        cldb_list_3 = ctx.get_cldb_nodes_ip(sep)226        self.assertEqual(1, len(cldb_list_3.split(sep)))227        self.assertIn(INTERNAL_IP, cldb_list_3)228    def test_get_zookeeper_nodes_ip(self):229        ctx = self._get_context()230        zk_list_1 = ctx.get_zookeeper_nodes_ip()231        self.assertEqual(1, len(zk_list_1.split(',')))232        self.assertIn(INTERNAL_IP, zk_list_1)233        zk_list_2 = ctx.get_zookeeper_nodes_ip()234        self.assertEqual(1, len(zk_list_2.split(',')))235        self.assertIn(INTERNAL_IP, zk_list_2)236        sep = ':'237        zk_list_3 = ctx.get_zookeeper_nodes_ip(sep)238        self.assertEqual(1, len(zk_list_3.split(sep)))239        self.assertIn(INTERNAL_IP, zk_list_3)240    def test_get_resourcemanager_ip(self):241        ctx = self._get_context()242        ip = ctx.get_resourcemanager_ip()243        self.assertEqual(INTERNAL_IP, ip)244    def test_get_historyserver_ip(self):245        ctx = self._get_context()246        self.assertTrue(ctx.has_control_nodes([self.instance]))247    def test_is_present(self):248        cluster_context = self._get_context()249        self.assertTrue(cluster_context.is_present(oozie.Oozie()))250        self.assertFalse(cluster_context.is_present(oozie.OozieV401()))...links.py
Source:links.py  
...4from django.template.loader import get_template, render_to_string5from sellmo import modules6from sellmo.api.decorators import link7namespace = modules.checkout.namespace8def _get_context(**context):9    site = Site.objects.get_current()10    context.update({11        'settings': modules.settings.get_settings(),12        'request' : {13            'site' : site,14        },15        'url': 'http://{0}'.format(site.domain),16        'prefix': 'http://{0}'.format(site.domain),17        'STATIC_URL': 'http://{0}{1}'.format(site.domain, settings.STATIC_URL),18        'MEDIA_URL': 'http://{0}{1}'.format(site.domain, settings.MEDIA_URL),19    })20    return context21@link()22def render_order_confirmation_email(format, order, data, **kwargs):23    template = None24    if format == 'html':25        template = get_template('checkout/emails/order_confirmation.html')26    elif format == 'text':27        template = get_template('checkout/emails/order_confirmation.txt')28    if template:29        data = template.render(Context(_get_context(order=order)))30        return {31            'data' : data32        }33@link()34def render_order_notification_email(format, order, data, **kwargs):35    template = None36    if format == 'html':37        template = get_template('checkout/emails/order_notification.html')38    elif format == 'text':39        template = get_template('checkout/emails/order_notification.txt')40    if template:41        data = template.render(Context(_get_context(order=order)))42        return {43            'data' : data44        }45@link()46def render_shipping_notification_email(format, order, data, **kwargs):47    template = None48    if format == 'html':49        template = get_template('checkout/emails/shipping_notification.html')50    elif format == 'text':51        template = get_template('checkout/emails/shipping_notification.txt')52    if template:53        data = template.render(Context(_get_context(order=order)))54        return {55            'data' : data56        }57@link()58def render_invoice_report(order, internal, data, **kwargs):59    template = get_template('checkout/reports/invoice.html')60    data = template.render(Context(_get_context(order=order, internal=internal)))61    return {62        'data' : data63    }64@link()65def render_order_confirmation_report(order, internal, data, **kwargs):66    template = get_template('checkout/reports/order_confirmation.html')67    data = template.render(Context(_get_context(order=order, internal=internal)))68    return {69        'data' : data...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
