Best Python code snippet using localstack_python
s3_listener.py
Source: s3_listener.py
...
            xml_prefix, delimiter = xml_prefix.encode(), delimiter.encode()
            pattern = pattern.encode()
        if c.startswith(xml_prefix):
            response._content = re.compile(pattern).sub(delimiter, c)

def convert_to_chunked_encoding(method, path, response):
    if method != "GET" or path != "/":
        return
    if response.headers.get("Transfer-Encoding", "").lower() == "chunked":
        return
    response.headers["Transfer-Encoding"] = "chunked"
    response.headers.pop("Content-Encoding", None)
    response.headers.pop("Content-Length", None)

def unquote(s):
    if (s[0], s[-1]) in (('"', '"'), ("'", "'")):
        return s[1:-1]
    return s

def ret304_on_etag(data, headers, response):
    etag = response.headers.get("ETag")
    if etag:
        match = headers.get("If-None-Match")
        if match and unquote(match) == unquote(etag):
            response.status_code = 304
            response._content = ""

def fix_etag_for_multipart(data, headers, response):
    # Fix for https://github.com/localstack/localstack/issues/1978
    if headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD:
        try:
            if b"chunk-signature=" not in to_bytes(data):
                return
            correct_hash = md5(strip_chunk_signatures(data))
            tags = r"<ETag>%s</ETag>"
            pattern = r'(")?([^<&]+)(")?'
            replacement = r"\g<1>%s\g<3>" % correct_hash
            response._content = re.sub(tags % pattern, tags % replacement, to_str(response.content))
            if response.headers.get("ETag"):
                response.headers["ETag"] = re.sub(pattern, replacement, response.headers["ETag"])
        except Exception:
            pass

def remove_xml_preamble(response):
    """Removes <?xml ... ?> from a response content"""
    response._content = re.sub(r"^<\?[^\?]+\?>", "", to_str(response._content))

# --------------
# HELPER METHODS
#   for lifecycle/replication/...
# --------------

def get_lifecycle(bucket_name):
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    lifecycle = BUCKET_LIFECYCLE.get(bucket_name)
    status_code = 200
    if not lifecycle:
        lifecycle = {
            "Error": {
                "Code": "NoSuchLifecycleConfiguration",
                "Message": "The lifecycle configuration does not exist",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(lifecycle)
    return xml_response(body, status_code=status_code)

def get_replication(bucket_name):
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    replication = BUCKET_REPLICATIONS.get(bucket_name)
    status_code = 200
    if not replication:
        replication = {
            "Error": {
                "Code": "ReplicationConfigurationNotFoundError",
                "Message": "The replication configuration was not found",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(replication)
    return xml_response(body, status_code=status_code)

def set_lifecycle(bucket_name, lifecycle):
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if isinstance(to_str(lifecycle), six.string_types):
        lifecycle = xmltodict.parse(lifecycle)
    BUCKET_LIFECYCLE[bucket_name] = lifecycle
    return 200

def delete_lifecycle(bucket_name):
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if BUCKET_LIFECYCLE.get(bucket_name):
        BUCKET_LIFECYCLE.pop(bucket_name)

def set_replication(bucket_name, replication):
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if isinstance(to_str(replication), six.string_types):
        replication = xmltodict.parse(replication)
    BUCKET_REPLICATIONS[bucket_name] = replication
    return 200

# -------------
# UTIL METHODS
# -------------

def strip_chunk_signatures(data):
    # For clients that use streaming v4 authentication, the request contains chunk signatures
    # in the HTTP body (see example below) which we need to strip as moto cannot handle them
    #
    # 17;chunk-signature=6e162122ec4962bea0b18bc624025e6ae4e9322bdc632762d909e87793ac5921
    # <payload data ...>
    # 0;chunk-signature=927ab45acd82fc90a3c210ca7314d59fedc77ce0c914d79095f8cc9563cf2c70
    data_new = ""
    if data is not None:
        data_new = re.sub(
            b"(^|\r\n)[0-9a-fA-F]+;chunk-signature=[0-9a-f]{64}(\r\n)(\r\n$)?",
            b"",
            to_bytes(data),
            flags=re.MULTILINE | re.DOTALL,
        )
    return data_new

def is_bucket_available(bucket_name):
    body = {"Code": "200"}
    exists, code = bucket_exists(bucket_name)
    if not exists:
        body = {
            "Error": {
                "Code": code,
                "Message": "The bucket does not exist",
                "BucketName": bucket_name,
            }
        }
        return exists, code, body
    return True, 200, body

def bucket_exists(bucket_name):
    """Tests for the existence of the specified bucket. Returns the error code
    if the bucket does not exist (200 if the bucket does exist).
    """
    bucket_name = normalize_bucket_name(bucket_name)
    s3_client = aws_stack.connect_to_service("s3")
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as err:
        error_code = err.response.get("Error").get("Code")
        return False, error_code
    return True, 200

def check_content_md5(data, headers):
    actual = md5(strip_chunk_signatures(data))
    try:
        md5_header = headers["Content-MD5"]
        if not is_base64(md5_header):
            raise Exception('Content-MD5 header is not in Base64 format: "%s"' % md5_header)
        expected = to_str(codecs.encode(base64.b64decode(md5_header), "hex"))
    except Exception:
        return error_response(
            "The Content-MD5 you specified is not valid.",
            "InvalidDigest",
            status_code=400,
        )
    if actual != expected:
        return error_response(
            "The Content-MD5 you specified did not match what we received.",
            "BadDigest",
            status_code=400,
        )

def error_response(message, code, status_code=400):
    result = {"Error": {"Code": code, "Message": message}}
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)

def xml_response(content, status_code=200):
    headers = {"Content-Type": "application/xml"}
    return requests_response(content, status_code=status_code, headers=headers)

def no_such_key_error(resource, requestId=None, status_code=400):
    result = {
        "Error": {
            "Code": "NoSuchKey",
            "Message": "The resource you requested does not exist",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)

def no_such_bucket(bucket_name, requestId=None, status_code=404):
    # TODO: fix the response to match AWS bucket response when the webconfig is not set and bucket not exists
    result = {
        "Error": {
            "Code": "NoSuchBucket",
            "Message": "The specified bucket does not exist",
            "BucketName": bucket_name,
            "RequestId": requestId,
            "HostId": short_uid(),
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)

def token_expired_error(resource, requestId=None, status_code=400):
    result = {
        "Error": {
            "Code": "ExpiredToken",
            "Message": "The provided token has expired.",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)

def expand_redirect_url(starting_url, key, bucket):
    """Add key and bucket parameters to starting URL query string."""
    parsed = urlparse.urlparse(starting_url)
    query = collections.OrderedDict(urlparse.parse_qsl(parsed.query))
    query.update([("key", key), ("bucket", bucket)])
    redirect_url = urlparse.urlunparse(
        (
            parsed.scheme,
            parsed.netloc,
            parsed.path,
            parsed.params,
            urlparse.urlencode(query),
            None,
        )
    )
    return redirect_url

def is_bucket_specified_in_domain_name(path, headers):
    host = headers.get("host", "")
    return re.match(r".*s3(\-website)?\.([^\.]+\.)?amazonaws.com", host)

def is_object_specific_request(path, headers):
    """Return whether the given request is specific to a certain S3 object.
    Note: the bucket name is usually specified as a path parameter,
    but may also be part of the domain name!"""
    bucket_in_domain = is_bucket_specified_in_domain_name(path, headers)
    parts = len(path.split("/"))
    return parts > (1 if bucket_in_domain else 2)

def empty_response():
    response = Response()
    response.status_code = 200
    response._content = ""
    return response

def handle_notification_request(bucket, method, data):
    if method == "GET":
        return handle_get_bucket_notification(bucket)
    if method == "PUT":
        return handle_put_bucket_notification(bucket, data)
    return empty_response()

def handle_get_bucket_notification(bucket):
    response = Response()
    response.status_code = 200
    response._content = ""
    # TODO check if bucket exists
    result = '<NotificationConfiguration xmlns="%s">' % XMLNS_S3
    if bucket in S3_NOTIFICATIONS:
        notifs = S3_NOTIFICATIONS[bucket]
        for notif in notifs:
            for dest in NOTIFICATION_DESTINATION_TYPES:
                if dest in notif:
                    dest_dict = {
                        "%sConfiguration"
                        % dest: {
                            "Id": notif["Id"],
                            dest: notif[dest],
                            "Event": notif["Event"],
                            "Filter": notif["Filter"],
                        }
                    }
                    result += xmltodict.unparse(dest_dict, full_document=False)
    result += "</NotificationConfiguration>"
    response._content = result
    return response

def _validate_filter_rules(filter_doc):
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        # TODO: check what other rules there are

def _sanitize_notification_filter_rules(filter_doc):
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        rule["Name"] = name.title()

def handle_put_bucket_notification(bucket, data):
    parsed = xmltodict.parse(data)
    notif_config = parsed.get("NotificationConfiguration")
    notifications = []
    for dest in NOTIFICATION_DESTINATION_TYPES:
        config = notif_config.get("%sConfiguration" % dest)
        configs = config if isinstance(config, list) else [config] if config else []
        for config in configs:
            events = config.get("Event")
            if isinstance(events, six.string_types):
                events = [events]
            event_filter = config.get("Filter", {})
            # make sure FilterRule is an array
            s3_filter = _get_s3_filter(event_filter)
            if s3_filter and not isinstance(s3_filter.get("FilterRule", []), list):
                s3_filter["FilterRule"] = [s3_filter["FilterRule"]]
            # make sure FilterRules are valid and sanitize if necessary
            _sanitize_notification_filter_rules(s3_filter)
            # create final details dict
            notification_details = {
                "Id": config.get("Id", str(uuid.uuid4())),
                "Event": events,
                dest: config.get(dest),
                "Filter": event_filter,
            }
            notifications.append(clone(notification_details))
    S3_NOTIFICATIONS[bucket] = notifications
    return empty_response()

def remove_bucket_notification(bucket):
    if bucket in S3_NOTIFICATIONS:
        del S3_NOTIFICATIONS[bucket]

class ProxyListenerS3(PersistingProxyListener):
    def api_name(self):
        return "s3"

    @staticmethod
    def is_s3_copy_request(headers, path):
        return "x-amz-copy-source" in headers or "x-amz-copy-source" in path

    @staticmethod
    def is_create_multipart_request(query):
        return query.startswith("uploads")

    @staticmethod
    def is_multipart_upload(query):
        return query.startswith("uploadId")

    @staticmethod
    def get_201_response(key, bucket_name):
        return """
                <PostResponse>
                    <Location>{protocol}://{host}/{encoded_key}</Location>
                    <Bucket>{bucket}</Bucket>
                    <Key>{key}</Key>
                    <ETag>{etag}</ETag>
                </PostResponse>
                """.format(
            protocol=get_service_protocol(),
            host=config.HOSTNAME_EXTERNAL,
            encoded_key=urlparse.quote(key, safe=""),
            key=key,
            bucket=bucket_name,
            etag="d41d8cd98f00b204e9800998ecf8427f",
        )

    @staticmethod
    def _update_location(content, bucket_name):
        bucket_name = normalize_bucket_name(bucket_name)
        host = config.HOSTNAME_EXTERNAL
        if ":" not in host:
            host = "%s:%s" % (host, config.PORT_S3)
        return re.sub(
            r"<Location>\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*</Location>",
            r"<Location>%s://%s/%s/\2</Location>" % (get_service_protocol(), host, bucket_name),
            content,
            flags=re.MULTILINE,
        )

    @staticmethod
    def is_query_allowable(method, query):
        # Generally if there is a query (some/path/with?query) we don't want to send notifications
        if not query:
            return True
        # Except we do want to notify on multipart and presigned url upload completion
        contains_cred = "X-Amz-Credential" in query and "X-Amz-Signature" in query
        contains_key = "AWSAccessKeyId" in query and "Signature" in query
        # nodejs sdk putObjectCommand is adding x-id=putobject in the query
        allowed_query = "x-id=" in query.lower()
        if (
            (method == "POST" and query.startswith("uploadId"))
            or contains_cred
            or contains_key
            or allowed_query
        ):
            return True

    @staticmethod
    def parse_policy_expiration_date(expiration_string):
        try:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT1)
        except Exception:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT2)
        # both date formats assume a UTC timezone ('Z' suffix), but it's not parsed as tzinfo into the datetime object
        dt = dt.replace(tzinfo=datetime.timezone.utc)
        return dt

    def forward_request(self, method, path, data, headers):
        # Create list of query parameters from the url
        parsed = urlparse.urlparse("{}{}".format(config.get_edge_url(), path))
        query_params = parse_qs(parsed.query)
        path_orig = path
        path = path.replace(
            "#", "%23"
        )  # support key names containing hashes (e.g., required by Amplify)
        # extracting bucket name from the request
        parsed_path = urlparse.urlparse(path)
        bucket_name = extract_bucket_name(headers, parsed_path.path)
        if method == "PUT" and bucket_name and not re.match(BUCKET_NAME_REGEX, bucket_name):
            if len(parsed_path.path) <= 1:
                return error_response(
                    "Unable to extract valid bucket name. Please ensure that your AWS SDK is "
                    + "configured to use path style addressing, or send a valid "
                    + '<Bucket>.s3.localhost.localstack.cloud "Host" header',
                    "InvalidBucketName",
                    status_code=400,
                )
            return error_response(
                "The specified bucket is not valid.",
                "InvalidBucketName",
                status_code=400,
            )
        # Detecting pre-sign url and checking signature
        if any([p in query_params for p in SIGNATURE_V2_PARAMS]) or any(
            [p in query_params for p in SIGNATURE_V4_PARAMS]
        ):
            response = authenticate_presign_url(
                method=method, path=path, data=data, headers=headers
            )
            if response is not None:
                return response
        # handling s3 website hosting requests
        if is_static_website(headers) and method == "GET":
            return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)
        # check content md5 hash integrity if not a copy request or multipart initialization
        if (
            "Content-MD5" in headers
            and not self.is_s3_copy_request(headers, path)
            and not self.is_create_multipart_request(parsed_path.query)
        ):
            response = check_content_md5(data, headers)
            if response is not None:
                return response
        modified_data = None
        # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-1
        to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")
        to_find2 = to_bytes("<CreateBucketConfiguration")
        if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:
            # Note: with the latest version, <CreateBucketConfiguration> must either
            # contain a valid <LocationConstraint>, or not be present at all in the body.
            modified_data = b""
        # If this request contains streaming v4 authentication signatures, strip them from the message
        # Related issue: https://github.com/localstack/localstack/issues/98
        # TODO: can potentially be removed after this fix in moto: https://github.com/spulec/moto/pull/4201
        is_streaming_payload = headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD
        if is_streaming_payload:
            modified_data = strip_chunk_signatures(not_none_or(modified_data, data))
            headers["Content-Length"] = headers.get("x-amz-decoded-content-length")
            headers.pop(CONTENT_SHA256_HEADER)
        # POST requests to S3 may include a "${filename}" placeholder in the
        # key, which should be replaced with an actual file name before storing.
        if method == "POST":
            original_data = not_none_or(modified_data, data)
            expanded_data = multipart_content.expand_multipart_filename(original_data, headers)
            if expanded_data is not original_data:
                modified_data = expanded_data
        # If no content-type is provided, 'binary/octet-stream' should be used
        # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
        if method == "PUT" and not headers.get("content-type"):
            headers["content-type"] = "binary/octet-stream"
        # parse query params
        query = parsed_path.query
        path = parsed_path.path
        query_map = urlparse.parse_qs(query, keep_blank_values=True)
        # remap metadata query params (not supported in moto) to request headers
        append_metadata_headers(method, query_map, headers)
        # apply fixes
        headers_changed = fix_metadata_key_underscores(request_headers=headers)
        if query == "notification" or "notification" in query_map:
            # handle and return response for ?notification request
            response = handle_notification_request(bucket_name, method, data)
            return response
        # if the Expires key in the url is already expired then return error
        if method == "GET" and "Expires" in query_map:
            ts = datetime.datetime.fromtimestamp(
                int(query_map.get("Expires")[0]), tz=datetime.timezone.utc
            )
            if is_expired(ts):
                return token_expired_error(path, headers.get("x-amz-request-id"), 400)
        # If multipart POST with policy in the params, return error if the policy has expired
        if method == "POST":
            policy_key, policy_value = multipart_content.find_multipart_key_value(
                data, headers, "policy"
            )
            if policy_key and policy_value:
                policy = json.loads(base64.b64decode(policy_value).decode("utf-8"))
                expiration_string = policy.get("expiration", None)  # Example: 2020-06-05T13:37:12Z
                if expiration_string:
                    expiration_datetime = self.parse_policy_expiration_date(expiration_string)
                    if is_expired(expiration_datetime):
                        return token_expired_error(path, headers.get("x-amz-request-id"), 400)
        if query == "cors" or "cors" in query_map:
            if method == "GET":
                return get_cors(bucket_name)
            if method == "PUT":
                return set_cors(bucket_name, data)
            if method == "DELETE":
                return delete_cors(bucket_name)
        if query == "requestPayment" or "requestPayment" in query_map:
            if method == "GET":
                return get_request_payment(bucket_name)
            if method == "PUT":
                return set_request_payment(bucket_name, data)
        if query == "lifecycle" or "lifecycle" in query_map:
            if method == "GET":
                return get_lifecycle(bucket_name)
            if method == "PUT":
                return set_lifecycle(bucket_name, data)
            if method == "DELETE":
                delete_lifecycle(bucket_name)
        if query == "replication" or "replication" in query_map:
            if method == "GET":
                return get_replication(bucket_name)
            if method == "PUT":
                return set_replication(bucket_name, data)
        if method == "DELETE" and validate_bucket_name(bucket_name):
            delete_lifecycle(bucket_name)
        path_orig_escaped = path_orig.replace("#", "%23")
        if modified_data is not None or headers_changed or path_orig != path_orig_escaped:
            data_to_return = not_none_or(modified_data, data)
            if modified_data is not None:
                headers["Content-Length"] = str(len(data_to_return or ""))
            return Request(
                url=path_orig_escaped,
                data=data_to_return,
                headers=headers,
                method=method,
            )
        return True

    def return_response(self, method, path, data, headers, response, request_handler=None):
        path = to_str(path)
        method = to_str(method)
        path = path.replace("#", "%23")
        # persist this API call to disk
        super(ProxyListenerS3, self).return_response(
            method, path, data, headers, response, request_handler
        )
        bucket_name = extract_bucket_name(headers, path)
        # POST requests to S3 may include a success_action_redirect or
        # success_action_status field, which should be used to redirect a
        # client to a new location.
        key = None
        if method == "POST":
            key, redirect_url = multipart_content.find_multipart_key_value(data, headers)
            if key and redirect_url:
                response.status_code = 303
                response.headers["Location"] = expand_redirect_url(redirect_url, key, bucket_name)
                LOGGER.debug(
                    "S3 POST {} to {}".format(response.status_code, response.headers["Location"])
                )
            expanded_data = multipart_content.expand_multipart_filename(data, headers)
            key, status_code = multipart_content.find_multipart_key_value(
                expanded_data, headers, "success_action_status"
            )
            if response.status_code == 201 and key:
                response._content = self.get_201_response(key, bucket_name)
                response.headers["Content-Length"] = str(len(response._content or ""))
                response.headers["Content-Type"] = "application/xml; charset=utf-8"
                return response
        if response.status_code == 416:
            if method == "GET":
                return error_response(
                    "The requested range cannot be satisfied.", "InvalidRange", 416
                )
method == "HEAD":1166                response.status_code = 2001167                return response1168        parsed = urlparse.urlparse(path)1169        bucket_name_in_host = uses_host_addressing(headers)1170        should_send_notifications = all(1171            [1172                method in ("PUT", "POST", "DELETE"),1173                "/" in path[1:] or bucket_name_in_host or key,1174                # check if this is an actual put object request, because it could also be1175                # a put bucket request with a path like this: /bucket_name/1176                bucket_name_in_host1177                or key1178                or (len(path[1:].split("/")) > 1 and len(path[1:].split("/")[1]) > 0),1179                self.is_query_allowable(method, parsed.query),1180            ]1181        )1182        # get subscribers and send bucket notifications1183        if should_send_notifications:1184            # if we already have a good key, use it, otherwise examine the path1185            if key:1186                object_path = "/" + key1187            elif bucket_name_in_host:1188                object_path = parsed.path1189            else:1190                parts = parsed.path[1:].split("/", 1)1191                object_path = parts[1] if parts[1][0] == "/" else "/%s" % parts[1]1192            version_id = response.headers.get("x-amz-version-id", None)1193            send_notifications(method, bucket_name, object_path, version_id, headers)1194        # publish event for creation/deletion of buckets:1195        if method in ("PUT", "DELETE") and (1196            "/" not in path[1:] or len(path[1:].split("/")[1]) <= 01197        ):1198            event_type = (1199                event_publisher.EVENT_S3_CREATE_BUCKET1200                if method == "PUT"1201                else event_publisher.EVENT_S3_DELETE_BUCKET1202            )1203            event_publisher.fire_event(1204                event_type, payload={"n": event_publisher.get_hash(bucket_name)}1205            )1206        # fix an upstream issue in moto S3 (see https://github.com/localstack/localstack/issues/382)1207        if method == "PUT":1208            if parsed.query == "policy":1209                response._content = ""1210                response.status_code = 2041211                return response1212            # when creating s3 bucket using aws s3api the return header contains 'Location' param1213            if key is None:1214                # if the bucket is created in 'us-east-1' the location header contains bucket as path1215                # else the the header contains bucket url1216                if aws_stack.get_region() == "us-east-1":1217                    response.headers["Location"] = "/{}".format(bucket_name)1218                else:1219                    # Note: we need to set the correct protocol here1220                    protocol = (1221                        headers.get(constants.HEADER_LOCALSTACK_EDGE_URL, "").split("://")[0]1222                        or "http"1223                    )1224                    response.headers["Location"] = "{}://{}.{}:{}/".format(1225                        protocol,1226                        bucket_name,1227                        constants.S3_VIRTUAL_HOSTNAME,1228                        config.EDGE_PORT,1229                    )1230        if response is not None:1231            reset_content_length = False1232            # append CORS headers and other annotations/patches to response1233            append_cors_headers(1234                
                bucket_name,
                request_method=method,
                request_headers=headers,
                response=response,
            )
            append_last_modified_headers(response=response)
            append_list_objects_marker(method, path, data, response)
            fix_location_constraint(response)
            fix_range_content_type(bucket_name, path, headers, response)
            fix_delete_objects_response(bucket_name, method, parsed, data, headers, response)
            fix_metadata_key_underscores(response=response)
            fix_creation_date(method, path, response=response)
            fix_etag_for_multipart(data, headers, response)
            ret304_on_etag(data, headers, response)
            append_aws_request_troubleshooting_headers(response)
            fix_delimiter(data, headers, response)
            if method == "PUT":
                set_object_expiry(path, headers)
            # Remove body from PUT response on presigned URL
            # https://github.com/localstack/localstack/issues/1317
            if (
                method == "PUT"
                and int(response.status_code) < 400
                and (
                    "X-Amz-Security-Token=" in path
                    or "X-Amz-Credential=" in path
                    or "AWSAccessKeyId=" in path
                )
            ):
                response._content = ""
                reset_content_length = True
            response_content_str = None
            try:
                response_content_str = to_str(response._content)
            except Exception:
                pass
            # Honor response header overrides
            # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
            if method == "GET":
                add_accept_range_header(response)
                add_response_metadata_headers(response)
                if is_object_expired(path):
                    return no_such_key_error(path, headers.get("x-amz-request-id"), 400)
                # AWS C# SDK uses get bucket acl to check the existence of the bucket
                # If not exists, raises a NoSuchBucket Error
                if bucket_name and "/?acl" in path:
                    exists, code, body = is_bucket_available(bucket_name)
                    if not exists:
                        return no_such_bucket(bucket_name, headers.get("x-amz-request-id"), 404)
                query_map = urlparse.parse_qs(parsed.query, keep_blank_values=True)
                for param_name, header_name in ALLOWED_HEADER_OVERRIDES.items():
                    if param_name in query_map:
                        response.headers[header_name] = query_map[param_name][0]
            if response_content_str and response_content_str.startswith("<"):
                is_bytes = isinstance(response._content, six.binary_type)
                response._content = response_content_str
                append_last_modified_headers(response=response, content=response_content_str)
                # We need to un-pretty-print the XML, otherwise we run into this issue with Spark:
                # https://github.com/jserver/mock-s3/pull/9/files
                # https://github.com/localstack/localstack/issues/183
                # Note: yet, we need to make sure we have a newline after the first line: <?xml ...>\n
                # Note: make sure to return XML docs verbatim: https://github.com/localstack/localstack/issues/1037
                if method != "GET" or not is_object_specific_request(path, headers):
                    response._content = re.sub(
                        r"([^\?])>\n\s*<",
                        r"\1><",
                        response_content_str,
                        flags=re.MULTILINE,
                    )
                # update Location information in response payload
                response._content = self._update_location(response._content, bucket_name)
                # convert back to bytes
                if is_bytes:
                    response._content = to_bytes(response._content)
                # fix content-type: https://github.com/localstack/localstack/issues/618
                #                   https://github.com/localstack/localstack/issues/549
                #                   https://github.com/localstack/localstack/issues/854
                if "text/html" in response.headers.get(
                    "Content-Type", ""
                ) and not response_content_str.lower().startswith("<!doctype html"):
                    response.headers["Content-Type"] = "application/xml; charset=utf-8"
                reset_content_length = True
            # update Content-Length headers (fix https://github.com/localstack/localstack/issues/541)
            if method == "DELETE":
                reset_content_length = True
            if reset_content_length:
                response.headers["Content-Length"] = str(len(response._content or ""))
            # convert to chunked encoding, for compatibility with certain SDKs (e.g., AWS PHP SDK)
            convert_to_chunked_encoding(method, path, response)

def serve_static_website(headers, path, bucket_name):
    s3_client = aws_stack.connect_to_service("s3")
    # check if bucket exists
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError:
        return no_such_bucket(bucket_name, headers.get("x-amz-request-id"), 404)

    def respond_with_key(status_code, key):
        obj = s3_client.get_object(Bucket=bucket_name, Key=key)
        response_headers = {}
        if "if-none-match" in headers and "ETag" in obj and obj["ETag"] in headers["if-none-match"]:
            return requests_response(status_code=304, content="", headers=response_headers)
        if "WebsiteRedirectLocation" in obj:
            response_headers["location"] = obj["WebsiteRedirectLocation"]
...
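The chunk-signature stripping shown in strip_chunk_signatures() above is easy to sanity-check in isolation. The following is a minimal, self-contained sketch (not part of s3_listener.py): it reuses the same regular expression on a hand-built streaming SigV4 body that follows the format quoted in that function's own comment. The demo function name and the sample payload are illustrative only; the real helper additionally accepts None and str input via to_bytes().

# Standalone sketch: apply the chunk-signature regex from the listing above.
import re

def strip_chunk_signatures_demo(data: bytes) -> bytes:
    # Remove "<hex-size>;chunk-signature=<64 hex chars>\r\n" chunk headers and the
    # trailing zero-length chunk, leaving only the raw payload bytes.
    return re.sub(
        b"(^|\r\n)[0-9a-fA-F]+;chunk-signature=[0-9a-f]{64}(\r\n)(\r\n$)?",
        b"",
        data,
        flags=re.MULTILINE | re.DOTALL,
    )

# "foo" is 3 bytes, so the chunk header declares a hex size of 3; the 64-character
# signatures are placeholders in the format shown in the source comment.
body = (
    b"3;chunk-signature=" + b"a" * 64 + b"\r\n"
    b"foo\r\n"
    b"0;chunk-signature=" + b"b" * 64 + b"\r\n\r\n"
)
print(strip_chunk_signatures_demo(body))  # b'foo' -- chunk headers and final signature removed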
