Best Python code snippet using localstack_python
s3_listener.py
Source:s3_listener.py  
...930    @staticmethod931    def is_multipart_upload(query):932        return query.startswith("uploadId")933    @staticmethod934    def get_201_response(key, bucket_name):935        return """936                <PostResponse>937                    <Location>{protocol}://{host}/{encoded_key}</Location>938                    <Bucket>{bucket}</Bucket>939                    <Key>{key}</Key>940                    <ETag>{etag}</ETag>941                </PostResponse>942                """.format(943            protocol=get_service_protocol(),944            host=config.HOSTNAME_EXTERNAL,945            encoded_key=urlparse.quote(key, safe=""),946            key=key,947            bucket=bucket_name,948            etag="d41d8cd98f00b204e9800998ecf8427f",949        )950    @staticmethod951    def _update_location(content, bucket_name):952        bucket_name = normalize_bucket_name(bucket_name)953        host = config.HOSTNAME_EXTERNAL954        if ":" not in host:955            host = "%s:%s" % (host, config.PORT_S3)956        return re.sub(957            r"<Location>\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*</Location>",958            r"<Location>%s://%s/%s/\2</Location>" % (get_service_protocol(), host, bucket_name),959            content,960            flags=re.MULTILINE,961        )962    @staticmethod963    def is_query_allowable(method, query):964        # Generally if there is a query (some/path/with?query) we don't want to send notifications965        if not query:966            return True967        # Except we do want to notify on multipart and presigned url upload completion968        contains_cred = "X-Amz-Credential" in query and "X-Amz-Signature" in query969        contains_key = "AWSAccessKeyId" in query and "Signature" in query970        # nodejs sdk putObjectCommand is adding x-id=putobject in the query971        allowed_query = "x-id=" in query.lower()972        if (973            (method == "POST" and query.startswith("uploadId"))974            or contains_cred975            or contains_key976            or allowed_query977        ):978            return True979    @staticmethod980    def parse_policy_expiration_date(expiration_string):981        try:982            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT1)983        except Exception:984            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT2)985        # both date formats assume a UTC timezone ('Z' suffix), but it's not parsed as tzinfo into the datetime object986        dt = dt.replace(tzinfo=datetime.timezone.utc)987        return dt988    def forward_request(self, method, path, data, headers):989        # Create list of query parameteres from the url990        parsed = urlparse.urlparse("{}{}".format(config.get_edge_url(), path))991        query_params = parse_qs(parsed.query)992        path_orig = path993        path = path.replace(994            "#", "%23"995        )  # support key names containing hashes (e.g., required by Amplify)996        # extracting bucket name from the request997        parsed_path = urlparse.urlparse(path)998        bucket_name = extract_bucket_name(headers, parsed_path.path)999        if method == "PUT" and bucket_name and not re.match(BUCKET_NAME_REGEX, bucket_name):1000            if len(parsed_path.path) <= 1:1001                return error_response(1002                    "Unable to extract valid bucket name. Please ensure that your AWS SDK is "1003                    + "configured to use path style addressing, or send a valid "1004                    + '<Bucket>.s3.localhost.localstack.cloud "Host" header',1005                    "InvalidBucketName",1006                    status_code=400,1007                )1008            return error_response(1009                "The specified bucket is not valid.",1010                "InvalidBucketName",1011                status_code=400,1012            )1013        # Detecting pre-sign url and checking signature1014        if any([p in query_params for p in SIGNATURE_V2_PARAMS]) or any(1015            [p in query_params for p in SIGNATURE_V4_PARAMS]1016        ):1017            response = authenticate_presign_url(1018                method=method, path=path, data=data, headers=headers1019            )1020            if response is not None:1021                return response1022        # handling s3 website hosting requests1023        if is_static_website(headers) and method == "GET":1024            return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)1025        # check content md5 hash integrity if not a copy request or multipart initialization1026        if (1027            "Content-MD5" in headers1028            and not self.is_s3_copy_request(headers, path)1029            and not self.is_create_multipart_request(parsed_path.query)1030        ):1031            response = check_content_md5(data, headers)1032            if response is not None:1033                return response1034        modified_data = None1035        # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-11036        to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")1037        to_find2 = to_bytes("<CreateBucketConfiguration")1038        if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:1039            # Note: with the latest version, <CreateBucketConfiguration> must either1040            # contain a valid <LocationConstraint>, or not be present at all in the body.1041            modified_data = b""1042        # If this request contains streaming v4 authentication signatures, strip them from the message1043        # Related isse: https://github.com/localstack/localstack/issues/981044        # TODO: can potentially be removed after this fix in moto: https://github.com/spulec/moto/pull/42011045        is_streaming_payload = headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD1046        if is_streaming_payload:1047            modified_data = strip_chunk_signatures(not_none_or(modified_data, data))1048            headers["Content-Length"] = headers.get("x-amz-decoded-content-length")1049            headers.pop(CONTENT_SHA256_HEADER)1050        # POST requests to S3 may include a "${filename}" placeholder in the1051        # key, which should be replaced with an actual file name before storing.1052        if method == "POST":1053            original_data = not_none_or(modified_data, data)1054            expanded_data = multipart_content.expand_multipart_filename(original_data, headers)1055            if expanded_data is not original_data:1056                modified_data = expanded_data1057        # If no content-type is provided, 'binary/octet-stream' should be used1058        # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html1059        if method == "PUT" and not headers.get("content-type"):1060            headers["content-type"] = "binary/octet-stream"1061        # parse query params1062        query = parsed_path.query1063        path = parsed_path.path1064        query_map = urlparse.parse_qs(query, keep_blank_values=True)1065        # remap metadata query params (not supported in moto) to request headers1066        append_metadata_headers(method, query_map, headers)1067        # apply fixes1068        headers_changed = fix_metadata_key_underscores(request_headers=headers)1069        if query == "notification" or "notification" in query_map:1070            # handle and return response for ?notification request1071            response = handle_notification_request(bucket_name, method, data)1072            return response1073        # if the Expires key in the url is already expired then return error1074        if method == "GET" and "Expires" in query_map:1075            ts = datetime.datetime.fromtimestamp(1076                int(query_map.get("Expires")[0]), tz=datetime.timezone.utc1077            )1078            if is_expired(ts):1079                return token_expired_error(path, headers.get("x-amz-request-id"), 400)1080        # If multipart POST with policy in the params, return error if the policy has expired1081        if method == "POST":1082            policy_key, policy_value = multipart_content.find_multipart_key_value(1083                data, headers, "policy"1084            )1085            if policy_key and policy_value:1086                policy = json.loads(base64.b64decode(policy_value).decode("utf-8"))1087                expiration_string = policy.get("expiration", None)  # Example: 2020-06-05T13:37:12Z1088                if expiration_string:1089                    expiration_datetime = self.parse_policy_expiration_date(expiration_string)1090                    if is_expired(expiration_datetime):1091                        return token_expired_error(path, headers.get("x-amz-request-id"), 400)1092        if query == "cors" or "cors" in query_map:1093            if method == "GET":1094                return get_cors(bucket_name)1095            if method == "PUT":1096                return set_cors(bucket_name, data)1097            if method == "DELETE":1098                return delete_cors(bucket_name)1099        if query == "requestPayment" or "requestPayment" in query_map:1100            if method == "GET":1101                return get_request_payment(bucket_name)1102            if method == "PUT":1103                return set_request_payment(bucket_name, data)1104        if query == "lifecycle" or "lifecycle" in query_map:1105            if method == "GET":1106                return get_lifecycle(bucket_name)1107            if method == "PUT":1108                return set_lifecycle(bucket_name, data)1109            if method == "DELETE":1110                delete_lifecycle(bucket_name)1111        if query == "replication" or "replication" in query_map:1112            if method == "GET":1113                return get_replication(bucket_name)1114            if method == "PUT":1115                return set_replication(bucket_name, data)1116        if method == "DELETE" and validate_bucket_name(bucket_name):1117            delete_lifecycle(bucket_name)1118        path_orig_escaped = path_orig.replace("#", "%23")1119        if modified_data is not None or headers_changed or path_orig != path_orig_escaped:1120            data_to_return = not_none_or(modified_data, data)1121            if modified_data is not None:1122                headers["Content-Length"] = str(len(data_to_return or ""))1123            return Request(1124                url=path_orig_escaped,1125                data=data_to_return,1126                headers=headers,1127                method=method,1128            )1129        return True1130    def return_response(self, method, path, data, headers, response, request_handler=None):1131        path = to_str(path)1132        method = to_str(method)1133        path = path.replace("#", "%23")1134        # persist this API call to disk1135        super(ProxyListenerS3, self).return_response(1136            method, path, data, headers, response, request_handler1137        )1138        bucket_name = extract_bucket_name(headers, path)1139        # POST requests to S3 may include a success_action_redirect or1140        # success_action_status field, which should be used to redirect a1141        # client to a new location.1142        key = None1143        if method == "POST":1144            key, redirect_url = multipart_content.find_multipart_key_value(data, headers)1145            if key and redirect_url:1146                response.status_code = 3031147                response.headers["Location"] = expand_redirect_url(redirect_url, key, bucket_name)1148                LOGGER.debug(1149                    "S3 POST {} to {}".format(response.status_code, response.headers["Location"])1150                )1151            expanded_data = multipart_content.expand_multipart_filename(data, headers)1152            key, status_code = multipart_content.find_multipart_key_value(1153                expanded_data, headers, "success_action_status"1154            )1155            if response.status_code == 201 and key:1156                response._content = self.get_201_response(key, bucket_name)1157                response.headers["Content-Length"] = str(len(response._content or ""))1158                response.headers["Content-Type"] = "application/xml; charset=utf-8"1159                return response1160        if response.status_code == 416:1161            if method == "GET":1162                return error_response(1163                    "The requested range cannot be satisfied.", "InvalidRange", 4161164                )1165            elif method == "HEAD":1166                response.status_code = 2001167                return response1168        parsed = urlparse.urlparse(path)1169        bucket_name_in_host = uses_host_addressing(headers)1170        should_send_notifications = all(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
