How to use _get_context method in pytest-play

Best Python code snippet using pytest-play_python

connection.py

Source:connection.py Github

copy

Full Screen

...66 if share_proto == 'CIFS':67 vdm_name = self._get_share_server_name(share_server)68 server_name = vdm_name69 # Check if CIFS server exists.70 status, server = self._get_context('CIFSServer').get(server_name,71 vdm_name)72 if status != constants.STATUS_OK:73 message = (_("CIFS server %s not found.") % server_name)74 LOG.error(message)75 raise exception.EMCVnxXMLAPIError(err=message)76 self._allocate_container(share_name, size, share_server, pool_name)77 if share_proto == 'NFS':78 location = self._create_nfs_share(share_name, share_server)79 elif share_proto == 'CIFS':80 location = self._create_cifs_share(share_name, share_server)81 return location82 def _share_server_validation(self, share_server):83 """Validate the share server."""84 if not share_server:85 msg = _('Share server not provided')86 raise exception.InvalidInput(reason=msg)87 backend_details = share_server.get('backend_details')88 vdm = backend_details.get(89 'share_server_name') if backend_details else None90 if vdm is None:91 message = _("No share server found.")92 LOG.error(message)93 raise exception.EMCVnxXMLAPIError(err=message)94 def _allocate_container(self, share_name, size, share_server, pool_name):95 """Allocate file system for share."""96 vdm_name = self._get_share_server_name(share_server)97 self._get_context('FileSystem').create(98 share_name, size, pool_name, vdm_name)99 def _allocate_container_from_snapshot(self, share, snapshot, share_server,100 pool_name):101 """Allocate file system from snapshot."""102 vdm_name = self._get_share_server_name(share_server)103 interconn_id = self._get_context('Mover').get_interconnect_id(104 self.mover_name, self.mover_name)105 self._get_context('FileSystem').create_from_snapshot(106 share['id'], snapshot['id'], snapshot['share_id'],107 pool_name, vdm_name, interconn_id)108 nwe_size = share['size'] * units.Ki109 self._get_context('FileSystem').extend(share['id'], pool_name,110 nwe_size)111 @vnx_utils.log_enter_exit112 def _create_cifs_share(self, share_name, share_server):113 """Create CIFS share."""114 vdm_name = self._get_share_server_name(share_server)115 server_name = vdm_name116 # Get available CIFS Server and interface (one CIFS server per VDM)117 status, server = self._get_context('CIFSServer').get(server_name,118 vdm_name)119 if 'interfaces' not in server or len(server['interfaces']) == 0:120 message = (_("CIFS server %s doesn't have interface, "121 "so the share is inaccessible.")122 % server['compName'])123 LOG.error(message)124 raise exception.EMCVnxXMLAPIError(err=message)125 interface = server['interfaces'][0]126 self._get_context('CIFSShare').create(share_name, server['name'],127 vdm_name)128 self._get_context('CIFSShare').disable_share_access(share_name,129 vdm_name)130 location = (r'\\%(interface)s\%(name)s' %131 {'interface': interface, 'name': share_name})132 return location133 @vnx_utils.log_enter_exit134 def _create_nfs_share(self, share_name, share_server):135 """Create NFS share."""136 vdm_name = self._get_share_server_name(share_server)137 self._get_context('NFSShare').create(share_name, vdm_name)138 return ('%(nfs_if)s:/%(share_name)s'139 % {'nfs_if': share_server['backend_details']['nfs_if'],140 'share_name': share_name})141 def create_share_from_snapshot(self, context, share, snapshot,142 share_server=None):143 """Create a share from a snapshot - clone a snapshot."""144 share_name = share['id']145 share_proto = share['share_proto']146 # Validate the share protocol147 if share_proto.upper() not in ('NFS', 'CIFS'):148 raise exception.InvalidShare(149 reason=(_('Invalid NAS protocol supplied: %s.')150 % share_proto))151 # Get the pool name from share host field152 pool_name = share_utils.extract_host(share['host'], level='pool')153 if not pool_name:154 message = (_("Pool is not available in the share host %s.") %155 share['host'])156 raise exception.InvalidHost(reason=message)157 self._share_server_validation(share_server)158 self._allocate_container_from_snapshot(159 share, snapshot, share_server, pool_name)160 if share_proto == 'NFS':161 self._create_nfs_share(share_name, share_server)162 location = ('%(nfs_if)s:/%(share_name)s'163 % {'nfs_if': share_server['backend_details']['nfs_if'],164 'share_name': share_name})165 elif share_proto == 'CIFS':166 location = self._create_cifs_share(share_name, share_server)167 return location168 def create_snapshot(self, context, snapshot, share_server=None):169 """Create snapshot from share."""170 share_name = snapshot['share_id']171 status, filesystem = self._get_context('FileSystem').get(share_name)172 if status != constants.STATUS_OK:173 message = (_("File System %s not found.") % share_name)174 LOG.error(message)175 raise exception.EMCVnxXMLAPIError(err=message)176 pool_id = filesystem['pools_id'][0]177 self._get_context('Snapshot').create(snapshot['id'],178 snapshot['share_id'],179 pool_id)180 def delete_share(self, context, share, share_server=None):181 """Delete a share."""182 if share_server is None:183 LOG.warning(_LW("Driver does not support share deletion without "184 "share network specified. Return directly because "185 "there is nothing to clean."))186 return187 share_proto = share['share_proto']188 if share_proto == 'NFS':189 self._delete_nfs_share(share, share_server)190 elif share_proto == 'CIFS':191 self._delete_cifs_share(share, share_server)192 else:193 raise exception.InvalidShare(194 reason='Unsupported share type')195 @vnx_utils.log_enter_exit196 def _delete_cifs_share(self, share, share_server):197 """Delete CIFS share."""198 vdm_name = self._get_share_server_name(share_server)199 name = share['id']200 self._get_context('CIFSShare').delete(name, vdm_name)201 self._deallocate_container(name, vdm_name)202 @vnx_utils.log_enter_exit203 def _delete_nfs_share(self, share, share_server):204 """Delete NFS share."""205 vdm_name = self._get_share_server_name(share_server)206 name = share['id']207 self._get_context('NFSShare').delete(name, vdm_name)208 self._deallocate_container(name, vdm_name)209 @vnx_utils.log_enter_exit210 def _deallocate_container(self, share_name, vdm_name):211 """Delete underneath objects of the share."""212 path = '/' + share_name213 try:214 # Delete mount point215 self._get_context('MountPoint').delete(path, vdm_name)216 except Exception:217 LOG.debug("Skip the failure of mount point %s deletion.", path)218 try:219 # Delete file system220 self._get_context('FileSystem').delete(share_name)221 except Exception:222 LOG.debug("Skip the failure of file system %s deletion.",223 share_name)224 def delete_snapshot(self, context, snapshot, share_server=None):225 """Delete a snapshot."""226 self._get_context('Snapshot').delete(snapshot['id'])227 def ensure_share(self, context, share, share_server=None):228 """Ensure that the share is exported."""229 def extend_share(self, share, new_size, share_server=None):230 # Get the pool name from share host field231 pool_name = share_utils.extract_host(share['host'], level='pool')232 if not pool_name:233 message = (_("Pool is not available in the share host %s.") %234 share['host'])235 raise exception.InvalidHost(reason=message)236 share_name = share['id']237 self._get_context('FileSystem').extend(238 share_name, pool_name, new_size * units.Ki)239 def allow_access(self, context, share, access, share_server=None):240 """Allow access to a share."""241 access_level = access['access_level']242 if access_level not in const.ACCESS_LEVELS:243 raise exception.InvalidShareAccessLevel(level=access_level)244 share_proto = share['share_proto']245 if share_proto == 'NFS':246 self._nfs_allow_access(context, share, access, share_server)247 elif share_proto == 'CIFS':248 self._cifs_allow_access(context, share, access, share_server)249 else:250 raise exception.InvalidShare(251 reason=(_('Invalid NAS protocol supplied: %s.')252 % share_proto))253 @vnx_utils.log_enter_exit254 def _cifs_allow_access(self, context, share, access, share_server):255 """Allow access to CIFS share."""256 vdm_name = self._get_share_server_name(share_server)257 share_name = share['id']258 if access['access_type'] != 'user':259 reason = _('Only user access type allowed for CIFS share')260 raise exception.InvalidShareAccess(reason=reason)261 user_name = access['access_to']262 access_level = access['access_level']263 if access_level == const.ACCESS_LEVEL_RW:264 cifs_access = constants.CIFS_ACL_FULLCONTROL265 else:266 cifs_access = constants.CIFS_ACL_READ267 # Check if CIFS server exists.268 server_name = vdm_name269 status, server = self._get_context('CIFSServer').get(server_name,270 vdm_name)271 if status != constants.STATUS_OK:272 message = (_("CIFS server %s not found.") % server_name)273 LOG.error(message)274 raise exception.EMCVnxXMLAPIError(err=message)275 self._get_context('CIFSShare').allow_share_access(276 vdm_name,277 share_name,278 user_name,279 server['domain'],280 access=cifs_access)281 @vnx_utils.log_enter_exit282 def _nfs_allow_access(self, context, share, access, share_server):283 """Allow access to NFS share."""284 vdm_name = self._get_share_server_name(share_server)285 access_type = access['access_type']286 if access_type != 'ip':287 reason = _('Only ip access type allowed.')288 raise exception.InvalidShareAccess(reason=reason)289 host_ip = access['access_to']290 access_level = access['access_level']291 self._get_context('NFSShare').allow_share_access(292 share['id'], host_ip, vdm_name, access_level)293 def deny_access(self, context, share, access, share_server=None):294 """Deny access to a share."""295 share_proto = share['share_proto']296 if share_proto == 'NFS':297 self._nfs_deny_access(share, access, share_server)298 elif share_proto == 'CIFS':299 self._cifs_deny_access(share, access, share_server)300 else:301 raise exception.InvalidShare(302 reason=_('Unsupported share type'))303 @vnx_utils.log_enter_exit304 def _cifs_deny_access(self, share, access, share_server):305 """Deny access to CIFS share."""306 vdm_name = self._get_share_server_name(share_server)307 share_name = share['id']308 if access['access_type'] != 'user':309 reason = _('Only user access type allowed for CIFS share')310 raise exception.InvalidShareAccess(reason=reason)311 user_name = access['access_to']312 access_level = access['access_level']313 if access_level == const.ACCESS_LEVEL_RW:314 cifs_access = constants.CIFS_ACL_FULLCONTROL315 else:316 cifs_access = constants.CIFS_ACL_READ317 # Check if CIFS server exists.318 server_name = vdm_name319 status, server = self._get_context('CIFSServer').get(server_name,320 vdm_name)321 if status != constants.STATUS_OK:322 message = (_("CIFS server %s not found.") % server_name)323 LOG.error(message)324 raise exception.EMCVnxXMLAPIError(err=message)325 self._get_context('CIFSShare').deny_share_access(326 vdm_name,327 share_name,328 user_name,329 server['domain'],330 access=cifs_access)331 @vnx_utils.log_enter_exit332 def _nfs_deny_access(self, share, access, share_server):333 """Deny access to NFS share."""334 vdm_name = self._get_share_server_name(share_server)335 access_type = access['access_type']336 if access_type != 'ip':337 reason = _('Only ip access type allowed.')338 raise exception.InvalidShareAccess(reason=reason)339 host_ip = access['access_to']340 self._get_context('NFSShare').deny_share_access(share['id'], host_ip,341 vdm_name)342 def check_for_setup_error(self):343 """Check for setup error."""344 # To verify the input from Manila configuration345 status, out = self._get_context('Mover').get_ref(self.mover_name,346 True)347 if constants.STATUS_ERROR == status:348 message = (_("Could not find Data Mover by name: %s.") %349 self.mover_name)350 LOG.error(message)351 raise exception.InvalidParameterValue(err=message)352 self.pools = self._get_managed_storage_pools(self.pool_conf)353 def _get_managed_storage_pools(self, pools):354 matched_pools = set()355 if pools:356 # Get the real pools from the backend storage357 status, backend_pools = self._get_context('StoragePool').get_all()358 if status != constants.STATUS_OK:359 message = (_("Failed to get storage pool information. "360 "Reason: %s") % backend_pools)361 LOG.error(message)362 raise exception.EMCVnxXMLAPIError(err=message)363 real_pools = set([item for item in backend_pools])364 conf_pools = set([item.strip() for item in pools.split(",")])365 for pool in real_pools:366 for matcher in conf_pools:367 if fnmatch.fnmatchcase(pool, matcher):368 matched_pools.add(pool)369 nonexistent_pools = real_pools.difference(matched_pools)370 if not matched_pools:371 msg = (_("All the specified storage pools to be managed "372 "do not exist. Please check your configuration "373 "emc_nas_pool_names in manila.conf. "374 "The available pools in the backend are %s") %375 ",".join(real_pools))376 raise exception.InvalidParameterValue(err=msg)377 if nonexistent_pools:378 LOG.warning(_LW("The following specified storage pools "379 "do not exist: %(unexist)s. "380 "This host will only manage the storage "381 "pools: %(exist)s"),382 {'unexist': ",".join(nonexistent_pools),383 'exist': ",".join(matched_pools)})384 else:385 LOG.debug("Storage pools: %s will be managed.",386 ",".join(matched_pools))387 else:388 LOG.debug("No storage pool is specified, so all pools "389 "in storage system will be managed.")390 return matched_pools391 def connect(self, emc_share_driver, context):392 """Connect to VNX NAS server."""393 self.mover_name = (394 emc_share_driver.configuration.emc_nas_server_container)395 self.pool_conf = emc_share_driver.configuration.safe_get(396 'emc_nas_pool_names')397 self.reserved_percentage = emc_share_driver.configuration.safe_get(398 'reserved_share_percentage')399 if self.reserved_percentage is None:400 self.reserved_percentage = 0401 configuration = emc_share_driver.configuration402 self.manager = manager.StorageObjectManager(configuration)403 def update_share_stats(self, stats_dict):404 """Communicate with EMCNASClient to get the stats."""405 stats_dict['driver_version'] = VERSION406 self._get_context('Mover').get_ref(self.mover_name, True)407 stats_dict['pools'] = []408 status, pools = self._get_context('StoragePool').get_all()409 for name, pool in pools.items():410 if not self.pools or pool['name'] in self.pools:411 total_size = float(pool['total_size'])412 used_size = float(pool['used_size'])413 pool_stat = dict(414 pool_name=pool['name'],415 total_capacity_gb=total_size,416 free_capacity_gb=total_size - used_size,417 qos=False,418 reserved_percentage=self.reserved_percentage,419 )420 stats_dict['pools'].append(pool_stat)421 if not stats_dict['pools']:422 message = _("Failed to update storage pool.")423 LOG.error(message)424 raise exception.EMCVnxXMLAPIError(err=message)425 def get_pool(self, share):426 """Get the pool name of the share."""427 share_name = share['id']428 status, filesystem = self._get_context('FileSystem').get(share_name)429 if status != constants.STATUS_OK:430 message = (_("File System %(name)s not found. "431 "Reason: %(err)s") %432 {'name': share_name, 'err': filesystem})433 LOG.error(message)434 raise exception.EMCVnxXMLAPIError(err=message)435 pool_id = filesystem['pools_id'][0]436 # Get the real pools from the backend storage437 status, backend_pools = self._get_context('StoragePool').get_all()438 if status != constants.STATUS_OK:439 message = (_("Failed to get storage pool information. "440 "Reason: %s") % backend_pools)441 LOG.error(message)442 raise exception.EMCVnxXMLAPIError(err=message)443 for name, pool_info in backend_pools.items():444 if pool_info['id'] == pool_id:445 return name446 available_pools = [item for item in backend_pools]447 message = (_("No matched pool name for share: %(share)s. "448 "Available pools: %(pools)s") %449 {'share': share_name, 'pools': available_pools})450 raise exception.EMCVnxXMLAPIError(err=message)451 def get_network_allocations_number(self):452 """Returns number of network allocations for creating VIFs."""453 return constants.IP_ALLOCATIONS454 def setup_server(self, network_info, metadata=None):455 """Set up and configures share server with given network parameters."""456 # Only support single security service with type 'active_directory'457 vdm_name = network_info['server_id']458 vlan_id = network_info['segmentation_id']459 active_directory = None460 allocated_interfaces = []461 if network_info.get('security_services'):462 is_valid, active_directory = self._get_valid_security_service(463 network_info['security_services'])464 if not is_valid:465 raise exception.EMCVnxXMLAPIError(err=active_directory)466 try:467 if not self._vdm_exist(vdm_name):468 LOG.debug('Share server %s not found, creating '469 'share server...', vdm_name)470 self._get_context('VDM').create(vdm_name, self.mover_name)471 netmask = utils.cidr_to_netmask(network_info['cidr'])472 devices = self._get_physical_devices(self.mover_name)473 for net_info in network_info['network_allocations']:474 random.shuffle(devices)475 interface = {476 'name': net_info['id'][-12:],477 'device_name': devices[0],478 'ip': net_info['ip_address'],479 'mover_name': self.mover_name,480 'net_mask': netmask,481 'vlan_id': vlan_id if vlan_id else -1,482 }483 self._get_context('MoverInterface').create(interface)484 allocated_interfaces.append(interface)485 cifs_interface = allocated_interfaces[0]486 nfs_interface = allocated_interfaces[1]487 if active_directory:488 self._configure_active_directory(489 active_directory, vdm_name, cifs_interface)490 self._get_context('VDM').attach_nfs_interface(491 vdm_name, nfs_interface['name'])492 return {493 'share_server_name': vdm_name,494 'cifs_if': cifs_interface['ip'],495 'nfs_if': nfs_interface['ip'],496 }497 except Exception as ex:498 with excutils.save_and_reraise_exception():499 LOG.error(_LE('Could not setup server. Reason: %s.'), ex)500 server_details = self._construct_backend_details(501 vdm_name, allocated_interfaces)502 self.teardown_server(503 server_details, network_info['security_services'])504 def _construct_backend_details(self, vdm_name, interfaces):505 if_number = len(interfaces)506 cifs_if = interfaces[0]['ip'] if if_number > 0 else None507 nfs_if = interfaces[1]['ip'] if if_number > 1 else None508 return {509 'share_server_name': vdm_name,510 'cifs_if': cifs_if,511 'nfs_if': nfs_if,512 }513 @vnx_utils.log_enter_exit514 def _vdm_exist(self, name):515 status, out = self._get_context('VDM').get(name)516 if constants.STATUS_OK != status:517 return False518 return True519 def _get_physical_devices(self, mover_name):520 """Get a proper network device to create interface."""521 devices = self._get_context('Mover').get_physical_devices(mover_name)522 if not devices:523 message = (_("Could not get physical device port on mover %s.") %524 self.mover_name)525 LOG.error(message)526 raise exception.EMCVnxXMLAPIError(err=message)527 return devices528 def _configure_active_directory(529 self, security_service, vdm_name, interface):530 domain = security_service['domain']531 server = security_service['dns_ip']532 self._get_context('DNSDomain').create(self.mover_name, domain, server)533 cifs_server_args = {534 'name': vdm_name,535 'interface_ip': interface['ip'],536 'domain_name': security_service['domain'],537 'user_name': security_service['user'],538 'password': security_service['password'],539 'mover_name': vdm_name,540 'is_vdm': True,541 }542 self._get_context('CIFSServer').create(cifs_server_args)543 def teardown_server(self, server_details, security_services=None):544 """Teardown share server."""545 if not server_details:546 LOG.debug('Server details are empty.')547 return548 vdm_name = server_details.get('share_server_name')549 if not vdm_name:550 LOG.debug('No share server found in server details.')551 return552 cifs_if = server_details.get('cifs_if')553 nfs_if = server_details.get('nfs_if')554 status, vdm = self._get_context('VDM').get(vdm_name)555 if constants.STATUS_OK != status:556 LOG.debug('Share server %s not found.', vdm_name)557 return558 interfaces = self._get_context('VDM').get_interfaces(vdm_name)559 for if_name in interfaces['nfs']:560 self._get_context('VDM').detach_nfs_interface(vdm_name, if_name)561 if security_services:562 # Only support single security service with type 'active_directory'563 is_valid, active_directory = self._get_valid_security_service(564 security_services)565 if is_valid:566 status, servers = self._get_context('CIFSServer').get_all(567 vdm_name)568 if constants.STATUS_OK != status:569 LOG.error(_LE('Could not find CIFS server by name: %s.'),570 vdm_name)571 else:572 cifs_servers = copy.deepcopy(servers)573 for name, server in cifs_servers.items():574 # Unjoin CIFS Server from domain575 cifs_server_args = {576 'name': server['name'],577 'join_domain': False,578 'user_name': active_directory['user'],579 'password': active_directory['password'],580 'mover_name': vdm_name,581 'is_vdm': True,582 }583 try:584 self._get_context('CIFSServer').modify(585 cifs_server_args)586 except exception.EMCVnxXMLAPIError as expt:587 LOG.debug("Failed to modify CIFS server "588 "%(server)s. Reason: %(err)s.",589 {'server': server, 'err': expt})590 self._get_context('CIFSServer').delete(name, vdm_name)591 # Delete interface from Data Mover592 if cifs_if:593 self._get_context('MoverInterface').delete(cifs_if,594 self.mover_name)595 if nfs_if:596 self._get_context('MoverInterface').delete(nfs_if,597 self.mover_name)598 # Delete Virtual Data Mover599 self._get_context('VDM').delete(vdm_name)600 def _get_valid_security_service(self, security_services):601 """Validate security services and return a supported security service.602 :param security_services:603 :returns: (<is_valid>, <data>) -- <is_valid> is true to indicate604 security_services includes zero or single security service for605 active directory. Otherwise, it would return false. <data> return606 error message when <is_valid> is false. Otherwise, it will607 return zero or single security service for active directory.608 """609 # Only support single security service with type 'active_directory'610 service_number = len(security_services)611 if (service_number > 1 or612 security_services[0]['type'] != 'active_directory'):613 return False, _("Unsupported security services. "614 "Only support single security service and "615 "only support type 'active_directory'")616 return True, security_services[0]617 def _get_share_server_name(self, share_server):618 try:619 return share_server['backend_details']['share_server_name']620 except Exception:621 LOG.debug("Didn't get share server name from share_server %s.",622 share_server)623 return share_server['id']624 def _get_context(self, type):...

Full Screen

Full Screen

test_cluster_context.py

Source:test_cluster_context.py Github

copy

Full Screen

...29class TestClusterContext(b.SaharaTestCase):30 def __init__(self, *args, **kwds):31 super(TestClusterContext, self).__init__(*args, **kwds)32 self.fake_np = np.NodeProcess('fake', 'foo', 'bar')33 def _get_context(self):34 i1 = tu.make_inst_dict('id_1', 'instance_1', MANAGEMENT_IP)35 i1['internal_ip'] = INTERNAL_IP36 master_proc = [37 yarn.RESOURCE_MANAGER.ui_name,38 yarn.NODE_MANAGER.ui_name,39 yarn.HISTORY_SERVER.ui_name,40 maprfs.CLDB.ui_name,41 maprfs.FILE_SERVER.ui_name,42 oozie.OOZIE.ui_name,43 management.ZOOKEEPER.ui_name,44 ]45 master_ng = tu.make_ng_dict('master', 'large', master_proc, 1, [i1])46 cluster_configs = {47 'Service': {48 'key': 'value',49 'Service Version': '1.1',50 },51 'Oozie': {52 'Oozie Version': '4.2.0',53 }54 }55 cluster = tu.create_cluster(56 name='test_cluster',57 tenant='large',58 plugin='mapr',59 version='5.2.0.mrv2',60 node_groups=[master_ng],61 cluster_configs=cluster_configs,62 )63 self.ng = cluster.node_groups[0]64 self.instance = self.ng.instances[0]65 return cc.Context(cluster, handler.VersionHandler())66 def test_get_oozie_server_uri(self):67 ctx = self._get_context()68 expected = 'http://%s:11000/oozie' % MANAGEMENT_IP69 self.assertEqual(expected, ctx.oozie_server_uri)70 def test_oozie_server(self):71 ctx = self._get_context()72 node_processes = ctx.oozie_server.node_group.node_processes73 self.assertIn(oozie.OOZIE.ui_name, node_processes)74 def test_oozie_http(self):75 ctx = self._get_context()76 expected = '%s:11000' % MANAGEMENT_IP77 self.assertEqual(expected, ctx.oozie_http)78 def test_configure_sh(self):79 ctx = self._get_context()80 conf_sh = ctx.configure_sh81 pattern = (r'^(\S+)\s+(-N (\S+))\s+(-C (\S+))\s+(-Z (\S+))\s+'82 r'(-no-autostart)\s+(-f)\s+\s(-HS (\S+))')83 self.assertRegex(conf_sh, pattern)84 self.assertIn('/opt/mapr/server/configure.sh', conf_sh)85 self.assertIn('-C %s' % INTERNAL_IP, conf_sh)86 self.assertIn('-Z %s' % INTERNAL_IP, conf_sh)87 self.assertIn('-HS %s' % INTERNAL_IP, conf_sh)88 self.assertIn('-no-autostart', conf_sh)89 self.assertIn('-N ' + ctx.cluster.name, conf_sh)90 def test_get_cluster_config_value(self):91 ctx = self._get_context()92 conf = p.Config('key', 'Service', 'cluster')93 self.assertEqual('value', ctx._get_cluster_config_value(conf))94 not_set = p.Config('nonset', 'Service', 'cluster')95 self.assertIsNone(ctx._get_cluster_config_value(not_set))96 def test_get_instances(self):97 ctx = self._get_context()98 instances = ctx.get_instances()99 self.assertEqual(1, len(instances))100 rms1 = ctx.get_instances(yarn.RESOURCE_MANAGER)101 self.assertEqual(1, len(rms1))102 rms2 = ctx.get_instances(yarn.RESOURCE_MANAGER.ui_name)103 self.assertEqual(1, len(rms2))104 not_existing_1 = ctx.get_instances(self.fake_np)105 self.assertEqual(0, len(not_existing_1))106 not_existing_2 = ctx.get_instances(self.fake_np.ui_name)107 self.assertEqual(0, len(not_existing_2))108 def test_get_instance(self):109 ctx = self._get_context()110 instance_1 = ctx.get_instance(yarn.RESOURCE_MANAGER)111 self.assertIn(yarn.RESOURCE_MANAGER.ui_name,112 instance_1.node_group.node_processes)113 instance_2 = ctx.get_instance(yarn.RESOURCE_MANAGER)114 self.assertIn(yarn.RESOURCE_MANAGER.ui_name,115 instance_2.node_group.node_processes)116 self.assertIsNone(ctx.get_instance(self.fake_np))117 def test_get_instances_ip(self):118 ctx = self._get_context()119 ip_list_1 = ctx.get_instances_ip(yarn.RESOURCE_MANAGER)120 self.assertEqual(1, len(ip_list_1))121 self.assertIn(INTERNAL_IP, ip_list_1)122 ip_list_2 = ctx.get_instances_ip(yarn.RESOURCE_MANAGER.ui_name)123 self.assertEqual(1, len(ip_list_2))124 self.assertIn(INTERNAL_IP, ip_list_2)125 empty_list = ctx.get_instances_ip(self.fake_np)126 self.assertEqual(0, len(empty_list))127 def test_get_instance_ip(self):128 ctx = self._get_context()129 ip_1 = ctx.get_instance_ip(yarn.RESOURCE_MANAGER)130 self.assertEqual(INTERNAL_IP, ip_1)131 ip_2 = ctx.get_instance_ip(yarn.RESOURCE_MANAGER.ui_name)132 self.assertEqual(INTERNAL_IP, ip_2)133 none_ip = ctx.get_instance_ip(self.fake_np)134 self.assertIsNone(none_ip)135 def test_get_zookeeper_nodes_ip_with_port(self):136 ctx = self._get_context()137 expected = '%s:5181' % INTERNAL_IP138 actual = ctx.get_zookeeper_nodes_ip_with_port()139 self.assertEqual(expected, actual)140 management.ZK_CLIENT_PORT = '0000'141 expected = '%s:0000' % INTERNAL_IP142 actual = ctx.get_zookeeper_nodes_ip_with_port()143 self.assertEqual(expected, actual)144 def test_filter_instances(self):145 ctx = self._get_context()146 instances = ctx.get_instances()147 rsmngs = ctx.filter_instances(instances, yarn.RESOURCE_MANAGER)148 self.assertEqual(1, len(rsmngs))149 not_existing_i = ctx.filter_instances(instances, self.fake_np)150 self.assertEqual(0, len(not_existing_i))151 def test_check_for_process(self):152 ctx = self._get_context()153 instance = ctx.get_instance(yarn.RESOURCE_MANAGER)154 self.assertTrue(ctx.check_for_process(instance, yarn.RESOURCE_MANAGER))155 self.assertTrue(ctx.check_for_process(instance,156 yarn.RESOURCE_MANAGER.ui_name))157 self.assertFalse(ctx.check_for_process(instance, maprfs.NFS))158 self.assertFalse(ctx.check_for_process(instance, maprfs.NFS.ui_name))159 def test_get_chosen_service_version(self):160 ctx = self._get_context()161 version = ctx.get_chosen_service_version('Service')162 self.assertEqual('1.1', version)163 def test_get_cluster_services(self):164 ctx = self._get_context()165 actual_services = ctx.get_cluster_services()166 actual_services_names = map(lambda s: s.ui_name, actual_services)167 expected_services_names = [168 yarn.YARN().ui_name,169 management.Management().ui_name,170 maprfs.MapRFS().ui_name,171 oozie.Oozie().ui_name,172 swift.Swift().ui_name,173 ]174 self.assertEqual(sorted(actual_services_names),175 sorted(expected_services_names))176 def test_get_service(self):177 ctx = self._get_context()178 service = ctx.get_service(yarn.HISTORY_SERVER)179 self.assertEqual(yarn.YARN().ui_name, service.ui_name)180 with testtools.ExpectedException(e.InvalidDataException):181 ctx.get_service(self.fake_np)182 def test_get_service_name_by_node_process(self):183 ctx = self._get_context()184 s_name_1 = ctx.get_service_name_by_node_process(yarn.RESOURCE_MANAGER)185 self.assertEqual(yarn.YARN().ui_name, s_name_1)186 s_name_2 = ctx.get_service_name_by_node_process(187 yarn.RESOURCE_MANAGER.ui_name)188 self.assertEqual(yarn.YARN().ui_name, s_name_2)189 not_existing_np = np.NodeProcess('not_existing', 'NotExisting', 'foo')190 self.assertIsNone(ctx.get_service_name_by_node_process(191 not_existing_np))192 self.assertIsNone(ctx.get_service_name_by_node_process(193 not_existing_np.ui_name))194 def test_get_instances_count(self):195 ctx = self._get_context()196 self.assertEqual(1, ctx.get_instances_count())197 self.assertEqual(1, ctx.get_instances_count(yarn.RESOURCE_MANAGER))198 self.assertEqual(1, ctx.get_instances_count(199 yarn.RESOURCE_MANAGER.ui_name))200 self.assertEqual(0, ctx.get_instances_count(self.fake_np))201 self.assertEqual(0, ctx.get_instances_count(202 self.fake_np.ui_name))203 def test_get_node_groups(self):204 ctx = self._get_context()205 all_ngs = ctx.get_node_groups()206 self.assertEqual(1, len(all_ngs))207 self.assertEqual([self.ng], all_ngs)208 rm_ngs_1 = ctx.get_node_groups(yarn.RESOURCE_MANAGER)209 self.assertEqual(1, len(rm_ngs_1))210 self.assertEqual([self.ng], rm_ngs_1)211 rm_ngs_2 = ctx.get_node_groups(yarn.RESOURCE_MANAGER.ui_name)212 self.assertEqual(1, len(rm_ngs_2))213 self.assertEqual([self.ng], rm_ngs_2)214 empty_ngs = ctx.get_node_groups(self.fake_np)215 self.assertEqual(0, len(empty_ngs))216 def test_get_cldb_nodes_ip(self):217 ctx = self._get_context()218 cldb_list_1 = ctx.get_cldb_nodes_ip()219 self.assertEqual(1, len(cldb_list_1.split(',')))220 self.assertIn(INTERNAL_IP, cldb_list_1)221 cldb_list_2 = ctx.get_cldb_nodes_ip()222 self.assertEqual(1, len(cldb_list_2.split(',')))223 self.assertIn(INTERNAL_IP, cldb_list_2)224 sep = ':'225 cldb_list_3 = ctx.get_cldb_nodes_ip(sep)226 self.assertEqual(1, len(cldb_list_3.split(sep)))227 self.assertIn(INTERNAL_IP, cldb_list_3)228 def test_get_zookeeper_nodes_ip(self):229 ctx = self._get_context()230 zk_list_1 = ctx.get_zookeeper_nodes_ip()231 self.assertEqual(1, len(zk_list_1.split(',')))232 self.assertIn(INTERNAL_IP, zk_list_1)233 zk_list_2 = ctx.get_zookeeper_nodes_ip()234 self.assertEqual(1, len(zk_list_2.split(',')))235 self.assertIn(INTERNAL_IP, zk_list_2)236 sep = ':'237 zk_list_3 = ctx.get_zookeeper_nodes_ip(sep)238 self.assertEqual(1, len(zk_list_3.split(sep)))239 self.assertIn(INTERNAL_IP, zk_list_3)240 def test_get_resourcemanager_ip(self):241 ctx = self._get_context()242 ip = ctx.get_resourcemanager_ip()243 self.assertEqual(INTERNAL_IP, ip)244 def test_get_historyserver_ip(self):245 ctx = self._get_context()246 self.assertTrue(ctx.has_control_nodes([self.instance]))247 def test_is_present(self):248 cluster_context = self._get_context()249 self.assertTrue(cluster_context.is_present(oozie.Oozie()))250 self.assertFalse(cluster_context.is_present(oozie.OozieV401()))...

Full Screen

Full Screen

links.py

Source:links.py Github

copy

Full Screen

...4from django.template.loader import get_template, render_to_string5from sellmo import modules6from sellmo.api.decorators import link7namespace = modules.checkout.namespace8def _get_context(**context):9 site = Site.objects.get_current()10 context.update({11 'settings': modules.settings.get_settings(),12 'request' : {13 'site' : site,14 },15 'url': 'http://{0}'.format(site.domain),16 'prefix': 'http://{0}'.format(site.domain),17 'STATIC_URL': 'http://{0}{1}'.format(site.domain, settings.STATIC_URL),18 'MEDIA_URL': 'http://{0}{1}'.format(site.domain, settings.MEDIA_URL),19 })20 return context21@link()22def render_order_confirmation_email(format, order, data, **kwargs):23 template = None24 if format == 'html':25 template = get_template('checkout/emails/order_confirmation.html')26 elif format == 'text':27 template = get_template('checkout/emails/order_confirmation.txt')28 if template:29 data = template.render(Context(_get_context(order=order)))30 return {31 'data' : data32 }33@link()34def render_order_notification_email(format, order, data, **kwargs):35 template = None36 if format == 'html':37 template = get_template('checkout/emails/order_notification.html')38 elif format == 'text':39 template = get_template('checkout/emails/order_notification.txt')40 if template:41 data = template.render(Context(_get_context(order=order)))42 return {43 'data' : data44 }45@link()46def render_shipping_notification_email(format, order, data, **kwargs):47 template = None48 if format == 'html':49 template = get_template('checkout/emails/shipping_notification.html')50 elif format == 'text':51 template = get_template('checkout/emails/shipping_notification.txt')52 if template:53 data = template.render(Context(_get_context(order=order)))54 return {55 'data' : data56 }57@link()58def render_invoice_report(order, internal, data, **kwargs):59 template = get_template('checkout/reports/invoice.html')60 data = template.render(Context(_get_context(order=order, internal=internal)))61 return {62 'data' : data63 }64@link()65def render_order_confirmation_report(order, internal, data, **kwargs):66 template = get_template('checkout/reports/order_confirmation.html')67 data = template.render(Context(_get_context(order=order, internal=internal)))68 return {69 'data' : data...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run pytest-play automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful