How to use the get_201_response method in localstack

Best Python code snippet using localstack_python

s3_listener.py

Source:s3_listener.py Github

copy

Full Screen

# ... (excerpt from localstack's s3_listener.py: methods of class ProxyListenerS3,
# shown dedented because the enclosing class header lies outside this listing)
#
# NOTE(review): the page this listing was taken from had collapsed all whitespace
# and fused the original file's line numbers into the code. The nesting below was
# reconstructed from syntax and data flow; spots where the nesting could not be
# derived from the code alone are marked with NOTE(review).


@staticmethod
def is_multipart_upload(query):
    """Return True if the query string starts with ``uploadId``."""
    return query.startswith("uploadId")


@staticmethod
def get_201_response(key, bucket_name):
    """Build the XML ``<PostResponse>`` body for a POST upload answered with 201.

    :param key: object key; URL-quoted for ``<Location>`` and XML-escaped for ``<Key>``
    :param bucket_name: bucket name placed into ``<Bucket>``
    :return: the XML document as a string
    """
    # Local import so this block does not depend on the file's import section.
    from xml.sax.saxutils import escape

    return """
        <PostResponse>
            <Location>{protocol}://{host}/{encoded_key}</Location>
            <Bucket>{bucket}</Bucket>
            <Key>{key}</Key>
            <ETag>{etag}</ETag>
        </PostResponse>
        """.format(
        protocol=get_service_protocol(),
        host=config.HOSTNAME_EXTERNAL,
        encoded_key=urlparse.quote(key, safe=""),
        # Keys may contain '&', '<' or '>', which would otherwise yield malformed XML.
        key=escape(key),
        bucket=bucket_name,
        # NOTE(review): fixed value (the MD5 digest of empty input), not derived
        # from the uploaded content - confirm this is intended.
        etag="d41d8cd98f00b204e9800998ecf8427f",
    )


@staticmethod
def _update_location(content, bucket_name):
    """Rewrite ``<Location>`` elements in ``content`` to point at the external host.

    The scheme and host of each matched location are replaced with the service
    protocol and ``config.HOSTNAME_EXTERNAL`` (with ``config.PORT_S3`` appended when
    the host contains no ``:``), and the normalized bucket name is inserted in
    front of the original path.
    """
    bucket_name = normalize_bucket_name(bucket_name)
    host = config.HOSTNAME_EXTERNAL
    if ":" not in host:
        host = "%s:%s" % (host, config.PORT_S3)
    return re.sub(
        r"<Location>\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*</Location>",
        r"<Location>%s://%s/%s/\2</Location>" % (get_service_protocol(), host, bucket_name),
        content,
        flags=re.MULTILINE,
    )


@staticmethod
def is_query_allowable(method, query):
    """Return whether a request with this query string may trigger notifications.

    :param method: HTTP method of the request
    :param query: raw query string (may be empty)
    :return: True if notifications are allowed, False otherwise
    """
    # Generally if there is a query (some/path/with?query) we don't want to send notifications
    if not query:
        return True
    # Except we do want to notify on multipart and presigned url upload completion
    contains_cred = "X-Amz-Credential" in query and "X-Amz-Signature" in query
    contains_key = "AWSAccessKeyId" in query and "Signature" in query
    # nodejs sdk putObjectCommand is adding x-id=putobject in the query
    allowed_query = "x-id=" in query.lower()
    if (
        (method == "POST" and query.startswith("uploadId"))
        or contains_cred
        or contains_key
        or allowed_query
    ):
        return True
    # Explicit negative result (previously an implicit None from falling off the end).
    return False


@staticmethod
def parse_policy_expiration_date(expiration_string):
    """Parse a policy expiration timestamp into a UTC-aware ``datetime``.

    Tries ``POLICY_EXPIRATION_FORMAT1`` first and falls back to
    ``POLICY_EXPIRATION_FORMAT2``.

    :raises ValueError: if the string matches neither format
    """
    try:
        dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT1)
    except ValueError:
        dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT2)
    # both date formats assume a UTC timezone ('Z' suffix), but it's not parsed as
    # tzinfo into the datetime object
    dt = dt.replace(tzinfo=datetime.timezone.utc)
    return dt


def forward_request(self, method, path, data, headers):
    """Inspect and possibly rewrite an incoming S3 request before it is forwarded.

    :param method: HTTP method
    :param path: request path including the query string
    :param data: request body
    :param headers: request headers; modified in place in several branches
    :return: a response object when the request is answered or rejected here; a
        ``Request`` carrying the modified url/data/headers when something was
        rewritten; otherwise ``True`` (presumably "forward unchanged" - confirm
        against the proxy listener contract)
    """
    # Create list of query parameters from the url
    parsed = urlparse.urlparse("{}{}".format(config.get_edge_url(), path))
    query_params = parse_qs(parsed.query)
    path_orig = path
    path = path.replace(
        "#", "%23"
    )  # support key names containing hashes (e.g., required by Amplify)

    # extracting bucket name from the request
    parsed_path = urlparse.urlparse(path)
    bucket_name = extract_bucket_name(headers, parsed_path.path)

    if method == "PUT" and bucket_name and not re.match(BUCKET_NAME_REGEX, bucket_name):
        if len(parsed_path.path) <= 1:
            return error_response(
                "Unable to extract valid bucket name. Please ensure that your AWS SDK is "
                + "configured to use path style addressing, or send a valid "
                + '<Bucket>.s3.localhost.localstack.cloud "Host" header',
                "InvalidBucketName",
                status_code=400,
            )
        return error_response(
            "The specified bucket is not valid.",
            "InvalidBucketName",
            status_code=400,
        )

    # Detecting pre-sign url and checking signature
    if any(p in query_params for p in SIGNATURE_V2_PARAMS) or any(
        p in query_params for p in SIGNATURE_V4_PARAMS
    ):
        response = authenticate_presign_url(
            method=method, path=path, data=data, headers=headers
        )
        if response is not None:
            return response

    # handling s3 website hosting requests
    if is_static_website(headers) and method == "GET":
        return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)

    # check content md5 hash integrity if not a copy request or multipart initialization
    if (
        "Content-MD5" in headers
        and not self.is_s3_copy_request(headers, path)
        and not self.is_create_multipart_request(parsed_path.query)
    ):
        response = check_content_md5(data, headers)
        if response is not None:
            return response

    modified_data = None

    # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-1
    to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")
    to_find2 = to_bytes("<CreateBucketConfiguration")
    if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:
        # Note: with the latest version, <CreateBucketConfiguration> must either
        # contain a valid <LocationConstraint>, or not be present at all in the body.
        modified_data = b""

    # If this request contains streaming v4 authentication signatures, strip them from the message
    # Related issue: https://github.com/localstack/localstack/issues/98
    # TODO: can potentially be removed after this fix in moto: https://github.com/spulec/moto/pull/4201
    is_streaming_payload = headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD
    if is_streaming_payload:
        modified_data = strip_chunk_signatures(not_none_or(modified_data, data))
        headers["Content-Length"] = headers.get("x-amz-decoded-content-length")
        headers.pop(CONTENT_SHA256_HEADER)

    # POST requests to S3 may include a "${filename}" placeholder in the
    # key, which should be replaced with an actual file name before storing.
    if method == "POST":
        original_data = not_none_or(modified_data, data)
        expanded_data = multipart_content.expand_multipart_filename(original_data, headers)
        if expanded_data is not original_data:
            modified_data = expanded_data

    # If no content-type is provided, 'binary/octet-stream' should be used
    # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
    if method == "PUT" and not headers.get("content-type"):
        headers["content-type"] = "binary/octet-stream"

    # parse query params
    query = parsed_path.query
    path = parsed_path.path
    query_map = urlparse.parse_qs(query, keep_blank_values=True)

    # remap metadata query params (not supported in moto) to request headers
    append_metadata_headers(method, query_map, headers)

    # apply fixes
    headers_changed = fix_metadata_key_underscores(request_headers=headers)

    if query == "notification" or "notification" in query_map:
        # handle and return response for ?notification request
        response = handle_notification_request(bucket_name, method, data)
        return response

    # if the Expires key in the url is already expired then return error
    if method == "GET" and "Expires" in query_map:
        ts = datetime.datetime.fromtimestamp(
            int(query_map.get("Expires")[0]), tz=datetime.timezone.utc
        )
        if is_expired(ts):
            return token_expired_error(path, headers.get("x-amz-request-id"), 400)

    # If multipart POST with policy in the params, return error if the policy has expired
    if method == "POST":
        policy_key, policy_value = multipart_content.find_multipart_key_value(
            data, headers, "policy"
        )
        if policy_key and policy_value:
            policy = json.loads(base64.b64decode(policy_value).decode("utf-8"))
            expiration_string = policy.get("expiration", None)  # Example: 2020-06-05T13:37:12Z
            if expiration_string:
                expiration_datetime = self.parse_policy_expiration_date(expiration_string)
                if is_expired(expiration_datetime):
                    return token_expired_error(path, headers.get("x-amz-request-id"), 400)

    if query == "cors" or "cors" in query_map:
        if method == "GET":
            return get_cors(bucket_name)
        if method == "PUT":
            return set_cors(bucket_name, data)
        if method == "DELETE":
            return delete_cors(bucket_name)

    if query == "requestPayment" or "requestPayment" in query_map:
        if method == "GET":
            return get_request_payment(bucket_name)
        if method == "PUT":
            return set_request_payment(bucket_name, data)

    if query == "lifecycle" or "lifecycle" in query_map:
        if method == "GET":
            return get_lifecycle(bucket_name)
        if method == "PUT":
            return set_lifecycle(bucket_name, data)
        if method == "DELETE":
            # NOTE(review): unlike GET/PUT the result is not returned, so processing
            # continues below - confirm this is intended.
            delete_lifecycle(bucket_name)

    if query == "replication" or "replication" in query_map:
        if method == "GET":
            return get_replication(bucket_name)
        if method == "PUT":
            return set_replication(bucket_name, data)

    # NOTE(review): placed at function level (lifecycle cleanup on any DELETE with a
    # valid bucket name); the collapsed listing does not show whether it was nested
    # under the replication branch - verify against the upstream file.
    if method == "DELETE" and validate_bucket_name(bucket_name):
        delete_lifecycle(bucket_name)

    path_orig_escaped = path_orig.replace("#", "%23")
    if modified_data is not None or headers_changed or path_orig != path_orig_escaped:
        data_to_return = not_none_or(modified_data, data)
        if modified_data is not None:
            # the body was rewritten above, so the length header must match the new body
            headers["Content-Length"] = str(len(data_to_return or ""))
        return Request(
            url=path_orig_escaped,
            data=data_to_return,
            headers=headers,
            method=method,
        )

    return True


def return_response(self, method, path, data, headers, response, request_handler=None):
    """Post-process a backend response (the listing is truncated partway through).

    The visible part handles POST redirect/status form fields, the 201
    ``<PostResponse>`` body, and 416 range errors.
    """
    path = to_str(path)
    method = to_str(method)
    path = path.replace("#", "%23")

    # persist this API call to disk
    super(ProxyListenerS3, self).return_response(
        method, path, data, headers, response, request_handler
    )

    bucket_name = extract_bucket_name(headers, path)

    # POST requests to S3 may include a success_action_redirect or
    # success_action_status field, which should be used to redirect a
    # client to a new location.
    key = None
    if method == "POST":
        key, redirect_url = multipart_content.find_multipart_key_value(data, headers)
        if key and redirect_url:
            response.status_code = 303
            response.headers["Location"] = expand_redirect_url(redirect_url, key, bucket_name)
            LOGGER.debug(
                "S3 POST {} to {}".format(response.status_code, response.headers["Location"])
            )

        expanded_data = multipart_content.expand_multipart_filename(data, headers)
        key, status_code = multipart_content.find_multipart_key_value(
            expanded_data, headers, "success_action_status"
        )

        if response.status_code == 201 and key:
            response._content = self.get_201_response(key, bucket_name)
            response.headers["Content-Length"] = str(len(response._content or ""))
            response.headers["Content-Type"] = "application/xml; charset=utf-8"
            return response

    if response.status_code == 416:
        if method == "GET":
            return error_response(
                "The requested range cannot be satisfied.", "InvalidRange", 416
            )
        elif method == "HEAD":
            response.status_code = 200
            return response

    parsed = urlparse.urlparse(path)
    bucket_name_in_host = uses_host_addressing(headers)
    # should_send_notifications = all(...
    # ... (the source page truncates the listing here; the rest of the method is not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. The guides cover everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful