Best Python code snippet using tempest_python
auth.py
Source:auth.py  
...40                               parts.params,41                               parts.query,42                               parts.fragment))43    return url44def apply_url_filters(url, filters):45    if filters.get('api_version', None) is not None:46        url = replace_version(url, filters['api_version'])47    parts = urlparse.urlparse(url)48    if filters.get('skip_path', None) is not None and parts.path != '':49        url = urlparse.urlunparse((parts.scheme,50                                   parts.netloc,51                                   '/',52                                   parts.params,53                                   parts.query,54                                   parts.fragment))55    return url56@six.add_metaclass(abc.ABCMeta)57class AuthProvider(object):58    """Provide authentication"""59    SCOPES = set(['project'])60    def __init__(self, credentials, scope='project'):61        """Auth provider __init__62        :param credentials: credentials for authentication63        :param scope: the default scope to be used by the credential providers64                      when requesting a token. Valid values depend on the65                      AuthProvider class implementation, and are defined in66                      the set SCOPES. Default value is 'project'.67        """68        if self.check_credentials(credentials):69            self.credentials = credentials70        else:71            if isinstance(credentials, Credentials):72                password = credentials.get('password')73                message = "Credentials are: " + str(credentials)74                if password is None:75                    message += " Password is not defined."76                else:77                    message += " Password is defined."78                raise exceptions.InvalidCredentials(message)79            else:80                raise TypeError("credentials object is of type %s, which is"81                                " not a valid Credentials object type." %82                                credentials.__class__.__name__)83        self._scope = None84        self.scope = scope85        self.cache = None86        self.alt_auth_data = None87        self.alt_part = None88    def __str__(self):89        return "Creds :{creds}, cached auth data: {cache}".format(90            creds=self.credentials, cache=self.cache)91    @abc.abstractmethod92    def _decorate_request(self, filters, method, url, headers=None, body=None,93                          auth_data=None):94        """Decorate request with authentication data"""95        return96    @abc.abstractmethod97    def _get_auth(self):98        return99    @abc.abstractmethod100    def _fill_credentials(self, auth_data_body):101        return102    def fill_credentials(self):103        """Fill credentials object with data from auth"""104        auth_data = self.get_auth()105        self._fill_credentials(auth_data[1])106        return self.credentials107    @classmethod108    def check_credentials(cls, credentials):109        """Verify credentials are valid."""110        return isinstance(credentials, Credentials) and credentials.is_valid()111    @property112    def auth_data(self):113        """Auth data for set scope"""114        return self.get_auth()115    @property116    def scope(self):117        """Scope used in auth requests"""118        return self._scope119    @auth_data.deleter120    def auth_data(self):121        self.clear_auth()122    def get_auth(self):123        """Returns auth from cache if available, else auth first"""124        if self.cache is None or self.is_expired(self.cache):125            self.set_auth()126        return self.cache127    def set_auth(self):128        """Forces setting auth.129        Forces setting auth, ignores cache if it exists.130        Refills credentials.131        """132        self.cache = self._get_auth()133        self._fill_credentials(self.cache[1])134    def clear_auth(self):135        """Clear access cache136        Can be called to clear the access cache so that next request137        will fetch a new token and base_url.138        """139        self.cache = None140        self.credentials.reset()141    @abc.abstractmethod142    def is_expired(self, auth_data):143        return144    def auth_request(self, method, url, headers=None, body=None, filters=None):145        """Obtains auth data and decorates a request with that.146        :param method: HTTP method of the request147        :param url: relative URL of the request (path)148        :param headers: HTTP headers of the request149        :param body: HTTP body in case of POST / PUT150        :param filters: select a base URL out of the catalog151        :returns a Tuple (url, headers, body)152        """153        orig_req = dict(url=url, headers=headers, body=body)154        auth_url, auth_headers, auth_body = self._decorate_request(155            filters, method, url, headers, body)156        auth_req = dict(url=auth_url, headers=auth_headers, body=auth_body)157        # Overwrite part if the request if it has been requested158        if self.alt_part is not None:159            if self.alt_auth_data is not None:160                alt_url, alt_headers, alt_body = self._decorate_request(161                    filters, method, url, headers, body,162                    auth_data=self.alt_auth_data)163                alt_auth_req = dict(url=alt_url, headers=alt_headers,164                                    body=alt_body)165                if auth_req[self.alt_part] == alt_auth_req[self.alt_part]:166                    raise exceptions.BadAltAuth(part=self.alt_part)167                auth_req[self.alt_part] = alt_auth_req[self.alt_part]168            else:169                # If the requested part is not affected by auth, we are170                # not altering auth as expected, raise an exception171                if auth_req[self.alt_part] == orig_req[self.alt_part]:172                    raise exceptions.BadAltAuth(part=self.alt_part)173                # If alt auth data is None, skip auth in the requested part174                auth_req[self.alt_part] = orig_req[self.alt_part]175            # Next auth request will be normal, unless otherwise requested176            self.reset_alt_auth_data()177        return auth_req['url'], auth_req['headers'], auth_req['body']178    def reset_alt_auth_data(self):179        """Configure auth provider to provide valid authentication data"""180        self.alt_part = None181        self.alt_auth_data = None182    def set_alt_auth_data(self, request_part, auth_data):183        """Alternate auth data on next request184        Configure auth provider to provide alt authentication data185        on a part of the *next* auth_request. If credentials are None,186        set invalid data.187        :param request_part: request part to contain invalid auth: url,188                             headers, body189        :param auth_data: alternative auth_data from which to get the190                          invalid data to be injected191        """192        self.alt_part = request_part193        self.alt_auth_data = auth_data194    @abc.abstractmethod195    def base_url(self, filters, auth_data=None):196        """Extracts the base_url based on provided filters"""197        return198    @scope.setter199    def scope(self, value):200        """Set the scope to be used in token requests201        :param scope: scope to be used. If the scope is different, clear caches202        """203        if value not in self.SCOPES:204            raise exceptions.InvalidScope(205                scope=value, auth_provider=self.__class__.__name__)206        if value != self.scope:207            self.clear_auth()208            self._scope = value209class KeystoneAuthProvider(AuthProvider):210    EXPIRY_DATE_FORMATS = (ISO8601_FLOAT_SECONDS, ISO8601_INT_SECONDS)211    token_expiry_threshold = datetime.timedelta(seconds=60)212    def __init__(self, credentials, auth_url,213                 disable_ssl_certificate_validation=None,214                 ca_certs=None, trace_requests=None, scope='project'):215        super(KeystoneAuthProvider, self).__init__(credentials, scope)216        self.dsvm = disable_ssl_certificate_validation217        self.ca_certs = ca_certs218        self.trace_requests = trace_requests219        self.auth_url = auth_url220        self.auth_client = self._auth_client(auth_url)221    def _decorate_request(self, filters, method, url, headers=None, body=None,222                          auth_data=None):223        if auth_data is None:224            auth_data = self.get_auth()225        token, _ = auth_data226        base_url = self.base_url(filters=filters, auth_data=auth_data)227        # build authenticated request228        # returns new request, it does not touch the original values229        _headers = copy.deepcopy(headers) if headers is not None else {}230        _headers['X-Auth-Token'] = str(token)231        if url is None or url == "":232            _url = base_url233        else:234            # Join base URL and url, and remove multiple contiguous slashes235            _url = "/".join([base_url, url])236            parts = [x for x in urlparse.urlparse(_url)]237            parts[2] = re.sub("/{2,}", "/", parts[2])238            _url = urlparse.urlunparse(parts)239        # no change to method or body240        return str(_url), _headers, body241    @abc.abstractmethod242    def _auth_client(self):243        return244    @abc.abstractmethod245    def _auth_params(self):246        """Auth parameters to be passed to the token request247        By default all fields available in Credentials are passed to the248        token request. Scope may affect this.249        """250        return251    def _get_auth(self):252        # Bypasses the cache253        auth_func = getattr(self.auth_client, 'get_token')254        auth_params = self._auth_params()255        # returns token, auth_data256        token, auth_data = auth_func(**auth_params)257        return token, auth_data258    def _parse_expiry_time(self, expiry_string):259        expiry = None260        for date_format in self.EXPIRY_DATE_FORMATS:261            try:262                expiry = datetime.datetime.strptime(263                    expiry_string, date_format)264            except ValueError:265                pass266        if expiry is None:267            raise ValueError(268                "time data '{data}' does not match any of the"269                "expected formats: {formats}".format(270                    data=expiry_string, formats=self.EXPIRY_DATE_FORMATS))271        return expiry272    def get_token(self):273        return self.get_auth()[0]274class KeystoneV2AuthProvider(KeystoneAuthProvider):275    """Provides authentication based on the Identity V2 API276    The Keystone Identity V2 API defines both unscoped and project scoped277    tokens. This auth provider only implements 'project'.278    """279    SCOPES = set(['project'])280    def _auth_client(self, auth_url):281        return json_v2id.TokenClient(282            auth_url, disable_ssl_certificate_validation=self.dsvm,283            ca_certs=self.ca_certs, trace_requests=self.trace_requests)284    def _auth_params(self):285        """Auth parameters to be passed to the token request286        All fields available in Credentials are passed to the token request.287        """288        return dict(289            user=self.credentials.username,290            password=self.credentials.password,291            tenant=self.credentials.tenant_name,292            auth_data=True)293    def _fill_credentials(self, auth_data_body):294        tenant = auth_data_body['token']['tenant']295        user = auth_data_body['user']296        if self.credentials.tenant_name is None:297            self.credentials.tenant_name = tenant['name']298        if self.credentials.tenant_id is None:299            self.credentials.tenant_id = tenant['id']300        if self.credentials.username is None:301            self.credentials.username = user['name']302        if self.credentials.user_id is None:303            self.credentials.user_id = user['id']304    def base_url(self, filters, auth_data=None):305        """Base URL from catalog306        Filters can be:307        - service: compute, image, etc308        - region: the service region309        - endpoint_type: adminURL, publicURL, internalURL310        - api_version: replace catalog version with this311        - skip_path: take just the base URL312        """313        if auth_data is None:314            auth_data = self.get_auth()315        token, _auth_data = auth_data316        service = filters.get('service')317        region = filters.get('region')318        endpoint_type = filters.get('endpoint_type', 'publicURL')319        if service is None:320            raise exceptions.EndpointNotFound("No service provided")321        _base_url = None322        for ep in _auth_data['serviceCatalog']:323            if ep["type"] == service:324                for _ep in ep['endpoints']:325                    if region is not None and _ep['region'] == region:326                        _base_url = _ep.get(endpoint_type)327                if not _base_url:328                    # No region matching, use the first329                    _base_url = ep['endpoints'][0].get(endpoint_type)330                break331        if _base_url is None:332            raise exceptions.EndpointNotFound(333                "service: %s, region: %s, endpoint_type: %s" %334                (service, region, endpoint_type))335        return apply_url_filters(_base_url, filters)336    def is_expired(self, auth_data):337        _, access = auth_data338        expiry = self._parse_expiry_time(access['token']['expires'])339        return (expiry - self.token_expiry_threshold <=340                datetime.datetime.utcnow())341class KeystoneV3AuthProvider(KeystoneAuthProvider):342    """Provides authentication based on the Identity V3 API"""343    SCOPES = set(['project', 'domain', 'unscoped', None])344    def _auth_client(self, auth_url):345        return json_v3id.V3TokenClient(346            auth_url, disable_ssl_certificate_validation=self.dsvm,347            ca_certs=self.ca_certs, trace_requests=self.trace_requests)348    def _auth_params(self):349        """Auth parameters to be passed to the token request350        Fields available in Credentials are passed to the token request,351        depending on the value of scope. Valid values for scope are: "project",352        "domain". Any other string (e.g. "unscoped") or None will lead to an353        unscoped token request.354        """355        auth_params = dict(356            user_id=self.credentials.user_id,357            username=self.credentials.username,358            user_domain_id=self.credentials.user_domain_id,359            user_domain_name=self.credentials.user_domain_name,360            password=self.credentials.password,361            auth_data=True)362        if self.scope == 'project':363            auth_params.update(364                project_domain_id=self.credentials.project_domain_id,365                project_domain_name=self.credentials.project_domain_name,366                project_id=self.credentials.project_id,367                project_name=self.credentials.project_name)368        if self.scope == 'domain':369            auth_params.update(370                domain_id=self.credentials.domain_id,371                domain_name=self.credentials.domain_name)372        return auth_params373    def _fill_credentials(self, auth_data_body):374        # project or domain, depending on the scope375        project = auth_data_body.get('project', None)376        domain = auth_data_body.get('domain', None)377        # user is always there378        user = auth_data_body['user']379        # Set project fields380        if project is not None:381            if self.credentials.project_name is None:382                self.credentials.project_name = project['name']383            if self.credentials.project_id is None:384                self.credentials.project_id = project['id']385            if self.credentials.project_domain_id is None:386                self.credentials.project_domain_id = project['domain']['id']387            if self.credentials.project_domain_name is None:388                self.credentials.project_domain_name = (389                    project['domain']['name'])390        # Set domain fields391        if domain is not None:392            if self.credentials.domain_id is None:393                self.credentials.domain_id = domain['id']394            if self.credentials.domain_name is None:395                self.credentials.domain_name = domain['name']396        # Set user fields397        if self.credentials.username is None:398            self.credentials.username = user['name']399        if self.credentials.user_id is None:400            self.credentials.user_id = user['id']401        if self.credentials.user_domain_id is None:402            self.credentials.user_domain_id = user['domain']['id']403        if self.credentials.user_domain_name is None:404            self.credentials.user_domain_name = user['domain']['name']405    def base_url(self, filters, auth_data=None):406        """Base URL from catalog407        If scope is not 'project', it may be that there is not catalog in408        the auth_data. In such case, as long as the requested service is409        'identity', we can use the original auth URL to build the base_url.410        Filters can be:411        - service: compute, image, etc412        - region: the service region413        - endpoint_type: adminURL, publicURL, internalURL414        - api_version: replace catalog version with this415        - skip_path: take just the base URL416        """417        if auth_data is None:418            auth_data = self.get_auth()419        token, _auth_data = auth_data420        service = filters.get('service')421        region = filters.get('region')422        endpoint_type = filters.get('endpoint_type', 'public')423        if service is None:424            raise exceptions.EndpointNotFound("No service provided")425        if 'URL' in endpoint_type:426            endpoint_type = endpoint_type.replace('URL', '')427        _base_url = None428        catalog = _auth_data.get('catalog', [])429        # Select entries with matching service type430        service_catalog = [ep for ep in catalog if ep['type'] == service]431        if len(service_catalog) > 0:432            service_catalog = service_catalog[0]['endpoints']433        else:434            if len(catalog) == 0 and service == 'identity':435                # NOTE(andreaf) If there's no catalog at all and the service436                # is identity, it's a valid use case. Having a non-empty437                # catalog with no identity in it is not valid instead.438                return apply_url_filters(self.auth_url, filters)439            else:440                # No matching service441                raise exceptions.EndpointNotFound(service)442        # Filter by endpoint type (interface)443        filtered_catalog = [ep for ep in service_catalog if444                            ep['interface'] == endpoint_type]445        if len(filtered_catalog) == 0:446            # No matching type, keep all and try matching by region at least447            filtered_catalog = service_catalog448        # Filter by region449        filtered_catalog = [ep for ep in filtered_catalog if450                            ep['region'] == region]451        if len(filtered_catalog) == 0:452            # No matching region, take the first endpoint453            filtered_catalog = [service_catalog[0]]454        # There should be only one match. If not take the first.455        _base_url = filtered_catalog[0].get('url', None)456        if _base_url is None:457            raise exceptions.EndpointNotFound(service)458        return apply_url_filters(_base_url, filters)459    def is_expired(self, auth_data):460        _, access = auth_data461        expiry = self._parse_expiry_time(access['expires_at'])462        return (expiry - self.token_expiry_threshold <=463                datetime.datetime.utcnow())464def is_identity_version_supported(identity_version):465    return identity_version in IDENTITY_VERSION466def get_credentials(auth_url, fill_in=True, identity_version='v2',467                    disable_ssl_certificate_validation=None, ca_certs=None,468                    trace_requests=None, **kwargs):469    """Builds a credentials object based on the configured auth_version470    :param auth_url (string): Full URI of the OpenStack Identity API(Keystone)471           which is used to fetch the token from Identity service.472    :param fill_in (boolean): obtain a token and fill in all credential...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
