Best Python code snippet using localstack_python
s3_listener.py
Source:s3_listener.py  
# ...
# NOTE: this excerpt begins in the middle of a function whose ``def`` line is
# not shown. The visible tail of that function is kept below as a comment,
# because it cannot stand as top-level code on its own:
#
#             response._content = content.replace(
#                 "</ListBucketResult>", "%s</ListBucketResult>" % insert
#             )
#             response.headers.pop("Content-Length", None)


def append_metadata_headers(method, query_map, headers):
    """Copy object-metadata query parameters into the request headers.

    Every key of ``query_map`` that starts (case-insensitively) with
    ``OBJECT_METADATA_KEY_PREFIX`` is added to ``headers`` using the first entry
    of its value list, unless ``headers`` already holds a non-None value for
    that key. ``headers`` is modified in place; ``method`` is not used.
    """
    for key, values in query_map.items():
        if not key.lower().startswith(OBJECT_METADATA_KEY_PREFIX):
            continue
        if headers.get(key) is None:
            headers[key] = values[0]


def fix_location_constraint(response):
    """Make sure we return a valid non-empty LocationConstraint, as this otherwise breaks Serverless."""
    try:
        content = to_str(response.content or "") or ""
    except Exception:
        # best effort: content that cannot be converted is treated as empty
        content = ""
    if "LocationConstraint" in content:
        pattern = r"<LocationConstraint([^>]*)>\s*</LocationConstraint>"
        replace = r"<LocationConstraint\1>%s</LocationConstraint>" % aws_stack.get_region()
        response._content = re.sub(pattern, replace, content)
        remove_xml_preamble(response)


def fix_range_content_type(bucket_name, path, headers, response):
    """Replace the default HTML content type on successful Range responses.

    Does nothing unless the request has a ``Range`` header and the response
    status is below 400. Otherwise the object's ``ContentType`` is looked up via
    ``head_object`` and written to the response, but only if the response
    currently reports ``text/html; charset=utf-8``.
    """
    # Fix content type for Range requests - https://github.com/localstack/localstack/issues/1259
    if "Range" not in headers:
        return
    if response.status_code >= 400:
        return
    s3_client = aws_stack.connect_to_service("s3")
    path = urlparse.urlparse(urlparse.unquote(path)).path
    key_name = extract_key_name(headers, path)
    result = s3_client.head_object(Bucket=bucket_name, Key=key_name)
    content_type = result["ContentType"]
    if response.headers.get("Content-Type") == "text/html; charset=utf-8":
        response.headers["Content-Type"] = content_type


def fix_delete_objects_response(bucket_name, method, parsed_path, data, headers, response):
    """Report deletions of non-existing keys as successes in a multi-delete response.

    Only applies to ``POST ?delete`` requests whose body contains ``<Delete``.
    Error entries of the ``DeleteResult`` that consist of nothing but a ``Key``
    are moved to the ``Deleted`` list, and the response body is re-serialized.
    """
    # Deleting non-existing keys should not result in errors.
    # Fixes https://github.com/localstack/localstack/issues/1893
    if not (method == "POST" and parsed_path.query == "delete" and "<Delete" in to_str(data or "")):
        return
    content = to_str(response._content)
    if "<Error>" not in content:
        return
    result = xmltodict.parse(content).get("DeleteResult")
    if result is None:
        # the body is an error document without a <DeleteResult> root; nothing to rewrite
        return
    errors = result.get("Error")
    errors = errors if isinstance(errors, list) else [errors]
    deleted = result.get("Deleted")
    if not isinstance(deleted, list):
        deleted = result["Deleted"] = [deleted] if deleted else []
    # iterate over a copy, as entries are removed from `errors` inside the loop
    for entry in list(errors):
        if set(entry.keys()) == {"Key"}:
            errors.remove(entry)
            deleted.append(entry)
    if not errors:
        result.pop("Error")
    response._content = xmltodict.unparse({"DeleteResult": result})


def fix_metadata_key_underscores(request_headers=None, response=None):
    """Translate underscores in ``x-amz-meta-*`` header names.

    In ``request_headers``, underscores in the part after the prefix are
    replaced with ``---``; in ``response.headers`` the reverse replacement is
    applied. Both mappings are modified in place.

    :returns: True if at least one request header was renamed, else False
    """
    # fix for https://github.com/localstack/localstack/issues/1790
    underscore_replacement = "---"
    meta_header_prefix = "x-amz-meta-"
    prefix_len = len(meta_header_prefix)
    updated = False
    if request_headers is not None:
        for key in list(request_headers.keys()):
            if key.lower().startswith(meta_header_prefix):
                key_new = meta_header_prefix + key[prefix_len:].replace(
                    "_", underscore_replacement
                )
                if key != key_new:
                    request_headers[key_new] = request_headers.pop(key)
                    updated = True
    if response is not None:
        for key in list(response.headers.keys()):
            if key.lower().startswith(meta_header_prefix):
                key_new = meta_header_prefix + key[prefix_len:].replace(
                    underscore_replacement, "_"
                )
                if key != key_new:
                    response.headers[key_new] = response.headers.pop(key)
    return updated


def fix_creation_date(method, path, response):
    """For ``GET /`` responses, make ``<CreationDate>`` values end in ``Z``.

    A fractional-seconds suffix optionally followed by ``+00:00`` is rewritten
    to the fractional seconds followed by ``Z``.
    """
    if method != "GET" or path != "/":
        return
    response._content = re.sub(
        r"(\.[0-9]+)(\+00:00)?</CreationDate>",
        r"\1Z</CreationDate>",
        to_str(response._content),
    )


def fix_delimiter(data, headers, response):
    """Replace ``<Delimiter>None<`` with an empty ``<Delimiter>`` element.

    Only applies to non-empty 200 responses whose content starts with ``<?xml``;
    works on both ``str`` and ``bytes`` content. ``data`` and ``headers`` are
    not used.
    """
    if response.status_code == 200 and response._content:
        c, xml_prefix, delimiter = response._content, "<?xml", "<Delimiter><"
        pattern = "[<]Delimiter[>]None[<]"
        if isinstance(c, bytes):
            # keep prefix, pattern and replacement in the same type as the content
            xml_prefix, delimiter = xml_prefix.encode(), delimiter.encode()
            pattern = pattern.encode()
        if c.startswith(xml_prefix):
            response._content = re.compile(pattern).sub(delimiter, c)


def convert_to_chunked_encoding(method, path, response):
    """Mark a ``GET /`` response as chunked, unless it already is.

    Sets ``Transfer-Encoding: chunked`` and removes the ``Content-Encoding``
    and ``Content-Length`` headers.
    """
    if method != "GET" or path != "/":
        return
    if response.headers.get("Transfer-Encoding", "").lower() == "chunked":
        return
    response.headers["Transfer-Encoding"] = "chunked"
    response.headers.pop("Content-Encoding", None)
    response.headers.pop("Content-Length", None)


def unquote(s):
    """Strip one pair of matching single or double quotes surrounding ``s``.

    Strings that are shorter than two characters, or that are not enclosed in a
    matching pair of quotes, are returned unchanged.
    """
    if len(s) >= 2 and s[0] == s[-1] and s[0] in ('"', "'"):
        return s[1:-1]
    return s


def ret304_on_etag(data, headers, response):
    """Turn the response into an empty 304 if ``If-None-Match`` equals its ``ETag``.

    Both values are compared after stripping surrounding quotes.
    """
    etag = response.headers.get("ETag")
    if etag:
        match = headers.get("If-None-Match")
        if match and unquote(match) == unquote(etag):
            response.status_code = 304
            response._content = ""


def fix_etag_for_multipart(data, headers, response):
    """Recompute the ETag of a streaming-signed upload from the de-chunked payload.

    Applies only when the request's ``CONTENT_SHA256_HEADER`` equals
    ``STREAMING_HMAC_PAYLOAD`` and the body contains chunk signatures. The MD5
    of the payload without signatures replaces the ETag in the response body
    and, if present, in the ``ETag`` response header. Any failure is ignored.
    """
    # Fix for https://github.com/localstack/localstack/issues/1978
    if headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD:
        try:
            if b"chunk-signature=" not in to_bytes(data):
                return
            correct_hash = md5(strip_chunk_signatures(data))
            tags = r"<ETag>%s</ETag>"
            # The ETag value may be wrapped in XML-escaped double quotes (&#34;).
            # NOTE(review): the excerpt showed a bare `"` here, which is not valid inside
            # this string literal - presumably an HTML-decoded entity; confirm upstream.
            pattern = r"(&#34;)?([^<&]+)(&#34;)?"
            replacement = r"\g<1>%s\g<3>" % correct_hash
            response._content = re.sub(tags % pattern, tags % replacement, to_str(response.content))
            if response.headers.get("ETag"):
                response.headers["ETag"] = re.sub(pattern, replacement, response.headers["ETag"])
        except Exception:
            # deliberate best effort: on any failure the response is left as it is
            pass


def remove_xml_preamble(response):
    """Removes <?xml ... ?> from a response content"""
    response._content = re.sub(r"^<\?[^\?]+\?>", "", to_str(response._content))


# --------------
# HELPER METHODS
#   for lifecycle/replication/...
# --------------


def get_lifecycle(bucket_name):
    """Return the stored lifecycle configuration of a bucket as an XML response.

    If the bucket is not available, the result of ``is_bucket_available`` is
    returned as the response. If no configuration is stored, a 404 response
    with code ``NoSuchLifecycleConfiguration`` is returned.
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    lifecycle = BUCKET_LIFECYCLE.get(bucket_name)
    status_code = 200
    if not lifecycle:
        lifecycle = {
            "Error": {
                "Code": "NoSuchLifecycleConfiguration",
                "Message": "The lifecycle configuration does not exist",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(lifecycle)
    return xml_response(body, status_code=status_code)


def get_replication(bucket_name):
    """Return the stored replication configuration of a bucket as an XML response.

    If the bucket is not available, the result of ``is_bucket_available`` is
    returned as the response. If no configuration is stored, a 404 response
    with code ``ReplicationConfigurationNotFoundError`` is returned.
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    replication = BUCKET_REPLICATIONS.get(bucket_name)
    status_code = 200
    if not replication:
        replication = {
            "Error": {
                "Code": "ReplicationConfigurationNotFoundError",
                "Message": "The replication configuration was not found",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(replication)
    return xml_response(body, status_code=status_code)


def set_lifecycle(bucket_name, lifecycle):
    """Store the lifecycle configuration for a bucket.

    String input is parsed with ``xmltodict`` before it is stored in
    ``BUCKET_LIFECYCLE``.

    :returns: 200 on success, or an XML error response if the bucket is not available
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if isinstance(to_str(lifecycle), six.string_types):
        lifecycle = xmltodict.parse(lifecycle)
    BUCKET_LIFECYCLE[bucket_name] = lifecycle
    return 200


def delete_lifecycle(bucket_name):
    """Remove the stored lifecycle configuration of a bucket, if there is one.

    :returns: an XML error response if the bucket is not available, else None
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if BUCKET_LIFECYCLE.get(bucket_name):
        BUCKET_LIFECYCLE.pop(bucket_name)


def set_replication(bucket_name, replication):
    """Store the replication configuration for a bucket.

    String input is parsed with ``xmltodict`` before it is stored in
    ``BUCKET_REPLICATIONS``.

    :returns: 200 on success, or an XML error response if the bucket is not available
    """
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if isinstance(to_str(replication), six.string_types):
        replication = xmltodict.parse(replication)
    BUCKET_REPLICATIONS[bucket_name] = replication
    return 200


# -------------
# UTIL METHODS
# -------------


def strip_chunk_signatures(data):
    """Remove streaming-v4 chunk signature lines from a request body.

    :returns: the payload as bytes with the signature lines removed, or the
        empty string ``""`` if ``data`` is None
    """
    # For clients that use streaming v4 authentication, the request contains chunk signatures
    # in the HTTP body (see example below) which we need to strip as moto cannot handle them
    #
    # 17;chunk-signature=6e162122ec4962bea0b18bc624025e6ae4e9322bdc632762d909e87793ac5921
    # <payload data ...>
    # 0;chunk-signature=927ab45acd82fc90a3c210ca7314d59fedc77ce0c914d79095f8cc9563cf2c70
    data_new = ""
    if data is not None:
        data_new = re.sub(
            b"(^|\r\n)[0-9a-fA-F]+;chunk-signature=[0-9a-f]{64}(\r\n)(\r\n$)?",
            b"",
            to_bytes(data),
            flags=re.MULTILINE | re.DOTALL,
        )
    return data_new


def is_bucket_available(bucket_name):
    """Check whether a bucket exists.

    :returns: tuple ``(exists, code, body)``; ``(True, 200, {"Code": "200"})`` for an
        existing bucket, otherwise ``False``, the error code reported by
        ``bucket_exists`` and an ``Error`` dict describing the missing bucket
    """
    body = {"Code": "200"}
    exists, code = bucket_exists(bucket_name)
    if not exists:
        body = {
            "Error": {
                "Code": code,
                "Message": "The bucket does not exist",
                "BucketName": bucket_name,
            }
        }
        return exists, code, body
    return True, 200, body


def bucket_exists(bucket_name):
    """Tests for the existence of the specified bucket. Returns the error code
    if the bucket does not exist (200 if the bucket does exist).
    """
    bucket_name = normalize_bucket_name(bucket_name)
    s3_client = aws_stack.connect_to_service("s3")
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as err:
        error_code = err.response.get("Error").get("Code")
        return False, error_code
    return True, 200


def check_content_md5(data, headers):
    """Validate the ``Content-MD5`` request header against the request body.

    The body is hashed after chunk signatures have been stripped.

    :returns: None if the digest matches; otherwise a 400 error response with code
        ``InvalidDigest`` (header missing, not Base64, or not decodable) or
        ``BadDigest`` (digest mismatch)
    """
    actual = md5(strip_chunk_signatures(data))
    try:
        md5_header = headers["Content-MD5"]
        if not is_base64(md5_header):
            raise Exception('Content-MD5 header is not in Base64 format: "%s"' % md5_header)
        expected = to_str(codecs.encode(base64.b64decode(md5_header), "hex"))
    except Exception:
        # any failure to obtain or decode the header is reported as an invalid digest
        return error_response(
            "The Content-MD5 you specified is not valid.",
            "InvalidDigest",
            status_code=400,
        )
    if actual != expected:
        return error_response(
            "The Content-MD5 you specified did not match what we received.",
            "BadDigest",
            status_code=400,
        )


def error_response(message, code, status_code=400):
    """Build an XML ``Error`` response with the given code and message."""
    result = {"Error": {"Code": code, "Message": message}}
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def xml_response(content, status_code=200):
    """Build a response with content type ``application/xml``."""
    headers = {"Content-Type": "application/xml"}
    return requests_response(content, status_code=status_code, headers=headers)


def no_such_key_error(resource, requestId=None, status_code=400):
    """Build an XML ``NoSuchKey`` error response for the given resource."""
    result = {
        "Error": {
            "Code": "NoSuchKey",
            "Message": "The resource you requested does not exist",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def no_such_bucket(bucket_name, requestId=None, status_code=404):
    """Build an XML ``NoSuchBucket`` error response for the given bucket."""
    # TODO: fix the response to match AWS bucket response when the webconfig is not set and bucket not exists
    result = {
        "Error": {
            "Code": "NoSuchBucket",
            "Message": "The specified bucket does not exist",
            "BucketName": bucket_name,
            "RequestId": requestId,
            "HostId": short_uid(),
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def token_expired_error(resource, requestId=None, status_code=400):
    """Build an XML ``ExpiredToken`` error response for the given resource."""
    result = {
        "Error": {
            "Code": "ExpiredToken",
            "Message": "The provided token has expired.",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def expand_redirect_url(starting_url, key, bucket):
    """Add key and bucket parameters to starting URL query string."""
    parsed = urlparse.urlparse(starting_url)
    query = collections.OrderedDict(urlparse.parse_qsl(parsed.query))
    query.update([("key", key), ("bucket", bucket)])
    redirect_url = urlparse.urlunparse(
        (
            parsed.scheme,
            parsed.netloc,
            parsed.path,
            parsed.params,
            urlparse.urlencode(query),
            None,
        )
    )
    return redirect_url


def is_bucket_specified_in_domain_name(path, headers):
    """Return a truthy match object if the ``host`` header looks like an S3 domain name.

    The host has to contain ``s3.`` or ``s3-website.``, optionally followed by one
    more domain label, followed by ``amazonaws.com``. Returns None otherwise.
    ``path`` is not used.
    """
    host = headers.get("host", "")
    # the dot in "amazonaws.com" is escaped so that it only matches a literal "."
    return re.match(r".*s3(\-website)?\.([^\.]+\.)?amazonaws\.com", host)


def is_object_specific_request(path, headers):
    """Return whether the given request is specific to a certain S3 object.
    Note: the bucket name is usually specified as a path parameter,
    but may also be part of the domain name!"""
    bucket_in_domain = is_bucket_specified_in_domain_name(path, headers)
    parts = len(path.split("/"))
    return parts > (1 if bucket_in_domain else 2)


def empty_response():
    """Create a response with status 200 and empty content."""
    response = Response()
    response.status_code = 200
    response._content = ""
    return response


def handle_notification_request(bucket, method, data):
    """Dispatch a ``?notification`` request by HTTP method.

    GET returns the stored configuration and PUT stores a new one; any other
    method yields an empty 200 response.
    """
    if method == "GET":
        return handle_get_bucket_notification(bucket)
    if method == "PUT":
        return handle_put_bucket_notification(bucket, data)
    return empty_response()


def handle_get_bucket_notification(bucket):
    """Render the notifications stored for ``bucket`` as a ``NotificationConfiguration`` XML.

    For each stored notification, one ``<dest>Configuration`` element is emitted
    per destination type in ``NOTIFICATION_DESTINATION_TYPES`` that is present
    in the notification.
    """
    response = Response()
    response.status_code = 200
    # TODO check if bucket exists
    parts = ['<NotificationConfiguration xmlns="%s">' % XMLNS_S3]
    if bucket in S3_NOTIFICATIONS:
        for notif in S3_NOTIFICATIONS[bucket]:
            for dest in NOTIFICATION_DESTINATION_TYPES:
                if dest not in notif:
                    continue
                dest_dict = {
                    "%sConfiguration"
                    % dest: {
                        "Id": notif["Id"],
                        dest: notif[dest],
                        "Event": notif["Event"],
                        "Filter": notif["Filter"],
                    }
                }
                parts.append(xmltodict.unparse(dest_dict, full_document=False))
    parts.append("</NotificationConfiguration>")
    response._content = "".join(parts)
    return response


def _validate_filter_rules(filter_doc):
    """Raise ``InvalidFilterRuleName`` if a filter rule is named other than suffix/prefix.

    The name check is case-insensitive. Does nothing if ``filter_doc`` has no
    ``FilterRule`` entries.
    """
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        # TODO: check what other rules there are


def _sanitize_notification_filter_rules(filter_doc):
    """Validate filter rule names and normalize them to title case, in place.

    Raises ``InvalidFilterRuleName`` for any rule whose name is not
    (case-insensitively) ``suffix`` or ``prefix``.
    """
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        rule["Name"] = name.title()


def handle_put_bucket_notification(bucket, data):
    """Parse a ``NotificationConfiguration`` document and store it for ``bucket``.

    The previously stored notifications of the bucket are replaced. An empty
    configuration element results in an empty notification list.

    :returns: an empty 200 response
    """
    parsed = xmltodict.parse(data)
    # an empty <NotificationConfiguration/> element is parsed to None
    notif_config = parsed.get("NotificationConfiguration") or {}
    notifications = []
    for dest in NOTIFICATION_DESTINATION_TYPES:
        # named `dest_config` to avoid shadowing the module-level `config`
        dest_config = notif_config.get("%sConfiguration" % dest)
        if isinstance(dest_config, list):
            dest_configs = dest_config
        else:
            dest_configs = [dest_config] if dest_config else []
        for entry in dest_configs:
            events = entry.get("Event")
            if isinstance(events, six.string_types):
                events = [events]
            event_filter = entry.get("Filter", {})
            # make sure FilterRule is an array
            s3_filter = _get_s3_filter(event_filter)
            if s3_filter and not isinstance(s3_filter.get("FilterRule", []), list):
                s3_filter["FilterRule"] = [s3_filter["FilterRule"]]
            # make sure FilterRules are valid and sanitize if necessary
            _sanitize_notification_filter_rules(s3_filter)
            # create final details dict
            notification_details = {
                "Id": entry.get("Id", str(uuid.uuid4())),
                "Event": events,
                dest: entry.get(dest),
                "Filter": event_filter,
            }
            notifications.append(clone(notification_details))
    S3_NOTIFICATIONS[bucket] = notifications
    return empty_response()


def remove_bucket_notification(bucket):
    """Drop the stored notifications of ``bucket``, if any."""
    if bucket in S3_NOTIFICATIONS:
        del S3_NOTIFICATIONS[bucket]


class ProxyListenerS3(PersistingProxyListener):
    """Proxy listener for the "s3" API.

    NOTE: the excerpt ends inside ``forward_request``; further methods of this
    class may exist but are not shown.
    """

    def api_name(self):
        """Return the API name handled by this listener."""
        return "s3"

    @staticmethod
    def is_s3_copy_request(headers, path):
        """Return whether ``x-amz-copy-source`` appears in the headers or in the path."""
        return "x-amz-copy-source" in headers or "x-amz-copy-source" in path

    @staticmethod
    def is_create_multipart_request(query):
        """Return whether the query string starts with ``uploads``."""
        return query.startswith("uploads")

    @staticmethod
    def is_multipart_upload(query):
        """Return whether the query string starts with ``uploadId``."""
        return query.startswith("uploadId")

    @staticmethod
    def get_201_response(key, bucket_name):
        """Render the ``PostResponse`` XML body for the given key and bucket.

        The ETag is a fixed value; the location is built from the service
        protocol, ``config.HOSTNAME_EXTERNAL`` and the URL-quoted key.
        """
        return """
                <PostResponse>
                    <Location>{protocol}://{host}/{encoded_key}</Location>
                    <Bucket>{bucket}</Bucket>
                    <Key>{key}</Key>
                    <ETag>{etag}</ETag>
                </PostResponse>
                """.format(
            protocol=get_service_protocol(),
            host=config.HOSTNAME_EXTERNAL,
            encoded_key=urlparse.quote(key, safe=""),
            key=key,
            bucket=bucket_name,
            etag="d41d8cd98f00b204e9800998ecf8427f",
        )

    @staticmethod
    def _update_location(content, bucket_name):
        """Rewrite ``<Location>`` URLs in ``content`` to point to the external host.

        The host is ``config.HOSTNAME_EXTERNAL`` (with ``config.PORT_S3`` appended
        if it contains no port), and the bucket name is inserted as the first
        path segment.
        """
        bucket_name = normalize_bucket_name(bucket_name)
        host = config.HOSTNAME_EXTERNAL
        if ":" not in host:
            host = "%s:%s" % (host, config.PORT_S3)
        return re.sub(
            r"<Location>\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*</Location>",
            r"<Location>%s://%s/%s/\2</Location>" % (get_service_protocol(), host, bucket_name),
            content,
            flags=re.MULTILINE,
        )

    @staticmethod
    def is_query_allowable(method, query):
        """Return whether a request with the given query string should trigger notifications.

        :returns: True for an empty query, a multipart completion (``POST`` with
            ``uploadId``), a presigned URL, or a query containing ``x-id=``;
            False otherwise
        """
        # Generally if there is a query (some/path/with?query) we don't want to send notifications
        if not query:
            return True
        # Except we do want to notify on multipart and presigned url upload completion
        contains_cred = "X-Amz-Credential" in query and "X-Amz-Signature" in query
        contains_key = "AWSAccessKeyId" in query and "Signature" in query
        # nodejs sdk putObjectCommand is adding x-id=putobject in the query
        allowed_query = "x-id=" in query.lower()
        is_multipart_completion = method == "POST" and query.startswith("uploadId")
        return bool(is_multipart_completion or contains_cred or contains_key or allowed_query)

    @staticmethod
    def parse_policy_expiration_date(expiration_string):
        """Parse a policy expiration timestamp into a UTC-aware datetime.

        ``POLICY_EXPIRATION_FORMAT1`` is tried first, then
        ``POLICY_EXPIRATION_FORMAT2``.

        :raises ValueError: if the string matches neither format
        """
        try:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT1)
        except ValueError:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT2)
        # both date formats assume a UTC timezone ('Z' suffix), but it's not parsed as tzinfo into the datetime object
        dt = dt.replace(tzinfo=datetime.timezone.utc)
        return dt

    def forward_request(self, method, path, data, headers):
        # NOTE: only the beginning of this method is visible in the excerpt; the code
        # below is reproduced unchanged up to the point where the excerpt ends.

        # Create list of query parameters from the url
        parsed = urlparse.urlparse("{}{}".format(config.get_edge_url(), path))
        query_params = parse_qs(parsed.query)
        path_orig = path
        path = path.replace(
            "#", "%23"
        )  # support key names containing hashes (e.g., required by Amplify)
        # extracting bucket name from the request
        parsed_path = urlparse.urlparse(path)
        bucket_name = extract_bucket_name(headers, parsed_path.path)
        if method == "PUT" and bucket_name and not re.match(BUCKET_NAME_REGEX, bucket_name):
            if len(parsed_path.path) <= 1:
                return error_response(
                    "Unable to extract valid bucket name. Please ensure that your AWS SDK is "
                    + "configured to use path style addressing, or send a valid "
                    + '<Bucket>.s3.localhost.localstack.cloud "Host" header',
                    "InvalidBucketName",
                    status_code=400,
                )
            return error_response(
                "The specified bucket is not valid.",
                "InvalidBucketName",
                status_code=400,
            )
        # Detecting pre-sign url and checking signature
        if any([p in query_params for p in SIGNATURE_V2_PARAMS]) or any(
            [p in query_params for p in SIGNATURE_V4_PARAMS]
        ):
            response = authenticate_presign_url(
                method=method, path=path, data=data, headers=headers
            )
            if response is not None:
                return response
        # handling s3 website hosting requests
        if is_static_website(headers) and method == "GET":
            return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)
        # check content md5 hash integrity if not a copy request or multipart initialization
        if (
            "Content-MD5" in headers
            and not self.is_s3_copy_request(headers, path)
            and not self.is_create_multipart_request(parsed_path.query)
        ):
            response = check_content_md5(data, headers)
            if response is not None:
                return response
        modified_data = None
        # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-1
        to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")
        to_find2 = to_bytes("<CreateBucketConfiguration")
        if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:
            # Note: with the latest version, <CreateBucketConfiguration> must either
            # contain a valid <LocationConstraint>, or not be present at all in the body.
            modified_data = b""
        # If this request contains streaming v4 authentication signatures, strip them from the message
        # Related issue: https://github.com/localstack/localstack/issues/98
        # TODO: can potentially be removed after this fix in moto: https://github.com/spulec/moto/pull/4201
        is_streaming_payload = headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD
        if is_streaming_payload:
            modified_data = strip_chunk_signatures(not_none_or(modified_data, data))
            headers["Content-Length"] = headers.get("x-amz-decoded-content-length")
            headers.pop(CONTENT_SHA256_HEADER)
        # POST requests to S3 may include a "${filename}" placeholder in the
        # key, which should be replaced with an actual file name before storing.
        if method == "POST":
            original_data = not_none_or(modified_data, data)
            expanded_data = multipart_content.expand_multipart_filename(original_data, headers)
            if expanded_data is not original_data:
                modified_data = expanded_data
        # If no content-type is provided, 'binary/octet-stream' should be used
        # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
        if method == "PUT" and not headers.get("content-type"):
            headers["content-type"] = "binary/octet-stream"
        # parse query params
        query = parsed_path.query
        path = parsed_path.path
        query_map = urlparse.parse_qs(query, keep_blank_values=True)
        # remap metadata query params (not supported in moto) to request headers
        append_metadata_headers(method, query_map, headers)
        # apply fixes
        headers_changed = fix_metadata_key_underscores(request_headers=headers)
        if query == "notification" or "notification" in query_map:
            # handle and return response for ?notification request
            response = handle_notification_request(bucket_name, method, data)
            return response
        # if the Expires key in the url is already expired then return error
        if method == "GET" and "Expires" in query_map:
            ts = datetime.datetime.fromtimestamp(
                int(query_map.get("Expires")[0]), tz=datetime.timezone.utc
            )
            if is_expired(ts):
                return token_expired_error(path, headers.get("x-amz-request-id"), 400)
        # If multipart POST with policy in the params, return error if the policy has expired
        # ... (the excerpt ends here; the remainder of forward_request is not shown)


# Non-code text that followed the excerpt on the same line:
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from
# setting up the prerequisites to run your first automation test, to following best practices
# and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of
# step-by-step guides to help you be proficient with different test automation frameworks
# i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
