How to use apply_url_filters method in tempest

Best Python code snippet using tempest_python

auth.py

Source: auth.py (GitHub)

copy

Full Screen

# NOTE: the excerpt opens part-way through the preceding function, whose first
# lines are not included in the snippet. Its visible tail is preserved below
# as a comment (indentation approximate) so the rest of the listing is valid
# Python:
#
#         ...
#         parts.params,
#         parts.query,
#         parts.fragment))
#     return url


def apply_url_filters(url, filters):
    """Apply the ``api_version`` and ``skip_path`` filters to a URL.

    :param url: the URL to be filtered
    :param filters: dict-like object; only the keys ``api_version`` and
                    ``skip_path`` are read here
    :returns: the filtered URL

    When ``api_version`` is set (not None) the URL is passed through
    ``replace_version`` (defined earlier in the file, outside this excerpt).
    When ``skip_path`` is set to any value other than None -- including
    False -- and the URL has a non-empty path, the path is replaced by '/'.
    """
    if filters.get('api_version') is not None:
        url = replace_version(url, filters['api_version'])
    # Parse only after the version replacement so that skip_path works on the
    # already-versioned URL.
    parts = urlparse.urlparse(url)
    if filters.get('skip_path') is not None and parts.path != '':
        url = urlparse.urlunparse((parts.scheme,
                                   parts.netloc,
                                   '/',
                                   parts.params,
                                   parts.query,
                                   parts.fragment))
    return url


@six.add_metaclass(abc.ABCMeta)
class AuthProvider(object):
    """Provide authentication

    Abstract base class. Concrete providers must implement
    ``_decorate_request``, ``_get_auth``, ``_fill_credentials``,
    ``is_expired`` and ``base_url``.
    """

    # Scopes accepted by the ``scope`` setter; subclasses may override.
    SCOPES = {'project'}

    def __init__(self, credentials, scope='project'):
        """Auth provider __init__

        :param credentials: credentials for authentication
        :param scope: the default scope to be used by the credential providers
                      when requesting a token. Valid values depend on the
                      AuthProvider class implementation, and are defined in
                      the set SCOPES. Default value is 'project'.
        :raises exceptions.InvalidCredentials: if ``credentials`` is a
            Credentials object that fails ``check_credentials``
        :raises TypeError: if ``credentials`` is not a Credentials object
        :raises exceptions.InvalidScope: if ``scope`` is not in SCOPES
        """
        if self.check_credentials(credentials):
            self.credentials = credentials
        elif isinstance(credentials, Credentials):
            # Right type but not valid: report whether a password was given
            # without adding the password itself to the message.
            password = credentials.get('password')
            message = "Credentials are: " + str(credentials)
            if password is None:
                message += " Password is not defined."
            else:
                message += " Password is defined."
            raise exceptions.InvalidCredentials(message)
        else:
            raise TypeError("credentials object is of type %s, which is"
                            " not a valid Credentials object type." %
                            credentials.__class__.__name__)
        # _scope must exist before the property setter runs, because the
        # setter compares the new value against the current one.
        self._scope = None
        self.scope = scope
        self.cache = None
        self.alt_auth_data = None
        self.alt_part = None

    def __str__(self):
        return "Creds :{creds}, cached auth data: {cache}".format(
            creds=self.credentials, cache=self.cache)

    @abc.abstractmethod
    def _decorate_request(self, filters, method, url, headers=None, body=None,
                          auth_data=None):
        """Decorate request with authentication data"""
        return

    @abc.abstractmethod
    def _get_auth(self):
        """Obtain fresh auth data; ``set_auth`` stores the result in cache"""
        return

    @abc.abstractmethod
    def _fill_credentials(self, auth_data_body):
        """Fill the credentials object from the body part of the auth data"""
        return

    def fill_credentials(self):
        """Fill credentials object with data from auth

        :returns: the credentials object held by this provider
        """
        auth_data = self.get_auth()
        self._fill_credentials(auth_data[1])
        return self.credentials

    @classmethod
    def check_credentials(cls, credentials):
        """Verify credentials are valid.

        :returns: True only if ``credentials`` is a Credentials instance and
                  its ``is_valid()`` returns a true value
        """
        return isinstance(credentials, Credentials) and credentials.is_valid()

    @property
    def auth_data(self):
        """Auth data for set scope"""
        return self.get_auth()

    @auth_data.deleter
    def auth_data(self):
        self.clear_auth()

    @property
    def scope(self):
        """Scope used in auth requests"""
        return self._scope

    @scope.setter
    def scope(self, value):
        """Set the scope to be used in token requests

        :param value: scope to be used. If the scope is different, clear
                      caches
        :raises exceptions.InvalidScope: if ``value`` is not in SCOPES
        """
        if value not in self.SCOPES:
            raise exceptions.InvalidScope(
                scope=value, auth_provider=self.__class__.__name__)
        if value != self.scope:
            self.clear_auth()
            self._scope = value

    def get_auth(self):
        """Returns auth from cache if available, else auth first

        The cache is refreshed through ``set_auth`` when it is empty or when
        ``is_expired`` reports the cached data as expired.
        """
        if self.cache is None or self.is_expired(self.cache):
            self.set_auth()
        return self.cache

    def set_auth(self):
        """Forces setting auth.

        Forces setting auth, ignores cache if it exists.
        Refills credentials.
        """
        self.cache = self._get_auth()
        self._fill_credentials(self.cache[1])

    def clear_auth(self):
        """Clear access cache

        Can be called to clear the access cache so that next request
        will fetch a new token and base_url. Also calls ``reset()`` on the
        credentials object.
        """
        self.cache = None
        self.credentials.reset()

    @abc.abstractmethod
    def is_expired(self, auth_data):
        """Tell whether ``auth_data`` must be refreshed (see ``get_auth``)"""
        return

    def auth_request(self, method, url, headers=None, body=None, filters=None):
        """Obtains auth data and decorates a request with that.

        :param method: HTTP method of the request
        :param url: relative URL of the request (path)
        :param headers: HTTP headers of the request
        :param body: HTTP body in case of POST / PUT
        :param filters: select a base URL out of the catalog
        :returns: a tuple (url, headers, body)
        :raises exceptions.BadAltAuth: if alternate auth was requested via
            ``set_alt_auth_data`` but the selected request part would not
            actually differ from the regular one
        """
        orig_req = {'url': url, 'headers': headers, 'body': body}

        auth_url, auth_headers, auth_body = self._decorate_request(
            filters, method, url, headers, body)
        auth_req = {'url': auth_url, 'headers': auth_headers,
                    'body': auth_body}

        # Overwrite part of the request if it has been requested
        if self.alt_part is not None:
            if self.alt_auth_data is not None:
                alt_url, alt_headers, alt_body = self._decorate_request(
                    filters, method, url, headers, body,
                    auth_data=self.alt_auth_data)
                alt_auth_req = {'url': alt_url, 'headers': alt_headers,
                                'body': alt_body}
                # The alternate data must change the selected part, otherwise
                # the caller is not exercising what it thinks it is.
                if auth_req[self.alt_part] == alt_auth_req[self.alt_part]:
                    raise exceptions.BadAltAuth(part=self.alt_part)
                auth_req[self.alt_part] = alt_auth_req[self.alt_part]
            else:
                # If the requested part is not affected by auth, we are
                # not altering auth as expected, raise an exception
                if auth_req[self.alt_part] == orig_req[self.alt_part]:
                    raise exceptions.BadAltAuth(part=self.alt_part)
                # If alt auth data is None, skip auth in the requested part
                auth_req[self.alt_part] = orig_req[self.alt_part]

            # Next auth request will be normal, unless otherwise requested
            self.reset_alt_auth_data()

        return auth_req['url'], auth_req['headers'], auth_req['body']

    def reset_alt_auth_data(self):
        """Configure auth provider to provide valid authentication data"""
        self.alt_part = None
        self.alt_auth_data = None

    def set_alt_auth_data(self, request_part, auth_data):
        """Alternate auth data on next request

        Configure auth provider to provide alt authentication data
        on a part of the *next* auth_request. If ``auth_data`` is None,
        the selected part of the next request is left without auth.

        :param request_part: request part to contain invalid auth: url,
                             headers, body
        :param auth_data: alternative auth_data from which to get the
                          invalid data to be injected
        """
        self.alt_part = request_part
        self.alt_auth_data = auth_data

    @abc.abstractmethod
    def base_url(self, filters, auth_data=None):
        """Extracts the base_url based on provided filters"""
        return


class KeystoneAuthProvider(AuthProvider):
    """Common parts of the Keystone (Identity) based auth providers

    Subclasses supply the token client (``_auth_client``), the token request
    parameters (``_auth_params``) and the version specific handling of the
    auth data.
    """

    # Formats tried, in order, by _parse_expiry_time.
    EXPIRY_DATE_FORMATS = (ISO8601_FLOAT_SECONDS, ISO8601_INT_SECONDS)

    # is_expired() reports a token as expired this long before its expiry.
    token_expiry_threshold = datetime.timedelta(seconds=60)

    def __init__(self, credentials, auth_url,
                 disable_ssl_certificate_validation=None,
                 ca_certs=None, trace_requests=None, scope='project'):
        """Keystone auth provider __init__

        :param credentials: credentials for authentication
        :param auth_url: URL handed to ``_auth_client`` to build the token
                         client
        :param disable_ssl_certificate_validation: forwarded to the token
                                                   client
        :param ca_certs: forwarded to the token client
        :param trace_requests: forwarded to the token client
        :param scope: the default scope, see ``AuthProvider.__init__``
        """
        super(KeystoneAuthProvider, self).__init__(credentials, scope)
        self.dsvm = disable_ssl_certificate_validation
        self.ca_certs = ca_certs
        self.trace_requests = trace_requests
        self.auth_url = auth_url
        self.auth_client = self._auth_client(auth_url)

    def _decorate_request(self, filters, method, url, headers=None, body=None,
                          auth_data=None):
        """Build the authenticated request

        :returns: a tuple (url, headers, body). The headers are a deep copy
                  of the input with 'X-Auth-Token' added; the body is
                  returned unchanged.
        """
        if auth_data is None:
            auth_data = self.get_auth()
        token, _ = auth_data
        base_url = self.base_url(filters=filters, auth_data=auth_data)
        # build authenticated request
        # returns new request, it does not touch the original values
        _headers = copy.deepcopy(headers) if headers is not None else {}
        _headers['X-Auth-Token'] = str(token)
        if url is None or url == "":
            _url = base_url
        else:
            # Join base URL and url, and remove multiple contiguous slashes
            # from the path component only (index 2 of the parsed URL), so
            # the '//' after the scheme is left alone.
            _url = "/".join([base_url, url])
            parts = list(urlparse.urlparse(_url))
            parts[2] = re.sub("/{2,}", "/", parts[2])
            _url = urlparse.urlunparse(parts)
        # no change to method or body
        return str(_url), _headers, body

    @abc.abstractmethod
    def _auth_client(self, auth_url):
        """Build the token client for ``auth_url``

        The signature matches how ``__init__`` calls this hook and how the
        concrete providers in this module implement it.
        """
        return

    @abc.abstractmethod
    def _auth_params(self):
        """Auth parameters to be passed to the token request

        By default all fields available in Credentials are passed to the
        token request. Scope may affect this.
        """
        return

    def _get_auth(self):
        """Request a new token from the auth client, bypassing the cache

        :returns: a tuple (token, auth_data)
        """
        token, auth_data = self.auth_client.get_token(**self._auth_params())
        return token, auth_data

    def _parse_expiry_time(self, expiry_string):
        """Parse a token expiry timestamp

        :param expiry_string: timestamp in one of EXPIRY_DATE_FORMATS
        :returns: the parsed ``datetime.datetime``
        :raises ValueError: if no format in EXPIRY_DATE_FORMATS matches
        """
        expiry = None
        for date_format in self.EXPIRY_DATE_FORMATS:
            try:
                expiry = datetime.datetime.strptime(
                    expiry_string, date_format)
            except ValueError:
                # Not this format; the next one may still match.
                pass
        if expiry is None:
            raise ValueError(
                "time data '{data}' does not match any of the "
                "expected formats: {formats}".format(
                    data=expiry_string, formats=self.EXPIRY_DATE_FORMATS))
        return expiry

    def get_token(self):
        """Return the token part of the (possibly cached) auth data"""
        return self.get_auth()[0]


class KeystoneV2AuthProvider(KeystoneAuthProvider):
    """Provides authentication based on the Identity V2 API

    The Keystone Identity V2 API defines both unscoped and project scoped
    tokens. This auth provider only implements 'project'.
    """

    SCOPES = {'project'}

    def _auth_client(self, auth_url):
        return json_v2id.TokenClient(
            auth_url, disable_ssl_certificate_validation=self.dsvm,
            ca_certs=self.ca_certs, trace_requests=self.trace_requests)

    def _auth_params(self):
        """Auth parameters to be passed to the token request

        All fields available in Credentials are passed to the token request.
        """
        return dict(
            user=self.credentials.username,
            password=self.credentials.password,
            tenant=self.credentials.tenant_name,
            auth_data=True)

    def _fill_credentials(self, auth_data_body):
        """Fill in tenant and user fields that are still None"""
        tenant = auth_data_body['token']['tenant']
        user = auth_data_body['user']
        if self.credentials.tenant_name is None:
            self.credentials.tenant_name = tenant['name']
        if self.credentials.tenant_id is None:
            self.credentials.tenant_id = tenant['id']
        if self.credentials.username is None:
            self.credentials.username = user['name']
        if self.credentials.user_id is None:
            self.credentials.user_id = user['id']

    def base_url(self, filters, auth_data=None):
        """Base URL from catalog

        Filters can be:
        - service: compute, image, etc
        - region: the service region
        - endpoint_type: adminURL, publicURL, internalURL
        - api_version: replace catalog version with this
        - skip_path: take just the base URL

        :raises exceptions.EndpointNotFound: if no service is given or no
            URL can be found for it in the catalog
        """
        if filters is None:
            # auth_request() defaults filters to None; treat that like an
            # empty filter set so the failure is EndpointNotFound below.
            filters = {}
        if auth_data is None:
            auth_data = self.get_auth()
        _, _auth_data = auth_data
        service = filters.get('service')
        region = filters.get('region')
        endpoint_type = filters.get('endpoint_type', 'publicURL')

        if service is None:
            raise exceptions.EndpointNotFound("No service provided")

        _base_url = None
        for ep in _auth_data['serviceCatalog']:
            if ep["type"] == service:
                endpoints = ep['endpoints']
                for _ep in endpoints:
                    # When several endpoints match the region, the last
                    # match is kept.
                    if region is not None and _ep.get('region') == region:
                        _base_url = _ep.get(endpoint_type)
                if not _base_url and endpoints:
                    # No region matching, use the first
                    _base_url = endpoints[0].get(endpoint_type)
                # Only the first catalog entry of the service is considered.
                break
        if _base_url is None:
            raise exceptions.EndpointNotFound(
                "service: %s, region: %s, endpoint_type: %s" %
                (service, region, endpoint_type))
        return apply_url_filters(_base_url, filters)

    def is_expired(self, auth_data):
        """Tell whether the token expires within token_expiry_threshold"""
        _, access = auth_data
        expiry = self._parse_expiry_time(access['token']['expires'])
        return (expiry - self.token_expiry_threshold <=
                datetime.datetime.utcnow())


class KeystoneV3AuthProvider(KeystoneAuthProvider):
    """Provides authentication based on the Identity V3 API"""

    SCOPES = {'project', 'domain', 'unscoped', None}

    def _auth_client(self, auth_url):
        return json_v3id.V3TokenClient(
            auth_url, disable_ssl_certificate_validation=self.dsvm,
            ca_certs=self.ca_certs, trace_requests=self.trace_requests)

    def _auth_params(self):
        """Auth parameters to be passed to the token request

        Fields available in Credentials are passed to the token request,
        depending on the value of scope. Valid values for scope are: "project",
        "domain". Any other string (e.g. "unscoped") or None will lead to an
        unscoped token request.
        """
        auth_params = dict(
            user_id=self.credentials.user_id,
            username=self.credentials.username,
            user_domain_id=self.credentials.user_domain_id,
            user_domain_name=self.credentials.user_domain_name,
            password=self.credentials.password,
            auth_data=True)

        if self.scope == 'project':
            auth_params.update(
                project_domain_id=self.credentials.project_domain_id,
                project_domain_name=self.credentials.project_domain_name,
                project_id=self.credentials.project_id,
                project_name=self.credentials.project_name)

        if self.scope == 'domain':
            auth_params.update(
                domain_id=self.credentials.domain_id,
                domain_name=self.credentials.domain_name)

        return auth_params

    def _fill_credentials(self, auth_data_body):
        """Fill in project, domain and user fields that are still None"""
        # project or domain, depending on the scope
        project = auth_data_body.get('project')
        domain = auth_data_body.get('domain')
        # user is always there
        user = auth_data_body['user']
        # Set project fields
        if project is not None:
            if self.credentials.project_name is None:
                self.credentials.project_name = project['name']
            if self.credentials.project_id is None:
                self.credentials.project_id = project['id']
            if self.credentials.project_domain_id is None:
                self.credentials.project_domain_id = project['domain']['id']
            if self.credentials.project_domain_name is None:
                self.credentials.project_domain_name = (
                    project['domain']['name'])
        # Set domain fields
        if domain is not None:
            if self.credentials.domain_id is None:
                self.credentials.domain_id = domain['id']
            if self.credentials.domain_name is None:
                self.credentials.domain_name = domain['name']
        # Set user fields
        if self.credentials.username is None:
            self.credentials.username = user['name']
        if self.credentials.user_id is None:
            self.credentials.user_id = user['id']
        if self.credentials.user_domain_id is None:
            self.credentials.user_domain_id = user['domain']['id']
        if self.credentials.user_domain_name is None:
            self.credentials.user_domain_name = user['domain']['name']

    def base_url(self, filters, auth_data=None):
        """Base URL from catalog

        If scope is not 'project', it may be that there is not catalog in
        the auth_data. In such case, as long as the requested service is
        'identity', we can use the original auth URL to build the base_url.

        Filters can be:
        - service: compute, image, etc
        - region: the service region
        - endpoint_type: adminURL, publicURL, internalURL
        - api_version: replace catalog version with this
        - skip_path: take just the base URL

        :raises exceptions.EndpointNotFound: if no service is given or no
            URL can be found for it in the catalog
        """
        if filters is None:
            # auth_request() defaults filters to None; treat that like an
            # empty filter set so the failure is EndpointNotFound below.
            filters = {}
        if auth_data is None:
            auth_data = self.get_auth()
        _, _auth_data = auth_data
        service = filters.get('service')
        region = filters.get('region')
        endpoint_type = filters.get('endpoint_type', 'public')

        if service is None:
            raise exceptions.EndpointNotFound("No service provided")

        # Accept the V2 style names (publicURL, adminURL, ...) as well.
        if 'URL' in endpoint_type:
            endpoint_type = endpoint_type.replace('URL', '')
        catalog = _auth_data.get('catalog', [])
        # Select entries with matching service type
        matching_services = [ep for ep in catalog if ep['type'] == service]
        if not matching_services:
            if not catalog and service == 'identity':
                # NOTE(andreaf) If there's no catalog at all and the service
                # is identity, it's a valid use case. Having a non-empty
                # catalog with no identity in it is not valid instead.
                return apply_url_filters(self.auth_url, filters)
            # No matching service
            raise exceptions.EndpointNotFound(service)
        service_catalog = matching_services[0]['endpoints']
        if not service_catalog:
            # The service is listed but has no endpoints at all.
            raise exceptions.EndpointNotFound(service)
        # Filter by endpoint type (interface)
        filtered_catalog = [ep for ep in service_catalog if
                            ep['interface'] == endpoint_type]
        if not filtered_catalog:
            # No matching type, keep all and try matching by region at least
            filtered_catalog = service_catalog
        # Filter by region
        filtered_catalog = [ep for ep in filtered_catalog if
                            ep.get('region') == region]
        if not filtered_catalog:
            # No matching region, take the first endpoint
            # NOTE(review): this is the first endpoint of the service, not
            # the first one matching the requested interface -- confirm that
            # dropping the interface filter here is intended.
            filtered_catalog = [service_catalog[0]]
        # There should be only one match. If not take the first.
        _base_url = filtered_catalog[0].get('url')
        if _base_url is None:
            raise exceptions.EndpointNotFound(service)
        return apply_url_filters(_base_url, filters)

    def is_expired(self, auth_data):
        """Tell whether the token expires within token_expiry_threshold"""
        _, access = auth_data
        expiry = self._parse_expiry_time(access['expires_at'])
        return (expiry - self.token_expiry_threshold <=
                datetime.datetime.utcnow())


def is_identity_version_supported(identity_version):
    """Tell whether ``identity_version`` is listed in IDENTITY_VERSION"""
    return identity_version in IDENTITY_VERSION


# NOTE: the excerpt is cut off inside the docstring of the next function; the
# visible part is preserved below as a comment:
#
# def get_credentials(auth_url, fill_in=True, identity_version='v2',
#                     disable_ssl_certificate_validation=None, ca_certs=None,
#                     trace_requests=None, **kwargs):
#     """Builds a credentials object based on the configured auth_version
#
#     :param auth_url (string): Full URI of the OpenStack Identity API(Keystone)
#            which is used to fetch the token from Identity service.
#     :param fill_in (boolean): obtain a token and fill in all credential...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful