Best Python code snippet using localstack_python
s3_listener.py
Source:s3_listener.py  
# NOTE: this excerpt starts in the middle of a helper whose `def` line is not
# part of the listing; its visible tail is kept verbatim here for reference:
#     if c.startswith(xml_prefix):
#         response._content = re.compile(search).sub(replace, c)


def fix_delimiter(response):
    """Replace a literal ``<Delimiter>None<`` in an XML response with an empty delimiter."""
    replace_in_xml_response(response, "<Delimiter>None<", "<Delimiter><")


def fix_xml_preamble_newline(method, path, headers, response):
    """Insert a newline after the ``<?xml ...>`` preamble of an XML response.

    Object download requests are left untouched.
    """
    # some tools (Serverless) require a newline after the "<?xml ...>\n" preamble line, e.g., for LocationConstraint
    # this is required because upstream moto is generally collapsing all S3 XML responses:
    # https://github.com/spulec/moto/blob/3718cde444b3e0117072c29b087237e1787c3a66/moto/core/responses.py#L102-L104
    if is_object_download_request(method, path, headers):
        return
    replace_in_xml_response(response, r"(<\?xml [^>]+>)<", r"\1\n<")


def convert_to_chunked_encoding(method, path, response):
    """Mark the response of a ``GET /`` request as chunked.

    Sets ``Transfer-Encoding: chunked`` and drops ``Content-Encoding`` and
    ``Content-Length``; does nothing if the response is already chunked.
    """
    if method != "GET" or path != "/":
        return
    if response.headers.get("Transfer-Encoding", "").lower() == "chunked":
        return
    response.headers["Transfer-Encoding"] = "chunked"
    response.headers.pop("Content-Encoding", None)
    response.headers.pop("Content-Length", None)


def strip_surrounding_quotes(s):
    """Return ``s`` without one pair of matching surrounding quotes (``"`` or ``'``).

    Strings shorter than two characters are returned unchanged, so an empty
    string no longer raises ``IndexError`` and a lone quote character is not
    collapsed to an empty string.
    """
    if len(s) >= 2 and (s[0], s[-1]) in (('"', '"'), ("'", "'")):
        return s[1:-1]
    return s


def ret304_on_etag(data, headers, response):
    """Turn the response into an empty 304 when ``If-None-Match`` equals the response ``ETag``.

    Both values are compared with surrounding quotes stripped. ``data`` is unused.
    """
    etag = response.headers.get("ETag")
    if etag:
        match = headers.get("If-None-Match")
        if match and strip_surrounding_quotes(match) == strip_surrounding_quotes(etag):
            response.status_code = 304
            response._content = ""


def remove_xml_preamble(response):
    """Removes <?xml ... ?> from a response content"""
    response._content = re.sub(r"^<\?[^\?]+\?>", "", to_str(response._content))


# --------------
# HELPER METHODS
#   for lifecycle/replication/...
# --------------


def get_lifecycle(bucket_name):
    """Return the stored lifecycle configuration of a bucket as an XML response.

    Responds with the bucket-availability error if the bucket does not exist,
    and with a 404 ``NoSuchLifecycleConfiguration`` error if nothing is stored.
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)

    lifecycle = BUCKET_LIFECYCLE.get(bucket_name)
    status_code = 200
    if not lifecycle:
        lifecycle = {
            "Error": {
                "Code": "NoSuchLifecycleConfiguration",
                "Message": "The lifecycle configuration does not exist",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(lifecycle)
    return xml_response(body, status_code=status_code)


def get_replication(bucket_name):
    """Return the stored replication configuration of a bucket as an XML response.

    Responds with the bucket-availability error if the bucket does not exist,
    and with a 404 ``ReplicationConfigurationNotFoundError`` if nothing is stored.
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)

    replication = BUCKET_REPLICATIONS.get(bucket_name)
    status_code = 200
    if not replication:
        replication = {
            "Error": {
                "Code": "ReplicationConfigurationNotFoundError",
                "Message": "The replication configuration was not found",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(replication)
    return xml_response(body, status_code=status_code)


def set_lifecycle(bucket_name, lifecycle):
    """Parse and store the lifecycle configuration for a bucket; returns 200 on success."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)

    if isinstance(to_str(lifecycle), str):
        lifecycle = xmltodict.parse(lifecycle)
    BUCKET_LIFECYCLE[bucket_name] = lifecycle
    return 200


def delete_lifecycle(bucket_name):
    """Drop the stored lifecycle configuration of a bucket, if there is one."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)

    if BUCKET_LIFECYCLE.get(bucket_name):
        BUCKET_LIFECYCLE.pop(bucket_name)


def set_replication(bucket_name, replication):
    """Parse and store the replication configuration for a bucket; returns 200 on success."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)

    if isinstance(to_str(replication), str):
        replication = xmltodict.parse(replication)
    BUCKET_REPLICATIONS[bucket_name] = replication
    return 200


# -------------
# UTIL METHODS
# -------------


def is_bucket_available(bucket_name):
    """Return ``(exists, code, body)`` for the bucket.

    ``body`` is an error dict when the bucket does not exist, otherwise
    ``{"Code": "200"}``.
    """
    body = {"Code": "200"}
    exists, code = bucket_exists(bucket_name)
    if not exists:
        body = {
            "Error": {
                "Code": code,
                "Message": "The bucket does not exist",
                "BucketName": bucket_name,
            }
        }
        return exists, code, body
    return True, 200, body


def bucket_exists(bucket_name):
    """Tests for the existence of the specified bucket. Returns the error code
    if the bucket does not exist (200 if the bucket does exist).
    """
    bucket_name = normalize_bucket_name(bucket_name)
    s3_client = aws_stack.connect_to_service("s3")
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as err:
        error_code = err.response.get("Error").get("Code")
        return False, error_code
    return True, 200


def strip_chunk_signatures(body, content_length):
    """Return the payload of a chunk-signed body with the chunk headers removed.

    The result is a ``bytes`` object pre-sized to ``content_length``.
    """
    # borrowed from https://github.com/spulec/moto/pull/4201
    body_io = io.BytesIO(body)
    new_body = bytearray(content_length)
    pos = 0
    line = body_io.readline()
    while line:
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html#sigv4-chunked-body-definition
        # str(hex(chunk-size)) + ";chunk-signature=" + signature + \r\n + chunk-data + \r\n
        chunk_size = int(line[: line.find(b";")].decode("utf8"), 16)
        new_body[pos : pos + chunk_size] = body_io.read(chunk_size)
        pos = pos + chunk_size
        body_io.read(2)  # skip trailing \r\n
        line = body_io.readline()
    return bytes(new_body)


def check_content_md5(data, headers):
    """Validate the ``Content-MD5`` header against the request body.

    For ``STREAMING-AWS4-HMAC-SHA256-PAYLOAD`` requests the chunk signatures
    are stripped first. Returns an error response on a missing/invalid
    decoded-length header, an invalid digest or a digest mismatch; returns
    ``None`` when the digest matches.
    """
    if headers.get("x-amz-content-sha256", None) == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD":
        content_length = headers.get("x-amz-decoded-content-length")
        if not content_length:
            return error_response(
                '"X-Amz-Decoded-Content-Length" header is missing',
                "SignatureDoesNotMatch",
                status_code=403,
            )

        try:
            content_length = int(content_length)
        except ValueError:
            return error_response(
                'Wrong "X-Amz-Decoded-Content-Length" header',
                "SignatureDoesNotMatch",
                status_code=403,
            )

        data = strip_chunk_signatures(data, content_length)

    actual = md5(data)
    try:
        md5_header = headers["Content-MD5"]
        if not is_base64(md5_header):
            raise Exception('Content-MD5 header is not in Base64 format: "%s"' % md5_header)
        expected = to_str(codecs.encode(base64.b64decode(md5_header), "hex"))
    except Exception:
        # any failure to read/decode the header is reported as an invalid digest
        return error_response(
            "The Content-MD5 you specified is not valid.",
            "InvalidDigest",
            status_code=400,
        )
    if actual != expected:
        return error_response(
            "The Content-MD5 you specified did not match what we received.",
            "BadDigest",
            status_code=400,
        )


def error_response(message, code, status_code=400):
    """Build an XML ``<Error>`` response with the given code and message."""
    result = {"Error": {"Code": code, "Message": message}}
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def xml_response(content, status_code=200):
    """Build a response with ``Content-Type: application/xml``."""
    headers = {"Content-Type": "application/xml"}
    return requests_response(content, status_code=status_code, headers=headers)


def no_such_key_error(resource, requestId=None, status_code=400):
    """Build a ``NoSuchKey`` XML error response."""
    result = {
        "Error": {
            "Code": "NoSuchKey",
            "Message": "The resource you requested does not exist",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def no_such_bucket(bucket_name, requestId=None, status_code=404):
    """Build a ``NoSuchBucket`` XML error response."""
    # TODO: fix the response to match AWS bucket response when the webconfig is not set and bucket not exists
    result = {
        "Error": {
            "Code": "NoSuchBucket",
            "Message": "The specified bucket does not exist",
            "BucketName": bucket_name,
            "RequestId": requestId,
            "HostId": short_uid(),
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def token_expired_error(resource, requestId=None, status_code=400):
    """Build an ``ExpiredToken`` XML error response."""
    result = {
        "Error": {
            "Code": "ExpiredToken",
            "Message": "The provided token has expired.",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def expand_redirect_url(starting_url, key, bucket):
    """Add key and bucket parameters to starting URL query string."""
    parsed = urlparse(starting_url)
    query = collections.OrderedDict(parse_qsl(parsed.query))
    query.update([("key", key), ("bucket", bucket)])

    redirect_url = urlunparse(
        (
            parsed.scheme,
            parsed.netloc,
            parsed.path,
            parsed.params,
            urlencode(query),
            None,
        )
    )

    return redirect_url


def is_bucket_specified_in_domain_name(path, headers):
    """Return a match object if the ``host`` header looks like an S3 AWS hostname, else ``None``.

    ``path`` is unused.
    """
    host = headers.get("host", "")
    # the dot before "com" is escaped so that it only matches a literal "."
    return re.match(r".*s3(\-website)?\.([^\.]+\.)?amazonaws\.com", host)


def is_object_specific_request(path, headers):
    """Return whether the given request is specific to a certain S3 object.
    Note: the bucket name is usually specified as a path parameter,
    but may also be part of the domain name!"""
    bucket_in_domain = is_bucket_specified_in_domain_name(path, headers)
    parts = len(path.split("/"))
    return parts > (1 if bucket_in_domain else 2)


def empty_response():
    """Return a 200 response with empty content."""
    response = Response()
    response.status_code = 200
    response._content = ""
    return response


def handle_notification_request(bucket, method, data):
    """Dispatch a ``?notification`` request; methods other than GET/PUT get an empty response."""
    if method == "GET":
        return handle_get_bucket_notification(bucket)
    if method == "PUT":
        return handle_put_bucket_notification(bucket, data)
    return empty_response()


def handle_get_bucket_notification(bucket):
    """Render the notification configurations stored for ``bucket`` as XML."""
    response = Response()
    response.status_code = 200
    response._content = ""

    # TODO check if bucket exists
    result = '<NotificationConfiguration xmlns="%s">' % XMLNS_S3
    if bucket in S3_NOTIFICATIONS:
        notifs = S3_NOTIFICATIONS[bucket]
        for notif in notifs:
            for dest in NOTIFICATION_DESTINATION_TYPES:
                if dest in notif:
                    dest_dict = {
                        "%sConfiguration" % dest: {
                            "Id": notif["Id"],
                            dest: notif[dest],
                            "Event": notif["Event"],
                            "Filter": notif["Filter"],
                        }
                    }
                    result += xmltodict.unparse(dest_dict, full_document=False)
    result += "</NotificationConfiguration>"
    response._content = result
    return response


def _validate_filter_rules(filter_doc):
    """Raise ``InvalidFilterRuleName`` if a filter rule is named anything but suffix/prefix."""
    rules = filter_doc.get("FilterRule")
    if not rules:
        return

    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)

        # TODO: check what other rules there are


def _sanitize_notification_filter_rules(filter_doc):
    """Validate the filter rule names and normalize them to title case in place."""
    rules = filter_doc.get("FilterRule")
    if not rules:
        return

    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)

        rule["Name"] = name.title()


def handle_put_bucket_notification(bucket, data):
    """Parse a notification configuration document and store it for ``bucket``.

    A missing or empty ``NotificationConfiguration`` element stores an empty
    list of notifications instead of raising ``AttributeError``.
    """
    parsed = strip_xmlns(xmltodict.parse(data))
    # xmltodict yields None for an empty element, hence the fallback
    notif_config = parsed.get("NotificationConfiguration") or {}

    notifications = []
    for dest in NOTIFICATION_DESTINATION_TYPES:
        dest_config = notif_config.get("%sConfiguration" % dest)
        # a single configuration is parsed as a dict, several as a list
        if isinstance(dest_config, list):
            dest_configs = dest_config
        elif dest_config:
            dest_configs = [dest_config]
        else:
            dest_configs = []

        for entry in dest_configs:
            events = entry.get("Event")
            if isinstance(events, str):
                events = [events]
            event_filter = entry.get("Filter", {})
            # make sure FilterRule is an array
            s3_filter = _get_s3_filter(event_filter)
            if s3_filter and not isinstance(s3_filter.get("FilterRule", []), list):
                s3_filter["FilterRule"] = [s3_filter["FilterRule"]]
            # make sure FilterRules are valid and sanitize if necessary
            _sanitize_notification_filter_rules(s3_filter)
            # create final details dict
            notification_details = {
                "Id": entry.get("Id", str(uuid.uuid4())),
                "Event": events,
                dest: entry.get(dest),
                "Filter": event_filter,
            }
            notifications.append(clone(notification_details))
    S3_NOTIFICATIONS[bucket] = notifications
    return empty_response()


def remove_bucket_notification(bucket):
    """Forget the notification configurations stored for ``bucket``, if any."""
    if bucket in S3_NOTIFICATIONS:
        del S3_NOTIFICATIONS[bucket]


class ProxyListenerS3(PersistingProxyListener):
    """Proxy listener for S3 requests and responses.

    NOTE: the listing this class was taken from is truncated inside
    ``return_response``; only the visible part is reproduced.
    """

    def api_name(self):
        return "s3"

    @staticmethod
    def is_s3_copy_request(headers, path):
        """Return whether ``x-amz-copy-source`` appears in the headers or in the path."""
        return "x-amz-copy-source" in headers or "x-amz-copy-source" in path

    @staticmethod
    def is_create_multipart_request(query):
        """Return whether the query string starts with ``uploads``."""
        return query.startswith("uploads")

    @staticmethod
    def is_multipart_upload(query):
        """Return whether the query string starts with ``uploadId``."""
        return query.startswith("uploadId")

    @staticmethod
    def get_201_response(key, bucket_name):
        """Render the ``<PostResponse>`` XML body returned for a 201 POST upload."""
        return """
                <PostResponse>
                    <Location>{protocol}://{host}/{encoded_key}</Location>
                    <Bucket>{bucket}</Bucket>
                    <Key>{key}</Key>
                    <ETag>{etag}</ETag>
                </PostResponse>
                """.format(
            protocol=get_service_protocol(),
            host=config.HOSTNAME_EXTERNAL,
            encoded_key=quote(key, safe=""),
            key=key,
            bucket=bucket_name,
            etag="d41d8cd98f00b204e9800998ecf8427f",
        )

    @staticmethod
    def _update_location(content, bucket_name):
        """Rewrite ``<Location>`` URLs in ``content`` to point at the external host and bucket."""
        bucket_name = normalize_bucket_name(bucket_name)

        host = config.HOSTNAME_EXTERNAL
        if ":" not in host:
            host = f"{host}:{config.service_port('s3')}"
        return re.sub(
            r"<Location>\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*</Location>",
            r"<Location>%s://%s/%s/\2</Location>" % (get_service_protocol(), host, bucket_name),
            content,
            flags=re.MULTILINE,
        )

    @staticmethod
    def is_query_allowable(method, query):
        """Return whether notifications may be sent for a request with this query string.

        Always returns a ``bool`` (previously fell off the end and returned
        ``None`` for disallowed queries).
        """
        # Generally if there is a query (some/path/with?query) we don't want to send notifications
        if not query:
            return True
        # Except we do want to notify on multipart and presigned url upload completion
        contains_cred = "X-Amz-Credential" in query and "X-Amz-Signature" in query
        contains_key = "AWSAccessKeyId" in query and "Signature" in query
        # nodejs sdk putObjectCommand is adding x-id=putobject in the query
        allowed_query = "x-id=" in query.lower()
        is_multipart_completion = method == "POST" and query.startswith("uploadId")
        return is_multipart_completion or contains_cred or contains_key or allowed_query

    @staticmethod
    def parse_policy_expiration_date(expiration_string):
        """Parse a policy expiration string into a UTC-aware ``datetime``.

        Tries ``POLICY_EXPIRATION_FORMAT1`` first and falls back to
        ``POLICY_EXPIRATION_FORMAT2``; the fallback's parse error propagates.
        """
        try:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT1)
        except Exception:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT2)

        # both date formats assume a UTC timezone ('Z' suffix), but it's not parsed as tzinfo into the datetime object
        dt = dt.replace(tzinfo=datetime.timezone.utc)
        return dt

    def forward_request(self, method, path, data, headers):
        """Pre-process an incoming S3 request.

        Returns a response object when the request is answered here, a
        ``Request`` when the request was modified, or ``True`` otherwise.
        """
        # Create list of query parameteres from the url
        parsed = urlparse("{}{}".format(config.get_edge_url(), path))
        query_params = parse_qs(parsed.query)
        path_orig = path
        path = path.replace(
            "#", "%23"
        )  # support key names containing hashes (e.g., required by Amplify)

        # extracting bucket name from the request
        parsed_path = urlparse(path)
        bucket_name = extract_bucket_name(headers, parsed_path.path)

        if method == "PUT" and bucket_name and not re.match(BUCKET_NAME_REGEX, bucket_name):
            if len(parsed_path.path) <= 1:
                return error_response(
                    "Unable to extract valid bucket name. Please ensure that your AWS SDK is "
                    + "configured to use path style addressing, or send a valid "
                    + '<Bucket>.s3.localhost.localstack.cloud "Host" header',
                    "InvalidBucketName",
                    status_code=400,
                )

            return error_response(
                "The specified bucket is not valid.",
                "InvalidBucketName",
                status_code=400,
            )

        # Detecting pre-sign url and checking signature
        if any(p in query_params for p in SIGNATURE_V2_PARAMS) or any(
            p in query_params for p in SIGNATURE_V4_PARAMS
        ):
            response = authenticate_presign_url(
                method=method, path=path, data=data, headers=headers
            )
            if response is not None:
                return response

        # handling s3 website hosting requests
        if is_static_website(headers) and method == "GET":
            return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)

        # check content md5 hash integrity if not a copy request or multipart initialization
        if (
            "Content-MD5" in headers
            and not self.is_s3_copy_request(headers, path)
            and not self.is_create_multipart_request(parsed_path.query)
        ):
            response = check_content_md5(data, headers)
            if response is not None:
                return response

        modified_data = None

        # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-1
        to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")
        to_find2 = to_bytes("<CreateBucketConfiguration")
        if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:
            # Note: with the latest version, <CreateBucketConfiguration> must either
            # contain a valid <LocationConstraint>, or not be present at all in the body.
            modified_data = b""

        # POST requests to S3 may include a "${filename}" placeholder in the
        # key, which should be replaced with an actual file name before storing.
        if method == "POST":
            original_data = not_none_or(modified_data, data)
            expanded_data = multipart_content.expand_multipart_filename(original_data, headers)
            if expanded_data is not original_data:
                modified_data = expanded_data

        # If no content-type is provided, 'binary/octet-stream' should be used
        # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
        if method == "PUT" and not headers.get("content-type"):
            headers["content-type"] = "binary/octet-stream"

        # parse query params
        query = parsed_path.query
        path = parsed_path.path
        query_map = parse_qs(query, keep_blank_values=True)

        # remap metadata query params (not supported in moto) to request headers
        append_metadata_headers(method, query_map, headers)

        # apply fixes
        headers_changed = fix_metadata_key_underscores(request_headers=headers)

        if query == "notification" or "notification" in query_map:
            # handle and return response for ?notification request
            response = handle_notification_request(bucket_name, method, data)
            return response

        # if the Expires key in the url is already expired then return error
        if method == "GET" and "Expires" in query_map:
            ts = datetime.datetime.fromtimestamp(
                int(query_map.get("Expires")[0]), tz=datetime.timezone.utc
            )
            if is_expired(ts):
                return token_expired_error(path, headers.get("x-amz-request-id"), 400)

        # If multipart POST with policy in the params, return error if the policy has expired
        if method == "POST":
            policy_key, policy_value = multipart_content.find_multipart_key_value(
                data, headers, "policy"
            )
            if policy_key and policy_value:
                policy = json.loads(base64.b64decode(policy_value).decode("utf-8"))
                expiration_string = policy.get("expiration", None)  # Example: 2020-06-05T13:37:12Z
                if expiration_string:
                    expiration_datetime = self.parse_policy_expiration_date(expiration_string)
                    if is_expired(expiration_datetime):
                        return token_expired_error(path, headers.get("x-amz-request-id"), 400)

        if query == "cors" or "cors" in query_map:
            if method == "GET":
                return get_cors(bucket_name)
            if method == "PUT":
                return set_cors(bucket_name, data)
            if method == "DELETE":
                return delete_cors(bucket_name)

        if query == "requestPayment" or "requestPayment" in query_map:
            if method == "GET":
                return get_request_payment(bucket_name)
            if method == "PUT":
                return set_request_payment(bucket_name, data)

        if query == "lifecycle" or "lifecycle" in query_map:
            if method == "GET":
                return get_lifecycle(bucket_name)
            if method == "PUT":
                return set_lifecycle(bucket_name, data)
            if method == "DELETE":
                delete_lifecycle(bucket_name)

        if query == "replication" or "replication" in query_map:
            if method == "GET":
                return get_replication(bucket_name)
            if method == "PUT":
                return set_replication(bucket_name, data)

        if method == "DELETE" and validate_bucket_name(bucket_name):
            delete_lifecycle(bucket_name)

        path_orig_escaped = path_orig.replace("#", "%23")
        if modified_data is not None or headers_changed or path_orig != path_orig_escaped:
            data_to_return = not_none_or(modified_data, data)
            if modified_data is not None:
                headers["Content-Length"] = str(len(data_to_return or ""))
            return Request(
                url=path_orig_escaped,
                data=data_to_return,
                headers=headers,
                method=method,
            )
        return True

    def return_response(self, method, path, data, headers, response):
        """Post-process an S3 response (redirects, notifications, events, header patches).

        NOTE: the source listing is truncated inside this method; the code
        below stops where the listing stops.
        """
        path = to_str(path)
        method = to_str(method)
        path = path.replace("#", "%23")

        # persist this API call to disk
        super(ProxyListenerS3, self).return_response(method, path, data, headers, response)

        bucket_name = extract_bucket_name(headers, path)

        # POST requests to S3 may include a success_action_redirect or
        # success_action_status field, which should be used to redirect a
        # client to a new location.
        key = None
        if method == "POST":
            key, redirect_url = multipart_content.find_multipart_key_value(data, headers)
            if key and redirect_url:
                response.status_code = 303
                response.headers["Location"] = expand_redirect_url(redirect_url, key, bucket_name)
                LOGGER.debug(
                    "S3 POST {} to {}".format(response.status_code, response.headers["Location"])
                )

            expanded_data = multipart_content.expand_multipart_filename(data, headers)
            key, status_code = multipart_content.find_multipart_key_value(
                expanded_data, headers, "success_action_status"
            )

            if response.status_code == 201 and key:
                response._content = self.get_201_response(key, bucket_name)
                response.headers["Content-Length"] = str(len(response._content or ""))
                response.headers["Content-Type"] = "application/xml; charset=utf-8"
                return response
        if response.status_code == 416:
            if method == "GET":
                return error_response(
                    "The requested range cannot be satisfied.", "InvalidRange", 416
                )
            elif method == "HEAD":
                response.status_code = 200
                return response

        parsed = urlparse(path)
        bucket_name_in_host = uses_host_addressing(headers)
        should_send_notifications = all(
            [
                method in ("PUT", "POST", "DELETE"),
                "/" in path[1:] or bucket_name_in_host or key,
                # check if this is an actual put object request, because it could also be
                # a put bucket request with a path like this: /bucket_name/
                bucket_name_in_host
                or key
                or (len(path[1:].split("/")) > 1 and len(path[1:].split("/")[1]) > 0),
                self.is_query_allowable(method, parsed.query),
            ]
        )

        # get subscribers and send bucket notifications
        if should_send_notifications:
            # if we already have a good key, use it, otherwise examine the path
            if key:
                object_path = "/" + key
            elif bucket_name_in_host:
                object_path = parsed.path
            else:
                parts = parsed.path[1:].split("/", 1)
                object_path = parts[1] if parts[1][0] == "/" else "/%s" % parts[1]
            version_id = response.headers.get("x-amz-version-id", None)

            send_notifications(method, bucket_name, object_path, version_id, headers)

        # publish event for creation/deletion of buckets:
        if method in ("PUT", "DELETE") and (
            "/" not in path[1:] or len(path[1:].split("/")[1]) <= 0
        ):
            event_type = (
                event_publisher.EVENT_S3_CREATE_BUCKET
                if method == "PUT"
                else event_publisher.EVENT_S3_DELETE_BUCKET
            )
            event_publisher.fire_event(
                event_type, payload={"n": event_publisher.get_hash(bucket_name)}
            )

        # fix an upstream issue in moto S3 (see https://github.com/localstack/localstack/issues/382)
        if method == "PUT":
            if parsed.query == "policy":
                response._content = ""
                response.status_code = 204
                return response
            # when creating s3 bucket using aws s3api the return header contains 'Location' param
            if key is None:
                # if the bucket is created in 'us-east-1' the location header contains bucket as path
                # else the the header contains bucket url
                if aws_stack.get_region() == "us-east-1":
                    response.headers["Location"] = "/{}".format(bucket_name)
                else:
                    # Note: we need to set the correct protocol here
                    protocol = (
                        headers.get(constants.HEADER_LOCALSTACK_EDGE_URL, "").split("://")[0]
                        or "http"
                    )
                    response.headers["Location"] = "{}://{}.{}:{}/".format(
                        protocol,
                        bucket_name,
                        constants.S3_VIRTUAL_HOSTNAME,
                        config.EDGE_PORT,
                    )

        if response is not None:
            reset_content_length = False
            # append CORS headers and other annotations/patches to response
            append_cors_headers(
                bucket_name,
                request_method=method,
                request_headers=headers,
                response=response,
            )
            append_last_modified_headers(response=response)
            append_list_objects_marker(method, path, data, response)
            fix_range_content_type(bucket_name, path, headers, response)
            fix_delete_objects_response(bucket_name, method, parsed, data, headers, response)
            fix_metadata_key_underscores(response=response)
            fix_creation_date(method, path, response=response)
            ret304_on_etag(data, headers, response)
            append_aws_request_troubleshooting_headers(response)
            fix_delimiter(response)
            fix_xml_preamble_newline(method, path, headers, response)

            if method == "PUT":
                set_object_expiry(path, headers)

            # Remove body from PUT response on presigned URL
            # https://github.com/localstack/localstack/issues/1317
            if (
                method == "PUT"
                and int(response.status_code) < 400
                and (
                    "X-Amz-Security-Token=" in path
                    or "X-Amz-Credential=" in path
                    or "AWSAccessKeyId=" in path
                )
            ):
                response._content = ""
            # ... (the source listing is truncated at this point)


# --- page text that followed the listing on the same source line ---
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
