Best Python code snippet using localstack_python
s3_listener.py
Source:s3_listener.py  
...1020            if response is not None:1021                return response1022        # handling s3 website hosting requests1023        if is_static_website(headers) and method == "GET":1024            return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)1025        # check content md5 hash integrity if not a copy request or multipart initialization1026        if (1027            "Content-MD5" in headers1028            and not self.is_s3_copy_request(headers, path)1029            and not self.is_create_multipart_request(parsed_path.query)1030        ):1031            response = check_content_md5(data, headers)1032            if response is not None:1033                return response1034        modified_data = None1035        # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-11036        to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")1037        to_find2 = to_bytes("<CreateBucketConfiguration")1038        if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:1039            # Note: with the latest version, <CreateBucketConfiguration> must either1040            # contain a valid <LocationConstraint>, or not be present at all in the body.1041            modified_data = b""1042        # If this request contains streaming v4 authentication signatures, strip them from the message1043        # Related isse: https://github.com/localstack/localstack/issues/981044        # TODO: can potentially be removed after this fix in moto: https://github.com/spulec/moto/pull/42011045        is_streaming_payload = headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD1046        if is_streaming_payload:1047            modified_data = strip_chunk_signatures(not_none_or(modified_data, data))1048            headers["Content-Length"] = headers.get("x-amz-decoded-content-length")1049            headers.pop(CONTENT_SHA256_HEADER)1050        # POST requests to S3 
may include a "${filename}" placeholder in the1051        # key, which should be replaced with an actual file name before storing.1052        if method == "POST":1053            original_data = not_none_or(modified_data, data)1054            expanded_data = multipart_content.expand_multipart_filename(original_data, headers)1055            if expanded_data is not original_data:1056                modified_data = expanded_data1057        # If no content-type is provided, 'binary/octet-stream' should be used1058        # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html1059        if method == "PUT" and not headers.get("content-type"):1060            headers["content-type"] = "binary/octet-stream"1061        # parse query params1062        query = parsed_path.query1063        path = parsed_path.path1064        query_map = urlparse.parse_qs(query, keep_blank_values=True)1065        # remap metadata query params (not supported in moto) to request headers1066        append_metadata_headers(method, query_map, headers)1067        # apply fixes1068        headers_changed = fix_metadata_key_underscores(request_headers=headers)1069        if query == "notification" or "notification" in query_map:1070            # handle and return response for ?notification request1071            response = handle_notification_request(bucket_name, method, data)1072            return response1073        # if the Expires key in the url is already expired then return error1074        if method == "GET" and "Expires" in query_map:1075            ts = datetime.datetime.fromtimestamp(1076                int(query_map.get("Expires")[0]), tz=datetime.timezone.utc1077            )1078            if is_expired(ts):1079                return token_expired_error(path, headers.get("x-amz-request-id"), 400)1080        # If multipart POST with policy in the params, return error if the policy has expired1081        if method == "POST":1082            policy_key, policy_value = 
multipart_content.find_multipart_key_value(1083                data, headers, "policy"1084            )1085            if policy_key and policy_value:1086                policy = json.loads(base64.b64decode(policy_value).decode("utf-8"))1087                expiration_string = policy.get("expiration", None)  # Example: 2020-06-05T13:37:12Z1088                if expiration_string:1089                    expiration_datetime = self.parse_policy_expiration_date(expiration_string)1090                    if is_expired(expiration_datetime):1091                        return token_expired_error(path, headers.get("x-amz-request-id"), 400)1092        if query == "cors" or "cors" in query_map:1093            if method == "GET":1094                return get_cors(bucket_name)1095            if method == "PUT":1096                return set_cors(bucket_name, data)1097            if method == "DELETE":1098                return delete_cors(bucket_name)1099        if query == "requestPayment" or "requestPayment" in query_map:1100            if method == "GET":1101                return get_request_payment(bucket_name)1102            if method == "PUT":1103                return set_request_payment(bucket_name, data)1104        if query == "lifecycle" or "lifecycle" in query_map:1105            if method == "GET":1106                return get_lifecycle(bucket_name)1107            if method == "PUT":1108                return set_lifecycle(bucket_name, data)1109            if method == "DELETE":1110                delete_lifecycle(bucket_name)1111        if query == "replication" or "replication" in query_map:1112            if method == "GET":1113                return get_replication(bucket_name)1114            if method == "PUT":1115                return set_replication(bucket_name, data)1116        if method == "DELETE" and validate_bucket_name(bucket_name):1117            delete_lifecycle(bucket_name)1118        path_orig_escaped = path_orig.replace("#", "%23")1119        if 
modified_data is not None or headers_changed or path_orig != path_orig_escaped:1120            data_to_return = not_none_or(modified_data, data)1121            if modified_data is not None:1122                headers["Content-Length"] = str(len(data_to_return or ""))1123            return Request(1124                url=path_orig_escaped,1125                data=data_to_return,1126                headers=headers,1127                method=method,1128            )1129        return True1130    def return_response(self, method, path, data, headers, response, request_handler=None):1131        path = to_str(path)1132        method = to_str(method)1133        path = path.replace("#", "%23")1134        # persist this API call to disk1135        super(ProxyListenerS3, self).return_response(1136            method, path, data, headers, response, request_handler1137        )1138        bucket_name = extract_bucket_name(headers, path)1139        # POST requests to S3 may include a success_action_redirect or1140        # success_action_status field, which should be used to redirect a1141        # client to a new location.1142        key = None1143        if method == "POST":1144            key, redirect_url = multipart_content.find_multipart_key_value(data, headers)1145            if key and redirect_url:1146                response.status_code = 3031147                response.headers["Location"] = expand_redirect_url(redirect_url, key, bucket_name)1148                LOGGER.debug(1149                    "S3 POST {} to {}".format(response.status_code, response.headers["Location"])1150                )1151            expanded_data = multipart_content.expand_multipart_filename(data, headers)1152            key, status_code = multipart_content.find_multipart_key_value(1153                expanded_data, headers, "success_action_status"1154            )1155            if response.status_code == 201 and key:1156                response._content = self.get_201_response(key, 
bucket_name)1157                response.headers["Content-Length"] = str(len(response._content or ""))1158                response.headers["Content-Type"] = "application/xml; charset=utf-8"1159                return response1160        if response.status_code == 416:1161            if method == "GET":1162                return error_response(1163                    "The requested range cannot be satisfied.", "InvalidRange", 4161164                )1165            elif method == "HEAD":1166                response.status_code = 2001167                return response1168        parsed = urlparse.urlparse(path)1169        bucket_name_in_host = uses_host_addressing(headers)1170        should_send_notifications = all(1171            [1172                method in ("PUT", "POST", "DELETE"),1173                "/" in path[1:] or bucket_name_in_host or key,1174                # check if this is an actual put object request, because it could also be1175                # a put bucket request with a path like this: /bucket_name/1176                bucket_name_in_host1177                or key1178                or (len(path[1:].split("/")) > 1 and len(path[1:].split("/")[1]) > 0),1179                self.is_query_allowable(method, parsed.query),1180            ]1181        )1182        # get subscribers and send bucket notifications1183        if should_send_notifications:1184            # if we already have a good key, use it, otherwise examine the path1185            if key:1186                object_path = "/" + key1187            elif bucket_name_in_host:1188                object_path = parsed.path1189            else:1190                parts = parsed.path[1:].split("/", 1)1191                object_path = parts[1] if parts[1][0] == "/" else "/%s" % parts[1]1192            version_id = response.headers.get("x-amz-version-id", None)1193            send_notifications(method, bucket_name, object_path, version_id, headers)1194        # publish event for creation/deletion of 
buckets:1195        if method in ("PUT", "DELETE") and (1196            "/" not in path[1:] or len(path[1:].split("/")[1]) <= 01197        ):1198            event_type = (1199                event_publisher.EVENT_S3_CREATE_BUCKET1200                if method == "PUT"1201                else event_publisher.EVENT_S3_DELETE_BUCKET1202            )1203            event_publisher.fire_event(1204                event_type, payload={"n": event_publisher.get_hash(bucket_name)}1205            )1206        # fix an upstream issue in moto S3 (see https://github.com/localstack/localstack/issues/382)1207        if method == "PUT":1208            if parsed.query == "policy":1209                response._content = ""1210                response.status_code = 2041211                return response1212            # when creating s3 bucket using aws s3api the return header contains 'Location' param1213            if key is None:1214                # if the bucket is created in 'us-east-1' the location header contains bucket as path1215                # else the the header contains bucket url1216                if aws_stack.get_region() == "us-east-1":1217                    response.headers["Location"] = "/{}".format(bucket_name)1218                else:1219                    # Note: we need to set the correct protocol here1220                    protocol = (1221                        headers.get(constants.HEADER_LOCALSTACK_EDGE_URL, "").split("://")[0]1222                        or "http"1223                    )1224                    response.headers["Location"] = "{}://{}.{}:{}/".format(1225                        protocol,1226                        bucket_name,1227                        constants.S3_VIRTUAL_HOSTNAME,1228                        config.EDGE_PORT,1229                    )1230        if response is not None:1231            reset_content_length = False1232            # append CORS headers and other annotations/patches to response1233            
append_cors_headers(1234                bucket_name,1235                request_method=method,1236                request_headers=headers,1237                response=response,1238            )1239            append_last_modified_headers(response=response)1240            append_list_objects_marker(method, path, data, response)1241            fix_location_constraint(response)1242            fix_range_content_type(bucket_name, path, headers, response)1243            fix_delete_objects_response(bucket_name, method, parsed, data, headers, response)1244            fix_metadata_key_underscores(response=response)1245            fix_creation_date(method, path, response=response)1246            fix_etag_for_multipart(data, headers, response)1247            ret304_on_etag(data, headers, response)1248            append_aws_request_troubleshooting_headers(response)1249            fix_delimiter(data, headers, response)1250            if method == "PUT":1251                set_object_expiry(path, headers)1252            # Remove body from PUT response on presigned URL1253            # https://github.com/localstack/localstack/issues/13171254            if (1255                method == "PUT"1256                and int(response.status_code) < 4001257                and (1258                    "X-Amz-Security-Token=" in path1259                    or "X-Amz-Credential=" in path1260                    or "AWSAccessKeyId=" in path1261                )1262            ):1263                response._content = ""1264                reset_content_length = True1265            response_content_str = None1266            try:1267                response_content_str = to_str(response._content)1268            except Exception:1269                pass1270            # Honor response header overrides1271            # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html1272            if method == "GET":1273                add_accept_range_header(response)1274                
add_response_metadata_headers(response)1275                if is_object_expired(path):1276                    return no_such_key_error(path, headers.get("x-amz-request-id"), 400)1277                # AWS C# SDK uses get bucket acl to check the existence of the bucket1278                # If not exists, raises a NoSuchBucket Error1279                if bucket_name and "/?acl" in path:1280                    exists, code, body = is_bucket_available(bucket_name)1281                    if not exists:1282                        return no_such_bucket(bucket_name, headers.get("x-amz-request-id"), 404)1283                query_map = urlparse.parse_qs(parsed.query, keep_blank_values=True)1284                for param_name, header_name in ALLOWED_HEADER_OVERRIDES.items():1285                    if param_name in query_map:1286                        response.headers[header_name] = query_map[param_name][0]1287            if response_content_str and response_content_str.startswith("<"):1288                is_bytes = isinstance(response._content, six.binary_type)1289                response._content = response_content_str1290                append_last_modified_headers(response=response, content=response_content_str)1291                # We need to un-pretty-print the XML, otherwise we run into this issue with Spark:1292                # https://github.com/jserver/mock-s3/pull/9/files1293                # https://github.com/localstack/localstack/issues/1831294                # Note: yet, we need to make sure we have a newline after the first line: <?xml ...>\n1295                # Note: make sure to return XML docs verbatim: https://github.com/localstack/localstack/issues/10371296                if method != "GET" or not is_object_specific_request(path, headers):1297                    response._content = re.sub(1298                        r"([^\?])>\n\s*<",1299                        r"\1><",1300                        response_content_str,1301                        
flags=re.MULTILINE,1302                    )1303                # update Location information in response payload1304                response._content = self._update_location(response._content, bucket_name)1305                # convert back to bytes1306                if is_bytes:1307                    response._content = to_bytes(response._content)1308                # fix content-type: https://github.com/localstack/localstack/issues/6181309                #                   https://github.com/localstack/localstack/issues/5491310                #                   https://github.com/localstack/localstack/issues/8541311                if "text/html" in response.headers.get(1312                    "Content-Type", ""1313                ) and not response_content_str.lower().startswith("<!doctype html"):1314                    response.headers["Content-Type"] = "application/xml; charset=utf-8"1315                reset_content_length = True1316            # update Content-Length headers (fix https://github.com/localstack/localstack/issues/541)1317            if method == "DELETE":1318                reset_content_length = True1319            if reset_content_length:1320                response.headers["Content-Length"] = str(len(response._content or ""))1321            # convert to chunked encoding, for compatibility with certain SDKs (e.g., AWS PHP SDK)1322            convert_to_chunked_encoding(method, path, response)1323def serve_static_website(headers, path, bucket_name):1324    s3_client = aws_stack.connect_to_service("s3")1325    # check if bucket exists1326    try:1327        s3_client.head_bucket(Bucket=bucket_name)1328    except ClientError:1329        return no_such_bucket(bucket_name, headers.get("x-amz-request-id"), 404)1330    def respond_with_key(status_code, key):1331        obj = s3_client.get_object(Bucket=bucket_name, Key=key)1332        response_headers = {}1333        if "if-none-match" in headers and "ETag" in obj and obj["ETag"] in 
headers["if-none-match"]:1334            return requests_response(status_code=304, content="", headers=response_headers)1335        if "WebsiteRedirectLocation" in obj:1336            response_headers["location"] = obj["WebsiteRedirectLocation"]1337            return requests_response(status_code=301, content="", headers=response_headers)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
