How to use fix_list_objects_response method in localstack

Best Python code snippet using localstack_python

s3_listener.py

Source:s3_listener.py Github

copy

Full Screen

# NOTE: this excerpt begins mid-function. The header of the preceding definition is
# not visible here, so its trailing statements are preserved verbatim as comments:
#         if response.headers.get("Last-Modified", "") == "":
#             response.headers["Last-Modified"] = datetime.datetime.now().strftime(time_format)
#     except Exception as err:
#         LOGGER.error("Caught generic exception (setting LastModified header): %s", err)


def fix_list_objects_response(method, path, data, response):
    """Patch a ``ListBucketResult`` XML body on ``response`` in place.

    Does nothing unless the body contains ``<ListBucketResult``. Otherwise it:

    * appends a ``<Marker>`` element (value of the ``marker`` query parameter,
      or empty) if none is present,
    * appends an ``<EncodingType>`` element if the request carried an
      ``encoding-type`` query parameter and the element is missing,
    * URL-encodes the value of ``<Delimiter>`` unless it is ``/``,
    * stores the new body in ``response._content`` and drops ``Content-Length``.

    ``method`` and ``data`` are accepted for signature parity and not used.
    """
    content = response.content or b""
    if b"<ListBucketResult" not in to_bytes(content):
        return
    content = to_str(content)
    query_map = parse_qs(urlparse(path).query)

    # insert <Marker> element into response
    if "<Marker>" not in content:
        marker_values = query_map.get("marker")
        marker = marker_values[0] if marker_values else ""
        insert = "<Marker>%s</Marker>" % marker
        content = content.replace("</ListBucketResult>", f"{insert}</ListBucketResult>")

    # insert <EncodingType> element into response
    encoding_type = query_map.get("encoding-type")
    if "<EncodingType>" not in content and encoding_type:
        insert = f"<EncodingType>{encoding_type[0]}</EncodingType>"
        content = content.replace("</ListBucketResult>", f"{insert}</ListBucketResult>")

    # fix URL-encoding of <Delimiter> response element
    if "<Delimiter>" in content:
        regex = "<Delimiter>([^<]+)</Delimiter>"
        # The pattern needs at least one character, so an empty
        # <Delimiter></Delimiter> yields no match; skip instead of crashing.
        match = re.search(regex, content)
        if match:
            delimiter = match.group(1).strip()
            if delimiter != "/":
                content = re.sub(regex, f"<Delimiter>{quote(delimiter)}</Delimiter>", content)

    response._content = content
    # the body length may have changed, so the old header value is stale
    response.headers.pop("Content-Length", None)


def append_metadata_headers(method, query_map, headers):
    """Copy object-metadata query parameters into ``headers``.

    For every key in ``query_map`` whose lower-cased name starts with
    ``OBJECT_METADATA_KEY_PREFIX``, the first value is stored under the same key
    in ``headers`` unless ``headers`` already has a non-``None`` value for it.
    ``method`` is not used.
    """
    for key, value in query_map.items():
        if not key.lower().startswith(OBJECT_METADATA_KEY_PREFIX):
            continue
        if headers.get(key) is None:
            headers[key] = value[0]


def fix_range_content_type(bucket_name, path, headers, response):
    """Restore the object's real Content-Type on successful Range responses.

    Fix content type for Range requests -
    https://github.com/localstack/localstack/issues/1259

    Only responses whose Content-Type is exactly ``text/html; charset=utf-8``
    are patched; the replacement value is read via ``head_object``.
    """
    if "Range" not in headers:
        return
    if response.status_code >= 400:
        return
    # Check the cheap condition first so that no S3 call is made when there is
    # nothing to patch.
    if response.headers.get("Content-Type") != "text/html; charset=utf-8":
        return

    s3_client = aws_stack.connect_to_service("s3")
    path = urlparse(unquote(path)).path
    key_name = extract_key_name(headers, path)
    result = s3_client.head_object(Bucket=bucket_name, Key=key_name)
    response.headers["Content-Type"] = result["ContentType"]
def fix_delete_objects_response(bucket_name, method, parsed_path, data, headers, response):
    """Turn "errors" for non-existing keys in a DeleteObjects result into successes.

    Deleting non-existing keys should not result in errors.
    Fixes https://github.com/localstack/localstack/issues/1893

    Only applies to ``POST ...?delete`` requests whose payload contains
    ``<Delete`` and whose response body contains ``<Error>``. Error entries that
    consist of nothing but a ``Key`` are moved to the ``Deleted`` list, and the
    body in ``response._content`` is rewritten. ``bucket_name`` and ``headers``
    are not used.
    """
    if not (method == "POST" and parsed_path.query == "delete" and "<Delete" in to_str(data or "")):
        return
    content = to_str(response._content)
    if "<Error>" not in content:
        return
    result = xmltodict.parse(content).get("DeleteResult")
    # can be NoSuchBucket error
    if not result:
        return

    errors = result.get("Error")
    if errors is None:
        # "<Error>" appeared somewhere other than directly under DeleteResult;
        # there is nothing to move.
        errors = []
    elif not isinstance(errors, list):
        errors = [errors]

    deleted = result.get("Deleted")
    if not isinstance(deleted, list):
        deleted = result["Deleted"] = [deleted] if deleted else []

    # iterate over a copy because entries are removed from `errors` in the loop
    for entry in list(errors):
        if set(entry.keys()) == {"Key"}:
            errors.remove(entry)
            deleted.append(entry)

    if not errors:
        result.pop("Error", None)
    response._content = xmltodict.unparse({"DeleteResult": result})


def fix_metadata_key_underscores(request_headers=None, response=None):
    """Translate underscores in ``x-amz-meta-*`` header names to ``---`` and back.

    Fix for https://github.com/localstack/localstack/issues/1790

    * In ``request_headers`` (mutated in place), ``_`` in the part after the
      prefix becomes ``---``; rewritten keys always get the lower-case prefix.
    * In ``response.headers`` (mutated in place), ``---`` becomes ``_``.

    Returns:
        True if at least one *request* header key was renamed, else False.
        Renames on the response do not affect the return value.
    """
    if request_headers is None:
        request_headers = {}

    underscore_replacement = "---"
    meta_header_prefix = "x-amz-meta-"
    prefix_len = len(meta_header_prefix)
    updated = False

    # snapshot the keys: the mapping is modified while looping
    for key in list(request_headers.keys()):
        if not key.lower().startswith(meta_header_prefix):
            continue
        key_new = meta_header_prefix + key[prefix_len:].replace("_", underscore_replacement)
        if key != key_new:
            request_headers[key_new] = request_headers.pop(key)
            updated = True

    if response is not None:
        for key in list(response.headers.keys()):
            if not key.lower().startswith(meta_header_prefix):
                continue
            key_new = meta_header_prefix + key[prefix_len:].replace(underscore_replacement, "_")
            if key != key_new:
                response.headers[key_new] = response.headers.pop(key)

    return updated
def fix_creation_date(method, path, response):
    """Normalize ``<CreationDate>`` timestamps in the response to a ``Z`` suffix.

    Only applies to ``GET /``. A fractional-seconds part optionally followed by
    ``+00:00`` directly before ``</CreationDate>`` is rewritten to end in ``Z``.
    """
    if method != "GET" or path != "/":
        return
    response._content = re.sub(
        r"(\.[0-9]+)(\+00:00)?</CreationDate>",
        r"\1Z</CreationDate>",
        to_str(response._content),
    )


def replace_in_xml_response(response, search: str, replace: str):
    """Regex-replace ``search`` with ``replace`` in an XML response body.

    Acts only when the status code is 200, the body is non-empty and it starts
    with ``<?xml``. ``search`` is used as a regular expression; both arguments
    are encoded to bytes when the body is bytes.
    """
    if response.status_code != 200 or not response._content:
        return
    body, xml_prefix = response._content, "<?xml"
    if isinstance(body, bytes):
        xml_prefix, search, replace = xml_prefix.encode(), search.encode(), replace.encode()
    if body.startswith(xml_prefix):
        response._content = re.compile(search).sub(replace, body)


def fix_delimiter(response):
    """Replace a literal ``None`` delimiter value in the XML body with an empty one."""
    replace_in_xml_response(response, "<Delimiter>None<", "<Delimiter><")


def fix_xml_preamble_newline(method, path, headers, response):
    """Insert a newline after the ``<?xml ...>`` preamble of an XML response.

    Skipped for object download requests.
    """
    # some tools (Serverless) require a newline after the "<?xml ...>\n" preamble line,
    # e.g., for LocationConstraint. This is required because upstream moto is generally
    # collapsing all S3 XML responses:
    # https://github.com/spulec/moto/blob/3718cde444b3e0117072c29b087237e1787c3a66/moto/core/responses.py#L102-L104
    if is_object_download_request(method, path, headers):
        return
    replace_in_xml_response(response, r"(<\?xml [^>]+>)<", r"\1\n<")


def convert_to_chunked_encoding(method, path, response):
    """Mark the response to ``GET /`` as chunked.

    Sets ``Transfer-Encoding: chunked`` and removes ``Content-Encoding`` and
    ``Content-Length``; a response that is already chunked is left untouched.
    """
    if method != "GET" or path != "/":
        return
    if response.headers.get("Transfer-Encoding", "").lower() == "chunked":
        return
    response.headers["Transfer-Encoding"] = "chunked"
    response.headers.pop("Content-Encoding", None)
    response.headers.pop("Content-Length", None)


def strip_surrounding_quotes(s):
    """Return ``s`` without one pair of matching surrounding quotes.

    A pair is two ``"`` or two ``'`` at the first and last position. Strings
    shorter than two characters are returned unchanged; this avoids an
    IndexError on ``""`` and stops a lone quote character from being treated as
    a pair.
    """
    if len(s) >= 2 and (s[0], s[-1]) in (('"', '"'), ("'", "'")):
        return s[1:-1]
    return s


def ret304_on_etag(data, headers, response):
    """Turn the response into an empty 304 when ``If-None-Match`` equals its ETag.

    Both values are compared with surrounding quotes removed. ``data`` is not used.
    """
    etag = response.headers.get("ETag")
    if not etag:
        return
    match = headers.get("If-None-Match")
    if match and strip_surrounding_quotes(match) == strip_surrounding_quotes(etag):
        response.status_code = 304
        response._content = ""
def remove_xml_preamble(response):
    """Removes <?xml ... ?> from a response content"""
    response._content = re.sub(r"^<\?[^\?]+\?>", "", to_str(response._content))


# --------------
# HELPER METHODS
#   for lifecycle/replication/...
# --------------


def get_lifecycle(bucket_name):
    """Return an XML response with the stored lifecycle configuration of a bucket.

    If the bucket is not available, the error body and code from
    ``is_bucket_available`` are returned. If no configuration is stored, a 404
    ``NoSuchLifecycleConfiguration`` error document is returned.
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)

    lifecycle = BackendState.lifecycle_config(bucket_name)
    status_code = 200
    if not lifecycle:
        lifecycle = {
            "Error": {
                "Code": "NoSuchLifecycleConfiguration",
                "Message": "The lifecycle configuration does not exist",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(lifecycle)
    return xml_response(body, status_code=status_code)


def get_replication(bucket_name):
    """Return an XML response with the stored replication configuration of a bucket.

    If the bucket is not available, the error body and code from
    ``is_bucket_available`` are returned. If no configuration is stored, a 404
    ``ReplicationConfigurationNotFoundError`` error document is returned.
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)

    replication = BackendState.replication_config(bucket_name)
    status_code = 200
    if not replication:
        replication = {
            "Error": {
                "Code": "ReplicationConfigurationNotFoundError",
                "Message": "The replication configuration was not found",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(replication)
    return xml_response(body, status_code=status_code)


def set_lifecycle(bucket_name, lifecycle):
    """Replace the stored lifecycle configuration of a bucket.

    Returns:
        The integer ``200`` on success, or the error XML response if the bucket
        is not available.
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)

    # parse only textual payloads; presumably to_str() returns non-text input
    # (e.g. an already parsed dict) unchanged - verify against its definition
    if isinstance(to_str(lifecycle), str):
        lifecycle = xmltodict.parse(lifecycle)
    bucket_lifecycle = BackendState.lifecycle_config(bucket_name)
    bucket_lifecycle.clear()
    bucket_lifecycle.update(lifecycle)
    return 200
def delete_lifecycle(bucket_name):
    """Clear the stored lifecycle configuration of a bucket.

    Returns:
        The error XML response if the bucket is not available, otherwise None.
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    BackendState.lifecycle_config(bucket_name).clear()


def set_replication(bucket_name, replication):
    """Replace the stored replication configuration of a bucket.

    Returns:
        The integer ``200`` on success, or the error XML response if the bucket
        is not available.
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)

    # parse only textual payloads; presumably to_str() returns non-text input
    # unchanged - verify against its definition
    if isinstance(to_str(replication), str):
        replication = xmltodict.parse(replication)
    bucket_replication = BackendState.replication_config(bucket_name)
    bucket_replication.clear()
    bucket_replication.update(replication)
    return 200


# -------------
# UTIL METHODS
# -------------


def is_bucket_available(bucket_name):
    """Check whether a bucket exists and build a matching result body.

    Returns:
        A tuple ``(exists, code, body)``. For an existing bucket this is
        ``(True, 200, {"Code": "200"})``; otherwise ``exists`` is falsy, ``code``
        is the error code reported by ``bucket_exists`` and ``body`` is an
        ``Error`` dict naming the bucket.
    """
    exists, code = bucket_exists(bucket_name)
    if not exists:
        body = {
            "Error": {
                "Code": code,
                "Message": "The bucket does not exist",
                "BucketName": bucket_name,
            }
        }
        return exists, code, body
    return True, 200, {"Code": "200"}


def bucket_exists(bucket_name):
    """Tests for the existence of the specified bucket. Returns the error code
    if the bucket does not exist (200 if the bucket does exist).
    """
    bucket_name = normalize_bucket_name(bucket_name)
    s3_client = aws_stack.connect_to_service("s3")
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as err:
        error_code = err.response.get("Error").get("Code")
        return False, error_code
    return True, 200
def strip_chunk_signatures(body, content_length):
    """Remove the SigV4 streaming chunk headers from ``body``.

    Borrowed from https://github.com/spulec/moto/pull/4201

    Args:
        body: raw request payload (bytes) in the chunked-signature format.
        content_length: decoded payload length; the output buffer is
            pre-allocated with this many zero bytes.

    Returns:
        The concatenated chunk data as ``bytes``.
    """
    body_io = io.BytesIO(body)
    new_body = bytearray(content_length)
    pos = 0
    line = body_io.readline()
    while line:
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html#sigv4-chunked-body-definition
        # str(hex(chunk-size)) + ";chunk-signature=" + signature + \r\n + chunk-data + \r\n
        chunk_size = int(line[: line.find(b";")].decode("utf8"), 16)
        new_body[pos : pos + chunk_size] = body_io.read(chunk_size)
        pos = pos + chunk_size
        body_io.read(2)  # skip trailing \r\n
        line = body_io.readline()
    return bytes(new_body)


def check_content_md5(data, headers):
    """Validate the ``Content-MD5`` header against the request payload.

    For ``STREAMING-AWS4-HMAC-SHA256-PAYLOAD`` requests the chunk signatures are
    stripped first, which requires a valid ``x-amz-decoded-content-length``.

    Returns:
        None if the digest matches; otherwise an error response:
        403 ``SignatureDoesNotMatch`` for a missing or invalid decoded length,
        400 ``InvalidDigest`` for a missing or non-Base64 ``Content-MD5`` and
        400 ``BadDigest`` for a mismatch.
    """
    if headers.get("x-amz-content-sha256", None) == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD":
        content_length = headers.get("x-amz-decoded-content-length")
        if not content_length:
            return error_response(
                '"X-Amz-Decoded-Content-Length" header is missing',
                "SignatureDoesNotMatch",
                status_code=403,
            )
        try:
            content_length = int(content_length)
            if content_length < 0:
                # a negative size cannot be allocated; treat it like any other
                # malformed header value instead of crashing in bytearray()
                raise ValueError(content_length)
        except ValueError:
            return error_response(
                'Wrong "X-Amz-Decoded-Content-Length" header',
                "SignatureDoesNotMatch",
                status_code=403,
            )
        data = strip_chunk_signatures(data, content_length)

    actual = md5(data)
    try:
        md5_header = headers["Content-MD5"]
        if not is_base64(md5_header):
            raise Exception('Content-MD5 header is not in Base64 format: "%s"' % md5_header)
        expected = to_str(codecs.encode(base64.b64decode(md5_header), "hex"))
    except Exception:
        # deliberately broad: any failure to obtain/decode the header is
        # reported to the client as an invalid digest
        return error_response(
            "The Content-MD5 you specified is not valid.",
            "InvalidDigest",
            status_code=400,
        )
    if actual != expected:
        return error_response(
            "The Content-MD5 you specified did not match what we received.",
            "BadDigest",
            status_code=400,
        )
def error_response(message, code, status_code=400):
    """Build an XML ``Error`` response with the given code and message."""
    result = {"Error": {"Code": code, "Message": message}}
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def xml_response(content, status_code=200):
    """Build a response with ``Content-Type: application/xml`` around ``content``."""
    headers = {"Content-Type": "application/xml"}
    return requests_response(content, status_code=status_code, headers=headers)


def no_such_key_error(resource, requestId=None, status_code=400):
    """Build a ``NoSuchKey`` XML error response for ``resource``."""
    result = {
        "Error": {
            "Code": "NoSuchKey",
            "Message": "The resource you requested does not exist",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def no_such_bucket(bucket_name, requestId=None, status_code=404):
    """Build a ``NoSuchBucket`` XML error response for ``bucket_name``."""
    # TODO: fix the response to match AWS bucket response when the webconfig is not set
    #  and bucket not exists
    result = {
        "Error": {
            "Code": "NoSuchBucket",
            "Message": "The specified bucket does not exist",
            "BucketName": bucket_name,
            "RequestId": requestId,
            "HostId": short_uid(),
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def token_expired_error(resource, requestId=None, status_code=400):
    """Build an ``ExpiredToken`` XML error response for ``resource``."""
    result = {
        "Error": {
            "Code": "ExpiredToken",
            "Message": "The provided token has expired.",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def expand_redirect_url(starting_url, key, bucket):
    """Add key and bucket parameters to starting URL query string.

    Existing query parameters keep their order; existing ``key``/``bucket``
    parameters are overwritten. The fragment of ``starting_url`` is dropped.
    """
    parsed = urlparse(starting_url)
    query = collections.OrderedDict(parse_qsl(parsed.query))
    query.update([("key", key), ("bucket", bucket)])
    return urlunparse(
        (
            parsed.scheme,
            parsed.netloc,
            parsed.path,
            parsed.params,
            urlencode(query),
            None,  # no fragment
        )
    )
def is_bucket_specified_in_domain_name(path, headers):
    """Return a truthy match if the ``host`` header looks like an S3 AWS hostname.

    The host must match ``...s3[-website].[<region>.]amazonaws.com`` (prefix
    match). The dots of ``amazonaws.com`` are escaped so that they only match a
    literal dot. ``path`` is not used.

    Returns:
        The ``re.Match`` object, or None if the host does not match.
    """
    host = headers.get("host", "")
    return re.match(r".*s3(\-website)?\.([^\.]+\.)?amazonaws\.com", host)


def is_object_specific_request(path, headers):
    """Return whether the given request is specific to a certain S3 object.
    Note: the bucket name is usually specified as a path parameter,
    but may also be part of the domain name!"""
    bucket_in_domain = is_bucket_specified_in_domain_name(path, headers)
    parts = len(path.split("/"))
    return parts > (1 if bucket_in_domain else 2)


def empty_response():
    """Return a ``Response`` with status 200 and an empty body."""
    response = Response()
    response.status_code = 200
    response._content = ""
    return response


def handle_notification_request(bucket, method, data):
    """Dispatch a ``?notification`` request: GET reads, PUT stores, others no-op."""
    if method == "GET":
        return handle_get_bucket_notification(bucket)
    if method == "PUT":
        return handle_put_bucket_notification(bucket, data)
    return empty_response()


def handle_get_bucket_notification(bucket):
    """Render the stored notification configs of ``bucket`` as an XML response.

    Each stored entry contributes one ``<{dest}Configuration>`` element per
    destination type (from ``NOTIFICATION_DESTINATION_TYPES``) it contains.
    """
    response = Response()
    response.status_code = 200
    response._content = ""

    parts = [f'<NotificationConfiguration xmlns="{XMLNS_S3}">']
    notifications = BackendState.notification_configs(bucket) or []
    for notif in notifications:
        for dest in NOTIFICATION_DESTINATION_TYPES:
            if dest not in notif:
                continue
            dest_dict = {
                f"{dest}Configuration": {
                    "Id": notif["Id"],
                    dest: notif[dest],
                    "Event": notif["Event"],
                    "Filter": notif["Filter"],
                }
            }
            parts.append(xmltodict.unparse(dest_dict, full_document=False))
    parts.append("</NotificationConfiguration>")

    response._content = "".join(parts)
    return response


def _validate_filter_rules(filter_doc):
    """Raise ``InvalidFilterRuleName`` for any rule not named suffix/prefix."""
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        # TODO: check what other rules there are
def _sanitize_notification_filter_rules(filter_doc):
    """Validate filter rule names and normalize them to title case in place.

    Raises:
        InvalidFilterRuleName: if a rule name is not ``suffix``/``prefix``
            (case-insensitive).
    """
    if not filter_doc:
        # nothing to sanitize (also guards against a missing/None filter)
        return
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        rule["Name"] = name.title()


def handle_put_bucket_notification(bucket, data):
    """Replace the stored notification configs of ``bucket`` with those in ``data``.

    ``data`` is the XML request payload. Existing configs are cleared first, so
    a payload without any destination configuration removes all notifications.

    Returns:
        An empty 200 response.
    """
    parsed = strip_xmlns(xmltodict.parse(data))
    # An empty <NotificationConfiguration/> element is parsed to None.
    notif_config = parsed.get("NotificationConfiguration") or {}
    notifications = BackendState.notification_configs(bucket)
    notifications.clear()

    for dest in NOTIFICATION_DESTINATION_TYPES:
        raw = notif_config.get("%sConfiguration" % dest)
        # a single element is parsed to a dict, repeated elements to a list
        dest_configs = raw if isinstance(raw, list) else [raw] if raw else []
        for dest_config in dest_configs:
            events = dest_config.get("Event")
            if isinstance(events, str):
                events = [events]
            event_filter = dest_config.get("Filter", {})
            # make sure FilterRule is an array
            s3_filter = _get_s3_filter(event_filter)
            if s3_filter and not isinstance(s3_filter.get("FilterRule", []), list):
                s3_filter["FilterRule"] = [s3_filter["FilterRule"]]
            # make sure FilterRules are valid and sanitize if necessary
            _sanitize_notification_filter_rules(s3_filter)
            # create final details dict
            notification_details = {
                "Id": dest_config.get("Id", str(uuid.uuid4())),
                "Event": events,
                dest: dest_config.get(dest),
                "Filter": event_filter,
            }
            notifications.append(clone(notification_details))
    return empty_response()


def remove_bucket_notification(bucket):
    """Clear the stored notification configs of ``bucket``, if any."""
    notification_configs = BackendState.notification_configs(bucket)
    if notification_configs:
        notification_configs.clear()


class ProxyListenerS3(ProxyListener):
    def api_name(self):
        """Return the API name handled by this listener."""
        return "s3"

    @staticmethod
    def is_s3_copy_request(headers, path):
        """Return True if ``x-amz-copy-source`` appears in the headers or the path."""
        return "x-amz-copy-source" in headers or "x-amz-copy-source" in path

    @staticmethod
    def is_create_multipart_request(query):
        """Return True if the query string starts with ``uploads``."""
        return query.startswith("uploads")
query.startswith("uploads")1021 @staticmethod1022 def is_multipart_upload(query):1023 return query.startswith("uploadId")1024 @staticmethod1025 def get_201_response(key, bucket_name):1026 return """1027 <PostResponse>1028 <Location>{protocol}://{host}/{encoded_key}</Location>1029 <Bucket>{bucket}</Bucket>1030 <Key>{key}</Key>1031 <ETag>{etag}</ETag>1032 </PostResponse>1033 """.format(1034 protocol=get_service_protocol(),1035 host=config.HOSTNAME_EXTERNAL,1036 encoded_key=quote(key, safe=""),1037 key=key,1038 bucket=bucket_name,1039 etag="d41d8cd98f00b204e9800998ecf8427f",1040 )1041 @staticmethod1042 def _update_location(content, bucket_name):1043 bucket_name = normalize_bucket_name(bucket_name)1044 host = config.HOSTNAME_EXTERNAL1045 if ":" not in host:1046 host = f"{host}:{config.service_port('s3')}"1047 return re.sub(1048 r"<Location>\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*</Location>",1049 r"<Location>%s://%s/%s/\2</Location>" % (get_service_protocol(), host, bucket_name),1050 content,1051 flags=re.MULTILINE,1052 )1053 @staticmethod1054 def is_query_allowable(method, query):1055 # Generally if there is a query (some/path/with?query) we don't want to send notifications1056 if not query:1057 return True1058 # Except we do want to notify on multipart and presigned url upload completion1059 contains_cred = "X-Amz-Credential" in query and "X-Amz-Signature" in query1060 contains_key = "AWSAccessKeyId" in query and "Signature" in query1061 # nodejs sdk putObjectCommand is adding x-id=putobject in the query1062 allowed_query = "x-id=" in query.lower()1063 if (1064 (method == "POST" and query.startswith("uploadId"))1065 or contains_cred1066 or contains_key1067 or allowed_query1068 ):1069 return True1070 @staticmethod1071 def parse_policy_expiration_date(expiration_string):1072 try:1073 dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT1)1074 except Exception:1075 dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT2)1076 # 
both date formats assume a UTC timezone ('Z' suffix), but it's not parsed as tzinfo into the datetime object1077 dt = dt.replace(tzinfo=datetime.timezone.utc)1078 return dt1079 def forward_request(self, method, path, data, headers):1080 # Create list of query parameteres from the url1081 parsed = urlparse("{}{}".format(config.get_edge_url(), path))1082 query_params = parse_qs(parsed.query)1083 path_orig = path1084 path = path.replace(1085 "#", "%23"1086 ) # support key names containing hashes (e.g., required by Amplify)1087 # extracting bucket name from the request1088 parsed_path = urlparse(path)1089 bucket_name = extract_bucket_name(headers, parsed_path.path)1090 if method == "PUT" and bucket_name and not re.match(BUCKET_NAME_REGEX, bucket_name):1091 if len(parsed_path.path) <= 1:1092 return error_response(1093 "Unable to extract valid bucket name. Please ensure that your AWS SDK is "1094 + "configured to use path style addressing, or send a valid "1095 + '<Bucket>.s3.localhost.localstack.cloud "Host" header',1096 "InvalidBucketName",1097 status_code=400,1098 )1099 return error_response(1100 "The specified bucket is not valid.",1101 "InvalidBucketName",1102 status_code=400,1103 )1104 # Detecting pre-sign url and checking signature1105 if any(p in query_params for p in SIGNATURE_V2_PARAMS) or any(1106 p in query_params for p in SIGNATURE_V4_PARAMS1107 ):1108 response = authenticate_presign_url(1109 method=method, path=path, data=data, headers=headers1110 )1111 if response is not None:1112 return response1113 # handling s3 website hosting requests1114 if is_static_website(headers) and method == "GET":1115 return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)1116 # check content md5 hash integrity if not a copy request or multipart initialization1117 if (1118 "Content-MD5" in headers1119 and not self.is_s3_copy_request(headers, path)1120 and not self.is_create_multipart_request(parsed_path.query)1121 ):1122 response = 
check_content_md5(data, headers)1123 if response is not None:1124 return response1125 modified_data = None1126 # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-11127 to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")1128 to_find2 = to_bytes("<CreateBucketConfiguration")1129 if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:1130 # Note: with the latest version, <CreateBucketConfiguration> must either1131 # contain a valid <LocationConstraint>, or not be present at all in the body.1132 modified_data = b""1133 # POST requests to S3 may include a "${filename}" placeholder in the1134 # key, which should be replaced with an actual file name before storing.1135 if method == "POST":1136 original_data = not_none_or(modified_data, data)1137 expanded_data = multipart_content.expand_multipart_filename(original_data, headers)1138 if expanded_data is not original_data:1139 modified_data = expanded_data1140 # If no content-type is provided, 'binary/octet-stream' should be used1141 # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html1142 if method == "PUT" and not headers.get("content-type"):1143 headers["content-type"] = "binary/octet-stream"1144 # parse query params1145 query = parsed_path.query1146 path = parsed_path.path1147 query_map = parse_qs(query, keep_blank_values=True)1148 # remap metadata query params (not supported in moto) to request headers1149 append_metadata_headers(method, query_map, headers)1150 # apply fixes1151 headers_changed = fix_metadata_key_underscores(request_headers=headers)1152 if query == "notification" or "notification" in query_map:1153 # handle and return response for ?notification request1154 response = handle_notification_request(bucket_name, method, data)1155 return response1156 # if the Expires key in the url is already expired then return error1157 if method == "GET" and "Expires" in query_map:1158 ts = 
datetime.datetime.fromtimestamp(1159 int(query_map.get("Expires")[0]), tz=datetime.timezone.utc1160 )1161 if is_expired(ts):1162 return token_expired_error(path, headers.get("x-amz-request-id"), 400)1163 # If multipart POST with policy in the params, return error if the policy has expired1164 if method == "POST":1165 policy_key, policy_value = multipart_content.find_multipart_key_value(1166 data, headers, "policy"1167 )1168 if policy_key and policy_value:1169 policy = json.loads(base64.b64decode(policy_value).decode("utf-8"))1170 expiration_string = policy.get("expiration", None) # Example: 2020-06-05T13:37:12Z1171 if expiration_string:1172 expiration_datetime = self.parse_policy_expiration_date(expiration_string)1173 if is_expired(expiration_datetime):1174 return token_expired_error(path, headers.get("x-amz-request-id"), 400)1175 if query == "cors" or "cors" in query_map:1176 if method == "GET":1177 return get_cors(bucket_name)1178 if method == "PUT":1179 return set_cors(bucket_name, data)1180 if method == "DELETE":1181 return delete_cors(bucket_name)1182 if query == "requestPayment" or "requestPayment" in query_map:1183 if method == "GET":1184 return get_request_payment(bucket_name)1185 if method == "PUT":1186 return set_request_payment(bucket_name, data)1187 if query == "lifecycle" or "lifecycle" in query_map:1188 if method == "GET":1189 return get_lifecycle(bucket_name)1190 if method == "PUT":1191 return set_lifecycle(bucket_name, data)1192 if method == "DELETE":1193 delete_lifecycle(bucket_name)1194 if query == "replication" or "replication" in query_map:1195 if method == "GET":1196 return get_replication(bucket_name)1197 if method == "PUT":1198 return set_replication(bucket_name, data)1199 if method == "DELETE" and validate_bucket_name(bucket_name):1200 delete_lifecycle(bucket_name)1201 path_orig_escaped = path_orig.replace("#", "%23")1202 if modified_data is not None or headers_changed or path_orig != path_orig_escaped:1203 data_to_return = 
not_none_or(modified_data, data)1204 if modified_data is not None:1205 headers["Content-Length"] = str(len(data_to_return or ""))1206 return Request(1207 url=path_orig_escaped,1208 data=data_to_return,1209 headers=headers,1210 method=method,1211 )1212 return True1213 def return_response(self, method, path, data, headers, response):1214 path = to_str(path)1215 method = to_str(method)1216 path = path.replace("#", "%23")1217 # persist this API call to disk1218 super(ProxyListenerS3, self).return_response(method, path, data, headers, response)1219 bucket_name = extract_bucket_name(headers, path)1220 # POST requests to S3 may include a success_action_redirect or1221 # success_action_status field, which should be used to redirect a1222 # client to a new location.1223 key = None1224 if method == "POST":1225 key, redirect_url = multipart_content.find_multipart_key_value(data, headers)1226 if key and redirect_url:1227 response.status_code = 3031228 response.headers["Location"] = expand_redirect_url(redirect_url, key, bucket_name)1229 LOGGER.debug(1230 "S3 POST {} to {}".format(response.status_code, response.headers["Location"])1231 )1232 expanded_data = multipart_content.expand_multipart_filename(data, headers)1233 key, status_code = multipart_content.find_multipart_key_value(1234 expanded_data, headers, "success_action_status"1235 )1236 if response.status_code == 201 and key:1237 response._content = self.get_201_response(key, bucket_name)1238 response.headers["Content-Length"] = str(len(response._content or ""))1239 response.headers["Content-Type"] = "application/xml; charset=utf-8"1240 return response1241 if response.status_code == 416:1242 if method == "GET":1243 return error_response(1244 "The requested range cannot be satisfied.", "InvalidRange", 4161245 )1246 elif method == "HEAD":1247 response.status_code = 2001248 return response1249 parsed = urlparse(path)1250 bucket_name_in_host = uses_host_addressing(headers)1251 is_object_request = all(1252 [1253 "/" in path[1:] 
or bucket_name_in_host or key,1254 # check if this is an actual put object request, because it could also be1255 # a put bucket request with a path like this: /bucket_name/1256 bucket_name_in_host1257 or key1258 or (len(path[1:].split("/")) > 1 and len(path[1:].split("/")[1]) > 0),1259 ]1260 )1261 should_send_object_notification = all(1262 [1263 method in ("PUT", "POST", "DELETE"),1264 is_object_request,1265 self.is_query_allowable(method, parsed.query),1266 ]1267 )1268 should_send_tagging_notification = all(1269 ["tagging" in parsed.query, method in ("PUT", "DELETE"), is_object_request]1270 )1271 # get subscribers and send bucket notifications1272 if should_send_object_notification or should_send_tagging_notification:1273 # if we already have a good key, use it, otherwise examine the path1274 if key:1275 object_path = "/" + key1276 elif bucket_name_in_host:1277 object_path = parsed.path1278 else:1279 parts = parsed.path[1:].split("/", 1)1280 object_path = parts[1] if parts[1][0] == "/" else "/%s" % parts[1]1281 version_id = response.headers.get("x-amz-version-id", None)1282 if should_send_object_notification:1283 method_map = {1284 "PUT": "ObjectCreated",1285 "POST": "ObjectCreated",1286 "DELETE": "ObjectRemoved",1287 }1288 if should_send_tagging_notification:1289 method_map = {1290 "PUT": "ObjectTagging",1291 "DELETE": "ObjectTagging",1292 }1293 send_notifications(method, bucket_name, object_path, version_id, headers, method_map)1294 # publish event for creation/deletion of buckets:1295 if method in ("PUT", "DELETE") and (1296 "/" not in path[1:] or len(path[1:].split("/")[1]) <= 01297 ):1298 event_type = (1299 event_publisher.EVENT_S3_CREATE_BUCKET1300 if method == "PUT"1301 else event_publisher.EVENT_S3_DELETE_BUCKET1302 )1303 event_publisher.fire_event(1304 event_type, payload={"n": event_publisher.get_hash(bucket_name)}1305 )1306 # fix an upstream issue in moto S3 (see https://github.com/localstack/localstack/issues/382)1307 if method == "PUT":1308 if 
parsed.query == "policy":1309 response._content = ""1310 response.status_code = 2041311 return response1312 # when creating s3 bucket using aws s3api the return header contains 'Location' param1313 if key is None:1314 # if the bucket is created in 'us-east-1' the location header contains bucket as path1315 # else the the header contains bucket url1316 if aws_stack.get_region() == "us-east-1":1317 response.headers["Location"] = "/{}".format(bucket_name)1318 else:1319 # Note: we need to set the correct protocol here1320 protocol = (1321 headers.get(constants.HEADER_LOCALSTACK_EDGE_URL, "").split("://")[0]1322 or "http"1323 )1324 response.headers["Location"] = "{}://{}.{}:{}/".format(1325 protocol,1326 bucket_name,1327 constants.S3_VIRTUAL_HOSTNAME,1328 config.EDGE_PORT,1329 )1330 if response is not None:1331 reset_content_length = False1332 # append CORS headers and other annotations/patches to response1333 append_cors_headers(1334 bucket_name,1335 request_method=method,1336 request_headers=headers,1337 response=response,1338 )1339 append_last_modified_headers(response=response)1340 fix_list_objects_response(method, path, data, response)1341 fix_range_content_type(bucket_name, path, headers, response)1342 fix_delete_objects_response(bucket_name, method, parsed, data, headers, response)1343 fix_metadata_key_underscores(response=response)1344 fix_creation_date(method, path, response=response)1345 ret304_on_etag(data, headers, response)1346 append_aws_request_troubleshooting_headers(response)1347 fix_delimiter(response)1348 fix_xml_preamble_newline(method, path, headers, response)1349 if method == "PUT":1350 key_name = extract_key_name(headers, path)1351 if key_name:1352 set_object_expiry(bucket_name, key_name, headers)1353 # Remove body from PUT response on presigned URL1354 # https://github.com/localstack/localstack/issues/1317...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful