Best Python code snippet using localstack_python
auth.py
Source:auth.py  
...221        else:222            return EMPTY_SHA256_HASH223    def canonical_request(self, request):224        cr = [request.method.upper()]225        path = self._normalize_url_path(urlsplit(request.url).path)226        cr.append(path)227        cr.append(self.canonical_query_string(request))228        headers_to_sign = self.headers_to_sign(request)229        cr.append(self.canonical_headers(headers_to_sign) + '\n')230        cr.append(self.signed_headers(headers_to_sign))231        if 'X-Amz-Content-SHA256' in request.headers:232            body_checksum = request.headers['X-Amz-Content-SHA256']233        else:234            body_checksum = self.payload(request)235        cr.append(body_checksum)236        return '\n'.join(cr)237    def _normalize_url_path(self, path):238        normalized_path = quote(normalize_url_path(path), safe='/~')239        return normalized_path240    def scope(self, request):241        scope = [self.credentials.access_key]242        scope.append(request.context['timestamp'][0:8])243        scope.append(self._region_name)244        scope.append(self._service_name)245        scope.append('aws4_request')246        return '/'.join(scope)247    def credential_scope(self, request):248        scope = []249        scope.append(request.context['timestamp'][0:8])250        scope.append(self._region_name)251        scope.append(self._service_name)252        scope.append('aws4_request')253        return '/'.join(scope)254    def string_to_sign(self, request, canonical_request):255        """256        Return the canonical StringToSign as well as a dict257        containing the original version of all headers that258        were included in the StringToSign.259        """260        sts = ['AWS4-HMAC-SHA256']261        sts.append(request.context['timestamp'])262        sts.append(self.credential_scope(request))263        sts.append(sha256(canonical_request.encode('utf-8')).hexdigest())264        return '\n'.join(sts)265    def signature(self, string_to_sign, request):266        key = self.credentials.secret_key267        k_date = self._sign(('AWS4' + key).encode('utf-8'),268                            request.context['timestamp'][0:8])269        k_region = self._sign(k_date, self._region_name)270        k_service = self._sign(k_region, self._service_name)271        k_signing = self._sign(k_service, 'aws4_request')272        return self._sign(k_signing, string_to_sign, hex=True)273    def add_auth(self, request):274        if self.credentials is None:275            raise NoCredentialsError276        datetime_now = datetime.datetime.utcnow()277        request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)278        # This could be a retry.  Make sure the previous279        # authorization header is removed first.280        self._modify_request_before_signing(request)281        canonical_request = self.canonical_request(request)282        logger.debug("Calculating signature using v4 auth.")283        logger.debug('CanonicalRequest:\n%s', canonical_request)284        string_to_sign = self.string_to_sign(request, canonical_request)285        logger.debug('StringToSign:\n%s', string_to_sign)286        signature = self.signature(string_to_sign, request)287        logger.debug('Signature:\n%s', signature)288        self._inject_signature_to_request(request, signature)289    def _inject_signature_to_request(self, request, signature):290        l = ['AWS4-HMAC-SHA256 Credential=%s' % self.scope(request)]291        headers_to_sign = self.headers_to_sign(request)292        l.append('SignedHeaders=%s' % self.signed_headers(headers_to_sign))293        l.append('Signature=%s' % signature)294        request.headers['Authorization'] = ', '.join(l)295        return request296    def _modify_request_before_signing(self, request):297        if 'Authorization' in request.headers:298            del request.headers['Authorization']299        self._set_necessary_date_headers(request)300        if self.credentials.token:301            if 'X-Amz-Security-Token' in request.headers:302                del request.headers['X-Amz-Security-Token']303            request.headers['X-Amz-Security-Token'] = self.credentials.token304    def _set_necessary_date_headers(self, request):305        # The spec allows for either the Date _or_ the X-Amz-Date value to be306        # used so we check both.  If there's a Date header, we use the date307        # header.  Otherwise we use the X-Amz-Date header.308        if 'Date' in request.headers:309            del request.headers['Date']310            datetime_timestamp = datetime.datetime.strptime(311                request.context['timestamp'], SIGV4_TIMESTAMP)312            request.headers['Date'] = formatdate(313                int(calendar.timegm(datetime_timestamp.timetuple())))314            if 'X-Amz-Date' in request.headers:315                del request.headers['X-Amz-Date']316        else:317            if 'X-Amz-Date' in request.headers:318                del request.headers['X-Amz-Date']319            request.headers['X-Amz-Date'] = request.context['timestamp']320class S3SigV4Auth(SigV4Auth):321    def _modify_request_before_signing(self, request):322        super(S3SigV4Auth, self)._modify_request_before_signing(request)323        if 'X-Amz-Content-SHA256' in request.headers:324            del request.headers['X-Amz-Content-SHA256']325        request.headers['X-Amz-Content-SHA256'] = self.payload(request)326    def _normalize_url_path(self, path):327        # For S3, we do not normalize the path.328        return path329class SigV4QueryAuth(SigV4Auth):330    DEFAULT_EXPIRES = 3600331    def __init__(self, credentials, service_name, region_name,332                 expires=DEFAULT_EXPIRES):333        super(SigV4QueryAuth, self).__init__(credentials, service_name,334                                             region_name)335        self._expires = expires336    def _modify_request_before_signing(self, request):337        # Note that we're not including X-Amz-Signature.338        # From the docs: "The Canonical Query String must include all the query339        # parameters from the preceding table except for X-Amz-Signature.340        signed_headers = self.signed_headers(self.headers_to_sign(request))341        auth_params = {342            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',343            'X-Amz-Credential': self.scope(request),344            'X-Amz-Date': request.context['timestamp'],345            'X-Amz-Expires': self._expires,346            'X-Amz-SignedHeaders': signed_headers,347        }348        if self.credentials.token is not None:349            auth_params['X-Amz-Security-Token'] = self.credentials.token350        # Now parse the original query string to a dict, inject our new query351        # params, and serialize back to a query string.352        url_parts = urlsplit(request.url)353        # parse_qs makes each value a list, but in our case we know we won't354        # have repeated keys so we know we have single element lists which we355        # can convert back to scalar values.356        query_dict = dict(357            [(k, v[0]) for k, v in parse_qs(url_parts.query).items()])358        # The spec is particular about this.  It *has* to be:359        # https://<endpoint>?<operation params>&<auth params>360        # You can't mix the two types of params together, i.e just keep doing361        # new_query_params.update(op_params)362        # new_query_params.update(auth_params)363        # percent_encode_sequence(new_query_params)364        operation_params = ''365        if request.data:366            # We also need to move the body params into the query string.367            # request.data will be populated, for example, with query services368            # which normally form encode the params into the body.369            # This means that request.data is a dict() of the operation params.370            query_dict.update(request.data)371            request.data = ''372        if query_dict:373            operation_params = percent_encode_sequence(query_dict) + '&'374        new_query_string = (operation_params +375                            percent_encode_sequence(auth_params))376        # url_parts is a tuple (and therefore immutable) so we need to create377        # a new url_parts with the new query string.378        # <part>   - <index>379        # scheme   - 0380        # netloc   - 1381        # path     - 2382        # query    - 3  <-- we're replacing this.383        # fragment - 4384        p = url_parts385        new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])386        request.url = urlunsplit(new_url_parts)387    def _inject_signature_to_request(self, request, signature):388        # Rather than calculating an "Authorization" header, for the query389        # param quth, we just append an 'X-Amz-Signature' param to the end390        # of the query string.391        request.url += '&X-Amz-Signature=%s' % signature392class S3SigV4QueryAuth(SigV4QueryAuth):393    """S3 SigV4 auth using query parameters.394    This signer will sign a request using query parameters and signature395    version 4, i.e a "presigned url" signer.396    Based off of:397    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html398    """399    def _normalize_url_path(self, path):400        # For S3, we do not normalize the path.401        return path402    def payload(self, request):403        # From the doc link above:404        # "You don't include a payload hash in the Canonical Request, because405        # when you create a presigned URL, you don't know anything about the406        # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".407        return "UNSIGNED-PAYLOAD"408class S3SigV4PostAuth(SigV4Auth):409    """410    Presigns a s3 post411    Implementation doc here:412    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html413    """...base.py
Source:base.py  
...22        Only called for urls that lack a scheme (at the very least), being resolved23        against a base URL that matches this specific fetcher.24        """25        return url26    def _normalize_url_path(self, url: furl) -> furl:  # pylint: disable=no-self-use27        """Normalize the path component of a URL."""28        if not url.path.segments[-1].endswith(TOML_EXTENSION):29            url = url.copy()30            url.path.segments[-1] = f"{url.path.segments[-1]}{TOML_EXTENSION}"31        return url32    def _normalize_scheme(self, scheme: str) -> str:  # pylint: disable=no-self-use33        """Normalize the scheme component of a URL."""34        return scheme35    def normalize(self, url: furl) -> furl:36        """Normalize a URL.37        Produces a canonical URL, meant to be used to uniquely identify a style resource.38        - The base name has .toml appended if not already ending in that extension39        - Individual fetchers can further normalize the path and scheme.40        """41        new_scheme = self._normalize_scheme(url.scheme)42        if new_scheme != url.scheme:43            url = url.copy().set(scheme=new_scheme)44        return self._normalize_url_path(url)45    def fetch(self, url: furl) -> str:46        """Fetch a style from a specific fetcher."""...file.py
Source:file.py  
...20        path = Path(url).expanduser()21        # return absolute paths as URLs as on Windows they could otherwise not resolve22        # cleanly against a file:// base. Relative paths should use POSIX conventions.23        return path.as_uri() if path.is_absolute() else path.as_posix()24    def _normalize_url_path(self, url: furl) -> furl:25        local_path = url_to_python_path(super()._normalize_url_path(url))26        return furl(local_path.resolve().as_uri())27    def fetch(self, url: furl) -> str:28        """Fetch a style from a local file."""...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
