How to use is_object_specific_request method in localstack

Best Python code snippet using localstack_python

s3_listener.py

Source:s3_listener.py Github

copy

Full Screen

...826 return redirect_url827def is_bucket_specified_in_domain_name(path, headers):828 host = headers.get("host", "")829 return re.match(r".*s3(\-website)?\.([^\.]+\.)?amazonaws.com", host)830def is_object_specific_request(path, headers):831 """Return whether the given request is specific to a certain S3 object.832 Note: the bucket name is usually specified as a path parameter,833 but may also be part of the domain name!"""834 bucket_in_domain = is_bucket_specified_in_domain_name(path, headers)835 parts = len(path.split("/"))836 return parts > (1 if bucket_in_domain else 2)837def empty_response():838 response = Response()839 response.status_code = 200840 response._content = ""841 return response842def handle_notification_request(bucket, method, data):843 if method == "GET":844 return handle_get_bucket_notification(bucket)845 if method == "PUT":846 return handle_put_bucket_notification(bucket, data)847 return empty_response()848def handle_get_bucket_notification(bucket):849 response = Response()850 response.status_code = 200851 response._content = ""852 # TODO check if bucket exists853 result = '<NotificationConfiguration xmlns="%s">' % XMLNS_S3854 if bucket in S3_NOTIFICATIONS:855 notifs = S3_NOTIFICATIONS[bucket]856 for notif in notifs:857 for dest in NOTIFICATION_DESTINATION_TYPES:858 if dest in notif:859 dest_dict = {860 "%sConfiguration"861 % dest: {862 "Id": notif["Id"],863 dest: notif[dest],864 "Event": notif["Event"],865 "Filter": notif["Filter"],866 }867 }868 result += xmltodict.unparse(dest_dict, full_document=False)869 result += "</NotificationConfiguration>"870 response._content = result871 return response872def _validate_filter_rules(filter_doc):873 rules = filter_doc.get("FilterRule")874 if not rules:875 return876 for rule in rules:877 name = rule.get("Name", "")878 if name.lower() not in ["suffix", "prefix"]:879 raise InvalidFilterRuleName(name)880 # TODO: check what other rules there are881def _sanitize_notification_filter_rules(filter_doc):882 rules = filter_doc.get("FilterRule")883 if not rules:884 return885 for rule in rules:886 name = rule.get("Name", "")887 if name.lower() not in ["suffix", "prefix"]:888 raise InvalidFilterRuleName(name)889 rule["Name"] = name.title()890def handle_put_bucket_notification(bucket, data):891 parsed = xmltodict.parse(data)892 notif_config = parsed.get("NotificationConfiguration")893 notifications = []894 for dest in NOTIFICATION_DESTINATION_TYPES:895 config = notif_config.get("%sConfiguration" % dest)896 configs = config if isinstance(config, list) else [config] if config else []897 for config in configs:898 events = config.get("Event")899 if isinstance(events, six.string_types):900 events = [events]901 event_filter = config.get("Filter", {})902 # make sure FilterRule is an array903 s3_filter = _get_s3_filter(event_filter)904 if s3_filter and not isinstance(s3_filter.get("FilterRule", []), list):905 s3_filter["FilterRule"] = [s3_filter["FilterRule"]]906 # make sure FilterRules are valid and sanitize if necessary907 _sanitize_notification_filter_rules(s3_filter)908 # create final details dict909 notification_details = {910 "Id": config.get("Id", str(uuid.uuid4())),911 "Event": events,912 dest: config.get(dest),913 "Filter": event_filter,914 }915 notifications.append(clone(notification_details))916 S3_NOTIFICATIONS[bucket] = notifications917 return empty_response()918def remove_bucket_notification(bucket):919 if bucket in S3_NOTIFICATIONS:920 del S3_NOTIFICATIONS[bucket]921class ProxyListenerS3(PersistingProxyListener):922 def api_name(self):923 return "s3"924 @staticmethod925 def is_s3_copy_request(headers, path):926 return "x-amz-copy-source" in headers or "x-amz-copy-source" in path927 @staticmethod928 def is_create_multipart_request(query):929 return query.startswith("uploads")930 @staticmethod931 def is_multipart_upload(query):932 return query.startswith("uploadId")933 @staticmethod934 def get_201_response(key, bucket_name):935 return """936 <PostResponse>937 <Location>{protocol}://{host}/{encoded_key}</Location>938 <Bucket>{bucket}</Bucket>939 <Key>{key}</Key>940 <ETag>{etag}</ETag>941 </PostResponse>942 """.format(943 protocol=get_service_protocol(),944 host=config.HOSTNAME_EXTERNAL,945 encoded_key=urlparse.quote(key, safe=""),946 key=key,947 bucket=bucket_name,948 etag="d41d8cd98f00b204e9800998ecf8427f",949 )950 @staticmethod951 def _update_location(content, bucket_name):952 bucket_name = normalize_bucket_name(bucket_name)953 host = config.HOSTNAME_EXTERNAL954 if ":" not in host:955 host = "%s:%s" % (host, config.PORT_S3)956 return re.sub(957 r"<Location>\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*</Location>",958 r"<Location>%s://%s/%s/\2</Location>" % (get_service_protocol(), host, bucket_name),959 content,960 flags=re.MULTILINE,961 )962 @staticmethod963 def is_query_allowable(method, query):964 # Generally if there is a query (some/path/with?query) we don't want to send notifications965 if not query:966 return True967 # Except we do want to notify on multipart and presigned url upload completion968 contains_cred = "X-Amz-Credential" in query and "X-Amz-Signature" in query969 contains_key = "AWSAccessKeyId" in query and "Signature" in query970 # nodejs sdk putObjectCommand is adding x-id=putobject in the query971 allowed_query = "x-id=" in query.lower()972 if (973 (method == "POST" and query.startswith("uploadId"))974 or contains_cred975 or contains_key976 or allowed_query977 ):978 return True979 @staticmethod980 def parse_policy_expiration_date(expiration_string):981 try:982 dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT1)983 except Exception:984 dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT2)985 # both date formats assume a UTC timezone ('Z' suffix), but it's not parsed as tzinfo into the datetime object986 dt = dt.replace(tzinfo=datetime.timezone.utc)987 return dt988 def forward_request(self, method, path, data, headers):989 # Create list of query parameteres from the url990 parsed = urlparse.urlparse("{}{}".format(config.get_edge_url(), path))991 query_params = parse_qs(parsed.query)992 path_orig = path993 path = path.replace(994 "#", "%23"995 ) # support key names containing hashes (e.g., required by Amplify)996 # extracting bucket name from the request997 parsed_path = urlparse.urlparse(path)998 bucket_name = extract_bucket_name(headers, parsed_path.path)999 if method == "PUT" and bucket_name and not re.match(BUCKET_NAME_REGEX, bucket_name):1000 if len(parsed_path.path) <= 1:1001 return error_response(1002 "Unable to extract valid bucket name. Please ensure that your AWS SDK is "1003 + "configured to use path style addressing, or send a valid "1004 + '<Bucket>.s3.localhost.localstack.cloud "Host" header',1005 "InvalidBucketName",1006 status_code=400,1007 )1008 return error_response(1009 "The specified bucket is not valid.",1010 "InvalidBucketName",1011 status_code=400,1012 )1013 # Detecting pre-sign url and checking signature1014 if any([p in query_params for p in SIGNATURE_V2_PARAMS]) or any(1015 [p in query_params for p in SIGNATURE_V4_PARAMS]1016 ):1017 response = authenticate_presign_url(1018 method=method, path=path, data=data, headers=headers1019 )1020 if response is not None:1021 return response1022 # handling s3 website hosting requests1023 if is_static_website(headers) and method == "GET":1024 return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)1025 # check content md5 hash integrity if not a copy request or multipart initialization1026 if (1027 "Content-MD5" in headers1028 and not self.is_s3_copy_request(headers, path)1029 and not self.is_create_multipart_request(parsed_path.query)1030 ):1031 response = check_content_md5(data, headers)1032 if response is not None:1033 return response1034 modified_data = None1035 # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-11036 to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")1037 to_find2 = to_bytes("<CreateBucketConfiguration")1038 if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:1039 # Note: with the latest version, <CreateBucketConfiguration> must either1040 # contain a valid <LocationConstraint>, or not be present at all in the body.1041 modified_data = b""1042 # If this request contains streaming v4 authentication signatures, strip them from the message1043 # Related isse: https://github.com/localstack/localstack/issues/981044 # TODO: can potentially be removed after this fix in moto: https://github.com/spulec/moto/pull/42011045 is_streaming_payload = headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD1046 if is_streaming_payload:1047 modified_data = strip_chunk_signatures(not_none_or(modified_data, data))1048 headers["Content-Length"] = headers.get("x-amz-decoded-content-length")1049 headers.pop(CONTENT_SHA256_HEADER)1050 # POST requests to S3 may include a "${filename}" placeholder in the1051 # key, which should be replaced with an actual file name before storing.1052 if method == "POST":1053 original_data = not_none_or(modified_data, data)1054 expanded_data = multipart_content.expand_multipart_filename(original_data, headers)1055 if expanded_data is not original_data:1056 modified_data = expanded_data1057 # If no content-type is provided, 'binary/octet-stream' should be used1058 # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html1059 if method == "PUT" and not headers.get("content-type"):1060 headers["content-type"] = "binary/octet-stream"1061 # parse query params1062 query = parsed_path.query1063 path = parsed_path.path1064 query_map = urlparse.parse_qs(query, keep_blank_values=True)1065 # remap metadata query params (not supported in moto) to request headers1066 append_metadata_headers(method, query_map, headers)1067 # apply fixes1068 headers_changed = fix_metadata_key_underscores(request_headers=headers)1069 if query == "notification" or "notification" in query_map:1070 # handle and return response for ?notification request1071 response = handle_notification_request(bucket_name, method, data)1072 return response1073 # if the Expires key in the url is already expired then return error1074 if method == "GET" and "Expires" in query_map:1075 ts = datetime.datetime.fromtimestamp(1076 int(query_map.get("Expires")[0]), tz=datetime.timezone.utc1077 )1078 if is_expired(ts):1079 return token_expired_error(path, headers.get("x-amz-request-id"), 400)1080 # If multipart POST with policy in the params, return error if the policy has expired1081 if method == "POST":1082 policy_key, policy_value = multipart_content.find_multipart_key_value(1083 data, headers, "policy"1084 )1085 if policy_key and policy_value:1086 policy = json.loads(base64.b64decode(policy_value).decode("utf-8"))1087 expiration_string = policy.get("expiration", None) # Example: 2020-06-05T13:37:12Z1088 if expiration_string:1089 expiration_datetime = self.parse_policy_expiration_date(expiration_string)1090 if is_expired(expiration_datetime):1091 return token_expired_error(path, headers.get("x-amz-request-id"), 400)1092 if query == "cors" or "cors" in query_map:1093 if method == "GET":1094 return get_cors(bucket_name)1095 if method == "PUT":1096 return set_cors(bucket_name, data)1097 if method == "DELETE":1098 return delete_cors(bucket_name)1099 if query == "requestPayment" or "requestPayment" in query_map:1100 if method == "GET":1101 return get_request_payment(bucket_name)1102 if method == "PUT":1103 return set_request_payment(bucket_name, data)1104 if query == "lifecycle" or "lifecycle" in query_map:1105 if method == "GET":1106 return get_lifecycle(bucket_name)1107 if method == "PUT":1108 return set_lifecycle(bucket_name, data)1109 if method == "DELETE":1110 delete_lifecycle(bucket_name)1111 if query == "replication" or "replication" in query_map:1112 if method == "GET":1113 return get_replication(bucket_name)1114 if method == "PUT":1115 return set_replication(bucket_name, data)1116 if method == "DELETE" and validate_bucket_name(bucket_name):1117 delete_lifecycle(bucket_name)1118 path_orig_escaped = path_orig.replace("#", "%23")1119 if modified_data is not None or headers_changed or path_orig != path_orig_escaped:1120 data_to_return = not_none_or(modified_data, data)1121 if modified_data is not None:1122 headers["Content-Length"] = str(len(data_to_return or ""))1123 return Request(1124 url=path_orig_escaped,1125 data=data_to_return,1126 headers=headers,1127 method=method,1128 )1129 return True1130 def return_response(self, method, path, data, headers, response, request_handler=None):1131 path = to_str(path)1132 method = to_str(method)1133 path = path.replace("#", "%23")1134 # persist this API call to disk1135 super(ProxyListenerS3, self).return_response(1136 method, path, data, headers, response, request_handler1137 )1138 bucket_name = extract_bucket_name(headers, path)1139 # POST requests to S3 may include a success_action_redirect or1140 # success_action_status field, which should be used to redirect a1141 # client to a new location.1142 key = None1143 if method == "POST":1144 key, redirect_url = multipart_content.find_multipart_key_value(data, headers)1145 if key and redirect_url:1146 response.status_code = 3031147 response.headers["Location"] = expand_redirect_url(redirect_url, key, bucket_name)1148 LOGGER.debug(1149 "S3 POST {} to {}".format(response.status_code, response.headers["Location"])1150 )1151 expanded_data = multipart_content.expand_multipart_filename(data, headers)1152 key, status_code = multipart_content.find_multipart_key_value(1153 expanded_data, headers, "success_action_status"1154 )1155 if response.status_code == 201 and key:1156 response._content = self.get_201_response(key, bucket_name)1157 response.headers["Content-Length"] = str(len(response._content or ""))1158 response.headers["Content-Type"] = "application/xml; charset=utf-8"1159 return response1160 if response.status_code == 416:1161 if method == "GET":1162 return error_response(1163 "The requested range cannot be satisfied.", "InvalidRange", 4161164 )1165 elif method == "HEAD":1166 response.status_code = 2001167 return response1168 parsed = urlparse.urlparse(path)1169 bucket_name_in_host = uses_host_addressing(headers)1170 should_send_notifications = all(1171 [1172 method in ("PUT", "POST", "DELETE"),1173 "/" in path[1:] or bucket_name_in_host or key,1174 # check if this is an actual put object request, because it could also be1175 # a put bucket request with a path like this: /bucket_name/1176 bucket_name_in_host1177 or key1178 or (len(path[1:].split("/")) > 1 and len(path[1:].split("/")[1]) > 0),1179 self.is_query_allowable(method, parsed.query),1180 ]1181 )1182 # get subscribers and send bucket notifications1183 if should_send_notifications:1184 # if we already have a good key, use it, otherwise examine the path1185 if key:1186 object_path = "/" + key1187 elif bucket_name_in_host:1188 object_path = parsed.path1189 else:1190 parts = parsed.path[1:].split("/", 1)1191 object_path = parts[1] if parts[1][0] == "/" else "/%s" % parts[1]1192 version_id = response.headers.get("x-amz-version-id", None)1193 send_notifications(method, bucket_name, object_path, version_id, headers)1194 # publish event for creation/deletion of buckets:1195 if method in ("PUT", "DELETE") and (1196 "/" not in path[1:] or len(path[1:].split("/")[1]) <= 01197 ):1198 event_type = (1199 event_publisher.EVENT_S3_CREATE_BUCKET1200 if method == "PUT"1201 else event_publisher.EVENT_S3_DELETE_BUCKET1202 )1203 event_publisher.fire_event(1204 event_type, payload={"n": event_publisher.get_hash(bucket_name)}1205 )1206 # fix an upstream issue in moto S3 (see https://github.com/localstack/localstack/issues/382)1207 if method == "PUT":1208 if parsed.query == "policy":1209 response._content = ""1210 response.status_code = 2041211 return response1212 # when creating s3 bucket using aws s3api the return header contains 'Location' param1213 if key is None:1214 # if the bucket is created in 'us-east-1' the location header contains bucket as path1215 # else the the header contains bucket url1216 if aws_stack.get_region() == "us-east-1":1217 response.headers["Location"] = "/{}".format(bucket_name)1218 else:1219 # Note: we need to set the correct protocol here1220 protocol = (1221 headers.get(constants.HEADER_LOCALSTACK_EDGE_URL, "").split("://")[0]1222 or "http"1223 )1224 response.headers["Location"] = "{}://{}.{}:{}/".format(1225 protocol,1226 bucket_name,1227 constants.S3_VIRTUAL_HOSTNAME,1228 config.EDGE_PORT,1229 )1230 if response is not None:1231 reset_content_length = False1232 # append CORS headers and other annotations/patches to response1233 append_cors_headers(1234 bucket_name,1235 request_method=method,1236 request_headers=headers,1237 response=response,1238 )1239 append_last_modified_headers(response=response)1240 append_list_objects_marker(method, path, data, response)1241 fix_location_constraint(response)1242 fix_range_content_type(bucket_name, path, headers, response)1243 fix_delete_objects_response(bucket_name, method, parsed, data, headers, response)1244 fix_metadata_key_underscores(response=response)1245 fix_creation_date(method, path, response=response)1246 fix_etag_for_multipart(data, headers, response)1247 ret304_on_etag(data, headers, response)1248 append_aws_request_troubleshooting_headers(response)1249 fix_delimiter(data, headers, response)1250 if method == "PUT":1251 set_object_expiry(path, headers)1252 # Remove body from PUT response on presigned URL1253 # https://github.com/localstack/localstack/issues/13171254 if (1255 method == "PUT"1256 and int(response.status_code) < 4001257 and (1258 "X-Amz-Security-Token=" in path1259 or "X-Amz-Credential=" in path1260 or "AWSAccessKeyId=" in path1261 )1262 ):1263 response._content = ""1264 reset_content_length = True1265 response_content_str = None1266 try:1267 response_content_str = to_str(response._content)1268 except Exception:1269 pass1270 # Honor response header overrides1271 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html1272 if method == "GET":1273 add_accept_range_header(response)1274 add_response_metadata_headers(response)1275 if is_object_expired(path):1276 return no_such_key_error(path, headers.get("x-amz-request-id"), 400)1277 # AWS C# SDK uses get bucket acl to check the existence of the bucket1278 # If not exists, raises a NoSuchBucket Error1279 if bucket_name and "/?acl" in path:1280 exists, code, body = is_bucket_available(bucket_name)1281 if not exists:1282 return no_such_bucket(bucket_name, headers.get("x-amz-request-id"), 404)1283 query_map = urlparse.parse_qs(parsed.query, keep_blank_values=True)1284 for param_name, header_name in ALLOWED_HEADER_OVERRIDES.items():1285 if param_name in query_map:1286 response.headers[header_name] = query_map[param_name][0]1287 if response_content_str and response_content_str.startswith("<"):1288 is_bytes = isinstance(response._content, six.binary_type)1289 response._content = response_content_str1290 append_last_modified_headers(response=response, content=response_content_str)1291 # We need to un-pretty-print the XML, otherwise we run into this issue with Spark:1292 # https://github.com/jserver/mock-s3/pull/9/files1293 # https://github.com/localstack/localstack/issues/1831294 # Note: yet, we need to make sure we have a newline after the first line: <?xml ...>\n1295 # Note: make sure to return XML docs verbatim: https://github.com/localstack/localstack/issues/10371296 if method != "GET" or not is_object_specific_request(path, headers):1297 response._content = re.sub(1298 r"([^\?])>\n\s*<",1299 r"\1><",1300 response_content_str,1301 flags=re.MULTILINE,1302 )1303 # update Location information in response payload1304 response._content = self._update_location(response._content, bucket_name)1305 # convert back to bytes1306 if is_bytes:1307 response._content = to_bytes(response._content)1308 # fix content-type: https://github.com/localstack/localstack/issues/6181309 # https://github.com/localstack/localstack/issues/5491310 # https://github.com/localstack/localstack/issues/854...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful