Best Python code snippet using localstack_python
s3_listener.py
Source:s3_listener.py  
# NOTE(review): this chunk begins mid-function — the lines below are the tail of
# append_last_modified_headers(); its `def` line and enclosing `try:` start before
# the visible portion of the source.
        if response.headers.get("Last-Modified", "") == "":
            response.headers["Last-Modified"] = datetime.datetime.now().strftime(time_format)
    except Exception as err:
        LOGGER.error("Caught generic exception (setting LastModified header): %s", err)


def fix_list_objects_response(method, path, data, response):
    """Patch a ListObjects XML response in place: add missing <Marker> and
    <EncodingType> elements and URL-encode the <Delimiter> value."""
    content = response.content or b""
    if b"<ListBucketResult" not in to_bytes(content):
        return
    content = to_str(content)
    parsed = urlparse(path)
    query_map = parse_qs(parsed.query)
    # insert <Marker> element into response
    if "<Marker>" not in content:
        marker = ""
        if query_map.get("marker"):
            marker = query_map.get("marker")[0]
        insert = "<Marker>%s</Marker>" % marker
        content = content.replace("</ListBucketResult>", f"{insert}</ListBucketResult>")
    # insert <EncodingType> element into response
    encoding_type = query_map.get("encoding-type")
    if "<EncodingType>" not in content and encoding_type:
        insert = f"<EncodingType>{encoding_type[0]}</EncodingType>"
        content = content.replace("</ListBucketResult>", f"{insert}</ListBucketResult>")
    # fix URL-encoding of <Delimiter> response element
    if "<Delimiter>" in content:
        regex = "<Delimiter>([^<]+)</Delimiter>"
        delimiter = re.search(regex, content).group(1).strip()
        if delimiter != "/":
            content = re.sub(regex, f"<Delimiter>{quote(delimiter)}</Delimiter>", content)
    response._content = content
    # content length changed, let it be recomputed downstream
    response.headers.pop("Content-Length", None)


def append_metadata_headers(method, query_map, headers):
    """Copy object-metadata query parameters into request headers (first value
    wins; existing headers are not overwritten)."""
    for key, value in query_map.items():
        if key.lower().startswith(OBJECT_METADATA_KEY_PREFIX):
            if headers.get(key) is None:
                headers[key] = value[0]


def fix_range_content_type(bucket_name, path, headers, response):
    """Replace the Content-Type of a Range response with the object's real
    content type."""
    # Fix content type for Range requests - https://github.com/localstack/localstack/issues/1259
    if "Range" not in headers:
        return
    if response.status_code >= 400:
        return
    s3_client = aws_stack.connect_to_service("s3")
    path = urlparse(unquote(path)).path
    key_name = extract_key_name(headers, path)
    result = s3_client.head_object(Bucket=bucket_name, Key=key_name)
    content_type = result["ContentType"]
    if response.headers.get("Content-Type") == "text/html; charset=utf-8":
        response.headers["Content-Type"] = content_type


def fix_delete_objects_response(bucket_name, method, parsed_path, data, headers, response):
    """Rewrite a DeleteObjects response so that missing keys are reported as
    deleted instead of as errors."""
    # Deleting non-existing keys should not result in errors.
    # Fixes https://github.com/localstack/localstack/issues/1893
    if not (method == "POST" and parsed_path.query == "delete" and "<Delete" in to_str(data or "")):
        return
    content = to_str(response._content)
    if "<Error>" not in content:
        return
    result = xmltodict.parse(content).get("DeleteResult")
    # can be NoSuchBucket error
    if not result:
        return
    errors = result.get("Error")
    errors = errors if isinstance(errors, list) else [errors]
    deleted = result.get("Deleted")
    if not isinstance(result.get("Deleted"), list):
        deleted = result["Deleted"] = [deleted] if deleted else []
    # error entries that carry only a "Key" are moved to the Deleted list
    for entry in list(errors):
        if set(entry.keys()) == set(["Key"]):
            errors.remove(entry)
            deleted.append(entry)
    if not errors:
        result.pop("Error")
    response._content = xmltodict.unparse({"DeleteResult": result})


def fix_metadata_key_underscores(request_headers=None, response=None):
    """Replace underscores in x-amz-meta-* request header names (and undo the
    replacement on response headers). Returns True if a request header changed."""
    if request_headers is None:
        request_headers = {}
    # fix for https://github.com/localstack/localstack/issues/1790
    underscore_replacement = "---"
    meta_header_prefix = "x-amz-meta-"
    prefix_len = len(meta_header_prefix)
    updated = False
    for key in list(request_headers.keys()):
        if key.lower().startswith(meta_header_prefix):
            key_new = meta_header_prefix + key[prefix_len:].replace("_", underscore_replacement)
            if key != key_new:
                request_headers[key_new] = request_headers.pop(key)
                updated = True
    if response is not None:
        for key in list(response.headers.keys()):
            if key.lower().startswith(meta_header_prefix):
                key_new = meta_header_prefix + key[prefix_len:].replace(underscore_replacement, "_")
                if key != key_new:
                    response.headers[key_new] = response.headers.pop(key)
    return updated


def fix_creation_date(method, path, response):
    """Normalize <CreationDate> values in ListBuckets responses to end in 'Z'."""
    if method != "GET" or path != "/":
        return
    response._content = re.sub(
        r"(\.[0-9]+)(\+00:00)?</CreationDate>",
        r"\1Z</CreationDate>",
        to_str(response._content),
    )


def replace_in_xml_response(response, search: str, replace: str):
    """Apply a regex substitution to a 200 XML response body (str or bytes)."""
    if response.status_code != 200 or not response._content:
        return
    c, xml_prefix = response._content, "<?xml"
    if isinstance(c, bytes):
        xml_prefix, search, replace = xml_prefix.encode(), search.encode(), replace.encode()
    if c.startswith(xml_prefix):
        response._content = re.compile(search).sub(replace, c)


def fix_delimiter(response):
    """Remove a literal 'None' rendered into the <Delimiter> element."""
    replace_in_xml_response(response, "<Delimiter>None<", "<Delimiter><")


def fix_xml_preamble_newline(method, path, headers, response):
    """Re-insert a newline after the XML preamble for non-download responses."""
    # some tools (Serverless) require a newline after the "<?xml ...>\n" preamble line, e.g., for LocationConstraint
    # this is required because upstream moto is generally collapsing all S3 XML responses:
    # https://github.com/spulec/moto/blob/3718cde444b3e0117072c29b087237e1787c3a66/moto/core/responses.py#L102-L104
    if is_object_download_request(method, path, headers):
        return
    replace_in_xml_response(response, r"(<\?xml [^>]+>)<", r"\1\n<")


def convert_to_chunked_encoding(method, path, response):
    """Mark a GET / response as chunked and drop the length/encoding headers."""
    if method != "GET" or path != "/":
        return
    if response.headers.get("Transfer-Encoding", "").lower() == "chunked":
        return
    response.headers["Transfer-Encoding"] = "chunked"
    response.headers.pop("Content-Encoding", None)
    response.headers.pop("Content-Length", None)


def strip_surrounding_quotes(s):
    """Strip one pair of matching single or double quotes around *s*, if any."""
    if (s[0], s[-1]) in (('"', '"'), ("'", "'")):
        return s[1:-1]
    return s


def ret304_on_etag(data, headers, response):
    """Turn the response into a 304 when If-None-Match matches the ETag."""
    etag = response.headers.get("ETag")
    if etag:
        match = headers.get("If-None-Match")
        if match and strip_surrounding_quotes(match) == strip_surrounding_quotes(etag):
            response.status_code = 304
            response._content = ""


def remove_xml_preamble(response):
    """Removes <?xml ... ?> from a response content"""
    response._content = re.sub(r"^<\?[^\?]+\?>", "", to_str(response._content))


# --------------
# HELPER METHODS
#   for lifecycle/replication/...
# --------------


def get_lifecycle(bucket_name):
    """Return the bucket lifecycle configuration as an XML response (404 with
    NoSuchLifecycleConfiguration when none is set)."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    lifecycle = BackendState.lifecycle_config(bucket_name)
    status_code = 200
    if not lifecycle:
        lifecycle = {
            "Error": {
                "Code": "NoSuchLifecycleConfiguration",
                "Message": "The lifecycle configuration does not exist",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(lifecycle)
    return xml_response(body, status_code=status_code)


def get_replication(bucket_name):
    """Return the bucket replication configuration as an XML response (404 with
    ReplicationConfigurationNotFoundError when none is set)."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    replication = BackendState.replication_config(bucket_name)
    status_code = 200
    if not replication:
        replication = {
            "Error": {
                "Code": "ReplicationConfigurationNotFoundError",
                "Message": "The replication configuration was not found",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(replication)
    return xml_response(body, status_code=status_code)


def set_lifecycle(bucket_name, lifecycle):
    """Store the given lifecycle configuration (XML string or dict) for the
    bucket; returns 200 on success or an XML error response."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if isinstance(to_str(lifecycle), str):
        lifecycle = xmltodict.parse(lifecycle)
    bucket_lifecycle = BackendState.lifecycle_config(bucket_name)
    bucket_lifecycle.clear()
    bucket_lifecycle.update(lifecycle)
    return 200


def delete_lifecycle(bucket_name):
    """Clear the stored lifecycle configuration for the bucket."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    BackendState.lifecycle_config(bucket_name).clear()


def set_replication(bucket_name, replication):
    """Store the given replication configuration (XML string or dict) for the
    bucket; returns 200 on success or an XML error response."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if isinstance(to_str(replication), str):
        replication = xmltodict.parse(replication)
    bucket_replication = BackendState.replication_config(bucket_name)
    bucket_replication.clear()
    bucket_replication.update(replication)
    return 200


# -------------
# UTIL METHODS
# -------------


def is_bucket_available(bucket_name):
    """Return (exists, status_code, body) for a bucket; *body* is an Error dict
    when the bucket does not exist."""
    body = {"Code": "200"}
    exists, code = bucket_exists(bucket_name)
    if not exists:
        body = {
            "Error": {
                "Code": code,
                "Message": "The bucket does not exist",
                "BucketName": bucket_name,
            }
        }
        return exists, code, body
    return True, 200, body


def bucket_exists(bucket_name):
    """Tests for the existence of the specified bucket. Returns the error code
    if the bucket does not exist (200 if the bucket does exist).
    """
    bucket_name = normalize_bucket_name(bucket_name)
    s3_client = aws_stack.connect_to_service("s3")
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as err:
        error_code = err.response.get("Error").get("Code")
        return False, error_code
    return True, 200


def strip_chunk_signatures(body, content_length):
    """Decode an aws-chunked request body, removing per-chunk signature lines
    and returning the raw payload of *content_length* bytes."""
    # borrowed from https://github.com/spulec/moto/pull/4201
    body_io = io.BytesIO(body)
    new_body = bytearray(content_length)
    pos = 0
    line = body_io.readline()
    while line:
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html#sigv4-chunked-body-definition
        # str(hex(chunk-size)) + ";chunk-signature=" + signature + \r\n + chunk-data + \r\n
        chunk_size = int(line[: line.find(b";")].decode("utf8"), 16)
        new_body[pos : pos + chunk_size] = body_io.read(chunk_size)
        pos = pos + chunk_size
        body_io.read(2)  # skip trailing \r\n
        line = body_io.readline()
    return bytes(new_body)


def check_content_md5(data, headers):
    """Validate the Content-MD5 header against the request body; returns an
    XML error response on mismatch/invalid header, or None when OK."""
    # aws-chunked bodies must be decoded before hashing
    if headers.get("x-amz-content-sha256", None) == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD":
        content_length = headers.get("x-amz-decoded-content-length")
        if not content_length:
            return error_response(
                '"X-Amz-Decoded-Content-Length" header is missing',
                "SignatureDoesNotMatch",
                status_code=403,
            )
        try:
            content_length = int(content_length)
        except ValueError:
            return error_response(
                'Wrong "X-Amz-Decoded-Content-Length" header',
                "SignatureDoesNotMatch",
                status_code=403,
            )
        data = strip_chunk_signatures(data, content_length)
    actual = md5(data)
    try:
        md5_header = headers["Content-MD5"]
        if not is_base64(md5_header):
            raise Exception('Content-MD5 header is not in Base64 format: "%s"' % md5_header)
        # header is base64 of the raw digest; convert to hex for comparison
        expected = to_str(codecs.encode(base64.b64decode(md5_header), "hex"))
    except Exception:
        return error_response(
            "The Content-MD5 you specified is not valid.",
            "InvalidDigest",
            status_code=400,
        )
    if actual != expected:
        return error_response(
            "The Content-MD5 you specified did not match what we received.",
            "BadDigest",
            status_code=400,
        )


def error_response(message, code, status_code=400):
    """Build a generic S3 <Error> XML response."""
    result = {"Error": {"Code": code, "Message": message}}
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def xml_response(content, status_code=200):
    """Wrap *content* in a response with an application/xml content type."""
    headers = {"Content-Type": "application/xml"}
    return requests_response(content, status_code=status_code, headers=headers)


def no_such_key_error(resource, requestId=None, status_code=400):
    """Build a NoSuchKey error response for the given resource."""
    result = {
        "Error": {
            "Code": "NoSuchKey",
            "Message": "The resource you requested does not exist",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def no_such_bucket(bucket_name, requestId=None, status_code=404):
    """Build a NoSuchBucket error response."""
    # TODO: fix the response to match AWS bucket response when the webconfig is not set and bucket not exists
    result = {
        "Error": {
            "Code": "NoSuchBucket",
            "Message": "The specified bucket does not exist",
            "BucketName": bucket_name,
            "RequestId": requestId,
            "HostId": short_uid(),
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def token_expired_error(resource, requestId=None, status_code=400):
    """Build an ExpiredToken error response."""
    result = {
        "Error": {
            "Code": "ExpiredToken",
            "Message": "The provided token has expired.",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def expand_redirect_url(starting_url, key, bucket):
    """Add key and bucket parameters to starting URL query string."""
    parsed = urlparse(starting_url)
    query = collections.OrderedDict(parse_qsl(parsed.query))
    query.update([("key", key), ("bucket", bucket)])
    redirect_url = urlunparse(
        (
            parsed.scheme,
            parsed.netloc,
            parsed.path,
            parsed.params,
            urlencode(query),
            None,
        )
    )
    return redirect_url


def is_bucket_specified_in_domain_name(path, headers):
    """Return a match object if the Host header is a virtual-hosted-style
    S3 (or S3 website) domain, else None."""
    host = headers.get("host", "")
    return re.match(r".*s3(\-website)?\.([^\.]+\.)?amazonaws.com", host)


def is_object_specific_request(path, headers):
    """Return whether the given request is specific to a certain S3 object.
    Note: the bucket name is usually specified as a path parameter,
    but may also be part of the domain name!"""
    bucket_in_domain = is_bucket_specified_in_domain_name(path, headers)
    parts = len(path.split("/"))
    return parts > (1 if bucket_in_domain else 2)


def empty_response():
    """Return a 200 response with an empty body."""
    response = Response()
    response.status_code = 200
    response._content = ""
    return response


def handle_notification_request(bucket, method, data):
    """Dispatch a ?notification request to the GET/PUT handlers; other methods
    get an empty 200 response."""
    if method == "GET":
        return handle_get_bucket_notification(bucket)
    if method == "PUT":
        return handle_put_bucket_notification(bucket, data)
    return empty_response()


def handle_get_bucket_notification(bucket):
    """Render the stored notification configurations of *bucket* as XML."""
    response = Response()
    response.status_code = 200
    response._content = ""
    result = f'<NotificationConfiguration xmlns="{XMLNS_S3}">'
    notifications = BackendState.notification_configs(bucket) or []
    for notif in notifications:
        for dest in NOTIFICATION_DESTINATION_TYPES:
            if dest in notif:
                dest_dict = {
                    f"{dest}Configuration": {
                        "Id": notif["Id"],
                        dest: notif[dest],
                        "Event": notif["Event"],
                        "Filter": notif["Filter"],
                    }
                }
                result += xmltodict.unparse(dest_dict, full_document=False)
    result += "</NotificationConfiguration>"
    response._content = result
    return response


def _validate_filter_rules(filter_doc):
    """Raise InvalidFilterRuleName for any FilterRule whose Name is not
    'suffix' or 'prefix' (case-insensitive)."""
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        # TODO: check what other rules there are


def _sanitize_notification_filter_rules(filter_doc):
    """Validate FilterRule names and normalize them to title case in place."""
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        rule["Name"] = name.title()


def handle_put_bucket_notification(bucket, data):
    """Parse a PUT ?notification XML body and replace the stored notification
    configurations for *bucket*; returns an empty 200 response."""
    parsed = strip_xmlns(xmltodict.parse(data))
    notif_config = parsed.get("NotificationConfiguration")
    notifications = BackendState.notification_configs(bucket)
    notifications.clear()
    for dest in NOTIFICATION_DESTINATION_TYPES:
        config = notif_config.get("%sConfiguration" % dest)
        # xmltodict yields a dict for a single element, a list for repeats
        configs = config if isinstance(config, list) else [config] if config else []
        for config in configs:
            events = config.get("Event")
            if isinstance(events, str):
                events = [events]
            event_filter = config.get("Filter", {})
            # make sure FilterRule is an array
            s3_filter = _get_s3_filter(event_filter)
            if s3_filter and not isinstance(s3_filter.get("FilterRule", []), list):
                s3_filter["FilterRule"] = [s3_filter["FilterRule"]]
            # make sure FilterRules are valid and sanitize if necessary
            _sanitize_notification_filter_rules(s3_filter)
            # create final details dict
            notification_details = {
                "Id": config.get("Id", str(uuid.uuid4())),
                "Event": events,
                dest: config.get(dest),
                "Filter": event_filter,
            }
            notifications.append(clone(notification_details))
    return empty_response()


def remove_bucket_notification(bucket):
    """Clear all stored notification configurations for *bucket*."""
    notification_configs = BackendState.notification_configs(bucket)
    if notification_configs:
        notification_configs.clear()


class ProxyListenerS3(ProxyListener):
    """Proxy listener that patches S3 requests/responses on the way through
    (bucket notifications, lifecycle/replication/CORS handling, moto fixups)."""

    def api_name(self):
        return "s3"

    @staticmethod
    def is_s3_copy_request(headers, path):
        return "x-amz-copy-source" in headers or "x-amz-copy-source" in path

    @staticmethod
    def is_create_multipart_request(query):
        return query.startswith("uploads")

    @staticmethod
    def is_multipart_upload(query):
        return query.startswith("uploadId")

    @staticmethod
    def get_201_response(key, bucket_name):
        # XML body for a 201 multipart-POST success response
        return """
                <PostResponse>
                    <Location>{protocol}://{host}/{encoded_key}</Location>
                    <Bucket>{bucket}</Bucket>
                    <Key>{key}</Key>
                    <ETag>{etag}</ETag>
                </PostResponse>
                """.format(
            protocol=get_service_protocol(),
            host=config.HOSTNAME_EXTERNAL,
            encoded_key=quote(key, safe=""),
            key=key,
            bucket=bucket_name,
            etag="d41d8cd98f00b204e9800998ecf8427f",
        )

    @staticmethod
    def _update_location(content, bucket_name):
        # rewrite <Location> elements to point at the external LocalStack host
        bucket_name = normalize_bucket_name(bucket_name)
        host = config.HOSTNAME_EXTERNAL
        if ":" not in host:
            host = f"{host}:{config.service_port('s3')}"
        return re.sub(
            r"<Location>\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*</Location>",
            r"<Location>%s://%s/%s/\2</Location>" % (get_service_protocol(), host, bucket_name),
            content,
            flags=re.MULTILINE,
        )

    @staticmethod
    def is_query_allowable(method, query):
        """Return True when a request with this query string should still
        trigger bucket notifications (returns None otherwise)."""
        # Generally if there is a query (some/path/with?query) we don't want to send notifications
        if not query:
            return True
        # Except we do want to notify on multipart and presigned url upload completion
        contains_cred = "X-Amz-Credential" in query and "X-Amz-Signature" in query
        contains_key = "AWSAccessKeyId" in query and "Signature" in query
        # nodejs sdk putObjectCommand is adding x-id=putobject in the query
        allowed_query = "x-id=" in query.lower()
        if (
            (method == "POST" and query.startswith("uploadId"))
            or contains_cred
            or contains_key
            or allowed_query
        ):
            return True

    @staticmethod
    def parse_policy_expiration_date(expiration_string):
        """Parse a POST-policy expiration string in either supported format,
        returning a timezone-aware (UTC) datetime."""
        try:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT1)
        except Exception:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT2)
        # both date formats assume a UTC timezone ('Z' suffix), but it's not parsed as tzinfo into the datetime object
        dt = dt.replace(tzinfo=datetime.timezone.utc)
        return dt

    def forward_request(self, method, path, data, headers):
        """Inspect/modify an inbound S3 request; may answer it directly (CORS,
        lifecycle, notification, errors), forward a modified Request, or return
        True to forward unchanged."""
        # Create list of query parameters from the url
        parsed = urlparse("{}{}".format(config.get_edge_url(), path))
        query_params = parse_qs(parsed.query)
        path_orig = path
        path = path.replace(
            "#", "%23"
        )  # support key names containing hashes (e.g., required by Amplify)
        # extracting bucket name from the request
        parsed_path = urlparse(path)
        bucket_name = extract_bucket_name(headers, parsed_path.path)
        if method == "PUT" and bucket_name and not re.match(BUCKET_NAME_REGEX, bucket_name):
            if len(parsed_path.path) <= 1:
                return error_response(
                    "Unable to extract valid bucket name. Please ensure that your AWS SDK is "
                    + "configured to use path style addressing, or send a valid "
                    + '<Bucket>.s3.localhost.localstack.cloud "Host" header',
                    "InvalidBucketName",
                    status_code=400,
                )
            return error_response(
                "The specified bucket is not valid.",
                "InvalidBucketName",
                status_code=400,
            )
        # Detecting pre-sign url and checking signature
        if any(p in query_params for p in SIGNATURE_V2_PARAMS) or any(
            p in query_params for p in SIGNATURE_V4_PARAMS
        ):
            response = authenticate_presign_url(
                method=method, path=path, data=data, headers=headers
            )
            if response is not None:
                return response
        # handling s3 website hosting requests
        if is_static_website(headers) and method == "GET":
            return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)
        # check content md5 hash integrity if not a copy request or multipart initialization
        if (
            "Content-MD5" in headers
            and not self.is_s3_copy_request(headers, path)
            and not self.is_create_multipart_request(parsed_path.query)
        ):
            response = check_content_md5(data, headers)
            if response is not None:
                return response
        modified_data = None
        # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-1
        to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")
        to_find2 = to_bytes("<CreateBucketConfiguration")
        if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:
            # Note: with the latest version, <CreateBucketConfiguration> must either
            # contain a valid <LocationConstraint>, or not be present at all in the body.
            modified_data = b""
        # POST requests to S3 may include a "$(unknown)" placeholder in the
        # key, which should be replaced with an actual file name before storing.
        if method == "POST":
            original_data = not_none_or(modified_data, data)
            expanded_data = multipart_content.expand_multipart_filename(original_data, headers)
            if expanded_data is not original_data:
                modified_data = expanded_data
        # If no content-type is provided, 'binary/octet-stream' should be used
        # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
        if method == "PUT" and not headers.get("content-type"):
            headers["content-type"] = "binary/octet-stream"
        # parse query params
        query = parsed_path.query
        path = parsed_path.path
        query_map = parse_qs(query, keep_blank_values=True)
        # remap metadata query params (not supported in moto) to request headers
        append_metadata_headers(method, query_map, headers)
        # apply fixes
        headers_changed = fix_metadata_key_underscores(request_headers=headers)
        if query == "notification" or "notification" in query_map:
            # handle and return response for ?notification request
            response = handle_notification_request(bucket_name, method, data)
            return response
        # if the Expires key in the url is already expired then return error
        if method == "GET" and "Expires" in query_map:
            ts = datetime.datetime.fromtimestamp(
                int(query_map.get("Expires")[0]), tz=datetime.timezone.utc
            )
            if is_expired(ts):
                return token_expired_error(path, headers.get("x-amz-request-id"), 400)
        # If multipart POST with policy in the params, return error if the policy has expired
        if method == "POST":
            policy_key, policy_value = multipart_content.find_multipart_key_value(
                data, headers, "policy"
            )
            if policy_key and policy_value:
                policy = json.loads(base64.b64decode(policy_value).decode("utf-8"))
                expiration_string = policy.get("expiration", None)  # Example: 2020-06-05T13:37:12Z
                if expiration_string:
                    expiration_datetime = self.parse_policy_expiration_date(expiration_string)
                    if is_expired(expiration_datetime):
                        return token_expired_error(path, headers.get("x-amz-request-id"), 400)
        if query == "cors" or "cors" in query_map:
            if method == "GET":
                return get_cors(bucket_name)
            if method == "PUT":
                return set_cors(bucket_name, data)
            if method == "DELETE":
                return delete_cors(bucket_name)
        if query == "requestPayment" or "requestPayment" in query_map:
            if method == "GET":
                return get_request_payment(bucket_name)
            if method == "PUT":
                return set_request_payment(bucket_name, data)
        if query == "lifecycle" or "lifecycle" in query_map:
            if method == "GET":
                return get_lifecycle(bucket_name)
            if method == "PUT":
                return set_lifecycle(bucket_name, data)
            if method == "DELETE":
                delete_lifecycle(bucket_name)
        if query == "replication" or "replication" in query_map:
            if method == "GET":
                return get_replication(bucket_name)
            if method == "PUT":
                return set_replication(bucket_name, data)
        if method == "DELETE" and validate_bucket_name(bucket_name):
            delete_lifecycle(bucket_name)
        path_orig_escaped = path_orig.replace("#", "%23")
        # forward a modified request if anything changed, else let it pass through
        if modified_data is not None or headers_changed or path_orig != path_orig_escaped:
            data_to_return = not_none_or(modified_data, data)
            if modified_data is not None:
                headers["Content-Length"] = str(len(data_to_return or ""))
            return Request(
                url=path_orig_escaped,
                data=data_to_return,
                headers=headers,
                method=method,
            )
        return True

    def return_response(self, method, path, data, headers, response):
        """Post-process the backend response: multipart-POST redirects/201
        bodies, bucket notifications, bucket events, and various moto fixups."""
        path = to_str(path)
        method = to_str(method)
        path = path.replace("#", "%23")
        # persist this API call to disk
        super(ProxyListenerS3, self).return_response(method, path, data, headers, response)
        bucket_name = extract_bucket_name(headers, path)
        # POST requests to S3 may include a success_action_redirect or
        # success_action_status field, which should be used to redirect a
        # client to a new location.
        key = None
        if method == "POST":
            key, redirect_url = multipart_content.find_multipart_key_value(data, headers)
            if key and redirect_url:
                response.status_code = 303
                response.headers["Location"] = expand_redirect_url(redirect_url, key, bucket_name)
                LOGGER.debug(
                    "S3 POST {} to {}".format(response.status_code, response.headers["Location"])
                )
            expanded_data = multipart_content.expand_multipart_filename(data, headers)
            key, status_code = multipart_content.find_multipart_key_value(
                expanded_data, headers, "success_action_status"
            )
            if response.status_code == 201 and key:
                response._content = self.get_201_response(key, bucket_name)
                response.headers["Content-Length"] = str(len(response._content or ""))
                response.headers["Content-Type"] = "application/xml; charset=utf-8"
                return response
        if response.status_code == 416:
            if method == "GET":
                return error_response(
                    "The requested range cannot be satisfied.", "InvalidRange", 416
                )
            elif method == "HEAD":
                response.status_code = 200
                return response
        parsed = urlparse(path)
        bucket_name_in_host = uses_host_addressing(headers)
        is_object_request = all(
            [
                "/" in path[1:] or bucket_name_in_host or key,
                # check if this is an actual put object request, because it could also be
                # a put bucket request with a path like this: /bucket_name/
                bucket_name_in_host
                or key
                or (len(path[1:].split("/")) > 1 and len(path[1:].split("/")[1]) > 0),
            ]
        )
        should_send_object_notification = all(
            [
                method in ("PUT", "POST", "DELETE"),
                is_object_request,
                self.is_query_allowable(method, parsed.query),
            ]
        )
        should_send_tagging_notification = all(
            ["tagging" in parsed.query, method in ("PUT", "DELETE"), is_object_request]
        )
        # get subscribers and send bucket notifications
        if should_send_object_notification or should_send_tagging_notification:
            # if we already have a good key, use it, otherwise examine the path
            if key:
                object_path = "/" + key
            elif bucket_name_in_host:
                object_path = parsed.path
            else:
                parts = parsed.path[1:].split("/", 1)
                object_path = parts[1] if parts[1][0] == "/" else "/%s" % parts[1]
            version_id = response.headers.get("x-amz-version-id", None)
            if should_send_object_notification:
                method_map = {
                    "PUT": "ObjectCreated",
                    "POST": "ObjectCreated",
                    "DELETE": "ObjectRemoved",
                }
            # NOTE(review): when both flags are set, the tagging map overwrites
            # the object map before send_notifications is called
            if should_send_tagging_notification:
                method_map = {
                    "PUT": "ObjectTagging",
                    "DELETE": "ObjectTagging",
                }
            send_notifications(method, bucket_name, object_path, version_id, headers, method_map)
        # publish event for creation/deletion of buckets:
        if method in ("PUT", "DELETE") and (
            "/" not in path[1:] or len(path[1:].split("/")[1]) <= 0
        ):
            event_type = (
                event_publisher.EVENT_S3_CREATE_BUCKET
                if method == "PUT"
                else event_publisher.EVENT_S3_DELETE_BUCKET
            )
            event_publisher.fire_event(
                event_type, payload={"n": event_publisher.get_hash(bucket_name)}
            )
        # fix an upstream issue in moto S3 (see https://github.com/localstack/localstack/issues/382)
        if method == "PUT":
            if parsed.query == "policy":
                response._content = ""
                response.status_code = 204
                return response
            # when creating s3 bucket using aws s3api the return header contains 'Location' param
            if key is None:
                # if the bucket is created in 'us-east-1' the location header contains bucket as path
                # else the header contains bucket url
                if aws_stack.get_region() == "us-east-1":
                    response.headers["Location"] = "/{}".format(bucket_name)
                else:
                    # Note: we need to set the correct protocol here
                    protocol = (
                        headers.get(constants.HEADER_LOCALSTACK_EDGE_URL, "").split("://")[0]
                        or "http"
                    )
                    response.headers["Location"] = "{}://{}.{}:{}/".format(
                        protocol,
                        bucket_name,
                        constants.S3_VIRTUAL_HOSTNAME,
                        config.EDGE_PORT,
                    )
        if response is not None:
            reset_content_length = False
            # append CORS headers and other annotations/patches to response
            append_cors_headers(
                bucket_name,
                request_method=method,
                request_headers=headers,
                response=response,
            )
            append_last_modified_headers(response=response)
            fix_list_objects_response(method, path, data, response)
            fix_range_content_type(bucket_name, path, headers, response)
            fix_delete_objects_response(bucket_name, method, parsed, data, headers, response)
            fix_metadata_key_underscores(response=response)
            fix_creation_date(method, path, response=response)
            ret304_on_etag(data, headers, response)
            append_aws_request_troubleshooting_headers(response)
            fix_delimiter(response)
            fix_xml_preamble_newline(method, path, headers, response)
            if method == "PUT":
                key_name = extract_key_name(headers, path)
                if key_name:
                    # NOTE(review): SOURCE is truncated mid-statement below — the
                    # remainder of return_response is not visible in this chunk.
                    set_object_expiry(bucket_name, key_name,
headers)1353            # Remove body from PUT response on presigned URL1354            # https://github.com/localstack/localstack/issues/1317...Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites to run your first automation test, through following best practices, to diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation testing minutes FREE!
