How to use _normalize_url_path method in localstack

Best Python code snippet using localstack_python

auth.py

Source:auth.py Github

copy

Full Screen

...221 else:222 return EMPTY_SHA256_HASH223 def canonical_request(self, request):224 cr = [request.method.upper()]225 path = self._normalize_url_path(urlsplit(request.url).path)226 cr.append(path)227 cr.append(self.canonical_query_string(request))228 headers_to_sign = self.headers_to_sign(request)229 cr.append(self.canonical_headers(headers_to_sign) + '\n')230 cr.append(self.signed_headers(headers_to_sign))231 if 'X-Amz-Content-SHA256' in request.headers:232 body_checksum = request.headers['X-Amz-Content-SHA256']233 else:234 body_checksum = self.payload(request)235 cr.append(body_checksum)236 return '\n'.join(cr)237 def _normalize_url_path(self, path):238 normalized_path = quote(normalize_url_path(path), safe='/~')239 return normalized_path240 def scope(self, request):241 scope = [self.credentials.access_key]242 scope.append(request.context['timestamp'][0:8])243 scope.append(self._region_name)244 scope.append(self._service_name)245 scope.append('aws4_request')246 return '/'.join(scope)247 def credential_scope(self, request):248 scope = []249 scope.append(request.context['timestamp'][0:8])250 scope.append(self._region_name)251 scope.append(self._service_name)252 scope.append('aws4_request')253 return '/'.join(scope)254 def string_to_sign(self, request, canonical_request):255 """256 Return the canonical StringToSign as well as a dict257 containing the original version of all headers that258 were included in the StringToSign.259 """260 sts = ['AWS4-HMAC-SHA256']261 sts.append(request.context['timestamp'])262 sts.append(self.credential_scope(request))263 sts.append(sha256(canonical_request.encode('utf-8')).hexdigest())264 return '\n'.join(sts)265 def signature(self, string_to_sign, request):266 key = self.credentials.secret_key267 k_date = self._sign(('AWS4' + key).encode('utf-8'),268 request.context['timestamp'][0:8])269 k_region = self._sign(k_date, self._region_name)270 k_service = self._sign(k_region, self._service_name)271 k_signing = self._sign(k_service, 'aws4_request')272 return self._sign(k_signing, string_to_sign, hex=True)273 def add_auth(self, request):274 if self.credentials is None:275 raise NoCredentialsError276 datetime_now = datetime.datetime.utcnow()277 request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)278 # This could be a retry. Make sure the previous279 # authorization header is removed first.280 self._modify_request_before_signing(request)281 canonical_request = self.canonical_request(request)282 logger.debug("Calculating signature using v4 auth.")283 logger.debug('CanonicalRequest:\n%s', canonical_request)284 string_to_sign = self.string_to_sign(request, canonical_request)285 logger.debug('StringToSign:\n%s', string_to_sign)286 signature = self.signature(string_to_sign, request)287 logger.debug('Signature:\n%s', signature)288 self._inject_signature_to_request(request, signature)289 def _inject_signature_to_request(self, request, signature):290 l = ['AWS4-HMAC-SHA256 Credential=%s' % self.scope(request)]291 headers_to_sign = self.headers_to_sign(request)292 l.append('SignedHeaders=%s' % self.signed_headers(headers_to_sign))293 l.append('Signature=%s' % signature)294 request.headers['Authorization'] = ', '.join(l)295 return request296 def _modify_request_before_signing(self, request):297 if 'Authorization' in request.headers:298 del request.headers['Authorization']299 self._set_necessary_date_headers(request)300 if self.credentials.token:301 if 'X-Amz-Security-Token' in request.headers:302 del request.headers['X-Amz-Security-Token']303 request.headers['X-Amz-Security-Token'] = self.credentials.token304 def _set_necessary_date_headers(self, request):305 # The spec allows for either the Date _or_ the X-Amz-Date value to be306 # used so we check both. If there's a Date header, we use the date307 # header. Otherwise we use the X-Amz-Date header.308 if 'Date' in request.headers:309 del request.headers['Date']310 datetime_timestamp = datetime.datetime.strptime(311 request.context['timestamp'], SIGV4_TIMESTAMP)312 request.headers['Date'] = formatdate(313 int(calendar.timegm(datetime_timestamp.timetuple())))314 if 'X-Amz-Date' in request.headers:315 del request.headers['X-Amz-Date']316 else:317 if 'X-Amz-Date' in request.headers:318 del request.headers['X-Amz-Date']319 request.headers['X-Amz-Date'] = request.context['timestamp']320class S3SigV4Auth(SigV4Auth):321 def _modify_request_before_signing(self, request):322 super(S3SigV4Auth, self)._modify_request_before_signing(request)323 if 'X-Amz-Content-SHA256' in request.headers:324 del request.headers['X-Amz-Content-SHA256']325 request.headers['X-Amz-Content-SHA256'] = self.payload(request)326 def _normalize_url_path(self, path):327 # For S3, we do not normalize the path.328 return path329class SigV4QueryAuth(SigV4Auth):330 DEFAULT_EXPIRES = 3600331 def __init__(self, credentials, service_name, region_name,332 expires=DEFAULT_EXPIRES):333 super(SigV4QueryAuth, self).__init__(credentials, service_name,334 region_name)335 self._expires = expires336 def _modify_request_before_signing(self, request):337 # Note that we're not including X-Amz-Signature.338 # From the docs: "The Canonical Query String must include all the query339 # parameters from the preceding table except for X-Amz-Signature.340 signed_headers = self.signed_headers(self.headers_to_sign(request))341 auth_params = {342 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',343 'X-Amz-Credential': self.scope(request),344 'X-Amz-Date': request.context['timestamp'],345 'X-Amz-Expires': self._expires,346 'X-Amz-SignedHeaders': signed_headers,347 }348 if self.credentials.token is not None:349 auth_params['X-Amz-Security-Token'] = self.credentials.token350 # Now parse the original query string to a dict, inject our new query351 # params, and serialize back to a query string.352 url_parts = urlsplit(request.url)353 # parse_qs makes each value a list, but in our case we know we won't354 # have repeated keys so we know we have single element lists which we355 # can convert back to scalar values.356 query_dict = dict(357 [(k, v[0]) for k, v in parse_qs(url_parts.query).items()])358 # The spec is particular about this. It *has* to be:359 # https://<endpoint>?<operation params>&<auth params>360 # You can't mix the two types of params together, i.e just keep doing361 # new_query_params.update(op_params)362 # new_query_params.update(auth_params)363 # percent_encode_sequence(new_query_params)364 operation_params = ''365 if request.data:366 # We also need to move the body params into the query string.367 # request.data will be populated, for example, with query services368 # which normally form encode the params into the body.369 # This means that request.data is a dict() of the operation params.370 query_dict.update(request.data)371 request.data = ''372 if query_dict:373 operation_params = percent_encode_sequence(query_dict) + '&'374 new_query_string = (operation_params +375 percent_encode_sequence(auth_params))376 # url_parts is a tuple (and therefore immutable) so we need to create377 # a new url_parts with the new query string.378 # <part> - <index>379 # scheme - 0380 # netloc - 1381 # path - 2382 # query - 3 <-- we're replacing this.383 # fragment - 4384 p = url_parts385 new_url_parts = (p[0], p[1], p[2], new_query_string, p[4])386 request.url = urlunsplit(new_url_parts)387 def _inject_signature_to_request(self, request, signature):388 # Rather than calculating an "Authorization" header, for the query389 # param quth, we just append an 'X-Amz-Signature' param to the end390 # of the query string.391 request.url += '&X-Amz-Signature=%s' % signature392class S3SigV4QueryAuth(SigV4QueryAuth):393 """S3 SigV4 auth using query parameters.394 This signer will sign a request using query parameters and signature395 version 4, i.e a "presigned url" signer.396 Based off of:397 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html398 """399 def _normalize_url_path(self, path):400 # For S3, we do not normalize the path.401 return path402 def payload(self, request):403 # From the doc link above:404 # "You don't include a payload hash in the Canonical Request, because405 # when you create a presigned URL, you don't know anything about the406 # payload. Instead, you use a constant string "UNSIGNED-PAYLOAD".407 return "UNSIGNED-PAYLOAD"408class S3SigV4PostAuth(SigV4Auth):409 """410 Presigns a s3 post411 Implementation doc here:412 http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html413 """...

Full Screen

Full Screen

base.py

Source:base.py Github

copy

Full Screen

...22 Only called for urls that lack a scheme (at the very least), being resolved23 against a base URL that matches this specific fetcher.24 """25 return url26 def _normalize_url_path(self, url: furl) -> furl: # pylint: disable=no-self-use27 """Normalize the path component of a URL."""28 if not url.path.segments[-1].endswith(TOML_EXTENSION):29 url = url.copy()30 url.path.segments[-1] = f"{url.path.segments[-1]}{TOML_EXTENSION}"31 return url32 def _normalize_scheme(self, scheme: str) -> str: # pylint: disable=no-self-use33 """Normalize the scheme component of a URL."""34 return scheme35 def normalize(self, url: furl) -> furl:36 """Normalize a URL.37 Produces a canonical URL, meant to be used to uniquely identify a style resource.38 - The base name has .toml appended if not already ending in that extension39 - Individual fetchers can further normalize the path and scheme.40 """41 new_scheme = self._normalize_scheme(url.scheme)42 if new_scheme != url.scheme:43 url = url.copy().set(scheme=new_scheme)44 return self._normalize_url_path(url)45 def fetch(self, url: furl) -> str:46 """Fetch a style from a specific fetcher."""...

Full Screen

Full Screen

file.py

Source:file.py Github

copy

Full Screen

...20 path = Path(url).expanduser()21 # return absolute paths as URLs as on Windows they could otherwise not resolve22 # cleanly against a file:// base. Relative paths should use POSIX conventions.23 return path.as_uri() if path.is_absolute() else path.as_posix()24 def _normalize_url_path(self, url: furl) -> furl:25 local_path = url_to_python_path(super()._normalize_url_path(url))26 return furl(local_path.resolve().as_uri())27 def fetch(self, url: furl) -> str:28 """Fetch a style from a local file."""...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful