How to use s3_create_bucket_with_client method in localstack

Best Python code snippet using localstack_python

test_s3.py

Source:test_s3.py Github

copy

Full Screen

...62 ):63 return _client("s3", region_name=region_name)64 return _s3_client65@pytest.fixture66def s3_create_bucket_with_client(s3_resource):67 buckets = []68 def factory(s3_client, **kwargs) -> str:69 if "Bucket" not in kwargs:70 kwargs["Bucket"] = f"test-bucket-{short_uid()}"71 response = s3_client.create_bucket(**kwargs)72 buckets.append(kwargs["Bucket"])73 return response74 yield factory75 # cleanup76 for bucket in buckets:77 try:78 bucket = s3_resource.Bucket(bucket)79 bucket.objects.all().delete()80 bucket.object_versions.all().delete()81 bucket.delete()82 except Exception as e:83 LOG.debug(f"error cleaning up bucket {bucket}: {e}")84@pytest.fixture85def s3_multipart_upload(s3_client):86 def perform_multipart_upload(bucket, key, data=None, zipped=False, acl=None):87 kwargs = {"ACL": acl} if acl else {}88 multipart_upload_dict = s3_client.create_multipart_upload(Bucket=bucket, Key=key, **kwargs)89 upload_id = multipart_upload_dict["UploadId"]90 # Write contents to memory rather than a file.91 data = data or (5 * short_uid())92 data = to_bytes(data)93 upload_file_object = BytesIO(data)94 if zipped:95 upload_file_object = BytesIO()96 with gzip.GzipFile(fileobj=upload_file_object, mode="w") as filestream:97 filestream.write(data)98 response = s3_client.upload_part(99 Bucket=bucket,100 Key=key,101 Body=upload_file_object,102 PartNumber=1,103 UploadId=upload_id,104 )105 multipart_upload_parts = [{"ETag": response["ETag"], "PartNumber": 1}]106 return s3_client.complete_multipart_upload(107 Bucket=bucket,108 Key=key,109 MultipartUpload={"Parts": multipart_upload_parts},110 UploadId=upload_id,111 )112 return perform_multipart_upload113@pytest.fixture114def create_tmp_folder_lambda():115 cleanup_folders = []116 def prepare_folder(path_to_lambda, run_command=None):117 tmp_dir = tempfile.mkdtemp()118 shutil.copy(path_to_lambda, tmp_dir)119 if run_command:120 run(f"cd {tmp_dir}; {run_command}")121 cleanup_folders.append(tmp_dir)122 return tmp_dir123 yield prepare_folder124 for folder in cleanup_folders:125 try:126 shutil.rmtree(folder)127 except Exception:128 LOG.warning(f"could not delete folder {folder}")129class TestS3:130 @pytest.mark.aws_validated131 @pytest.mark.skip_snapshot_verify(paths=["$..EncodingType"])132 def test_region_header_exists(self, s3_client, s3_create_bucket, snapshot):133 snapshot.add_transformer(snapshot.transform.s3_api())134 bucket_name = s3_create_bucket(135 CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},136 )137 response = s3_client.head_bucket(Bucket=bucket_name)138 assert response["ResponseMetadata"]["HTTPHeaders"]["x-amz-bucket-region"] == "eu-west-1"139 snapshot.match("head_bucket", response)140 response = s3_client.list_objects_v2(Bucket=bucket_name)141 assert response["ResponseMetadata"]["HTTPHeaders"]["x-amz-bucket-region"] == "eu-west-1"142 snapshot.match("list_objects_v2", response)143 @pytest.mark.aws_validated144 # TODO list-buckets contains other buckets when running in CI145 @pytest.mark.skip_snapshot_verify(146 paths=["$..Marker", "$..Prefix", "$..EncodingType", "$..list-buckets.Buckets"]147 )148 def test_delete_bucket_with_content(self, s3_client, s3_resource, s3_bucket, snapshot):149 snapshot.add_transformer(snapshot.transform.s3_api())150 bucket_name = s3_bucket151 for i in range(0, 10, 1):152 body = "test-" + str(i)153 key = "test-key-" + str(i)154 s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)155 resp = s3_client.list_objects(Bucket=bucket_name, MaxKeys=100)156 snapshot.match("list-objects", resp)157 assert 10 == len(resp["Contents"])158 bucket = s3_resource.Bucket(bucket_name)159 bucket.objects.all().delete()160 bucket.delete()161 resp = s3_client.list_buckets()162 # TODO - this fails in the CI pipeline and is currently skipped from verification163 snapshot.match("list-buckets", resp)164 assert bucket_name not in [b["Name"] for b in resp["Buckets"]]165 @pytest.mark.aws_validated166 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage"])167 def test_put_and_get_object_with_utf8_key(self, s3_client, s3_bucket, snapshot):168 snapshot.add_transformer(snapshot.transform.s3_api())169 response = s3_client.put_object(Bucket=s3_bucket, Key="Ā0Ä", Body=b"abc123")170 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200171 snapshot.match("put-object", response)172 response = s3_client.get_object(Bucket=s3_bucket, Key="Ā0Ä")173 snapshot.match("get-object", response)174 assert response["Body"].read() == b"abc123"175 @pytest.mark.aws_validated176 def test_resource_object_with_slashes_in_key(self, s3_resource, s3_bucket):177 s3_resource.Object(s3_bucket, "/foo").put(Body="foobar")178 s3_resource.Object(s3_bucket, "bar").put(Body="barfoo")179 with pytest.raises(ClientError) as e:180 s3_resource.Object(s3_bucket, "foo").get()181 e.match("NoSuchKey")182 with pytest.raises(ClientError) as e:183 s3_resource.Object(s3_bucket, "/bar").get()184 e.match("NoSuchKey")185 response = s3_resource.Object(s3_bucket, "/foo").get()186 assert response["Body"].read() == b"foobar"187 response = s3_resource.Object(s3_bucket, "bar").get()188 assert response["Body"].read() == b"barfoo"189 @pytest.mark.aws_validated190 def test_metadata_header_character_decoding(self, s3_client, s3_bucket, snapshot):191 snapshot.add_transformer(snapshot.transform.s3_api())192 # Object metadata keys should accept keys with underscores193 # https://github.com/localstack/localstack/issues/1790194 # put object195 object_key = "key-with-metadata"196 metadata = {"TEST_META_1": "foo", "__meta_2": "bar"}197 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Metadata=metadata, Body="foo")198 metadata_saved = s3_client.head_object(Bucket=s3_bucket, Key=object_key)["Metadata"]199 snapshot.match("head-object", metadata_saved)200 # note that casing is removed (since headers are case-insensitive)201 assert metadata_saved == {"test_meta_1": "foo", "__meta_2": "bar"}202 @pytest.mark.aws_validated203 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage"])204 def test_upload_file_multipart(self, s3_client, s3_bucket, tmpdir, snapshot):205 snapshot.add_transformer(snapshot.transform.s3_api())206 key = "my-key"207 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3.html#multipart-transfers208 tranfer_config = TransferConfig(multipart_threshold=5 * KB, multipart_chunksize=1 * KB)209 file = tmpdir / "test-file.bin"210 data = b"1" * (6 * KB) # create 6 kilobytes of ones211 file.write(data=data, mode="w")212 s3_client.upload_file(213 Bucket=s3_bucket, Key=key, Filename=str(file.realpath()), Config=tranfer_config214 )215 obj = s3_client.get_object(Bucket=s3_bucket, Key=key)216 assert obj["Body"].read() == data, f"body did not contain expected data {obj}"217 snapshot.match("get_object", obj)218 @pytest.mark.aws_validated219 @pytest.mark.parametrize("delimiter", ["/", "%2F"])220 def test_list_objects_with_prefix(self, s3_client, s3_create_bucket, delimiter):221 bucket_name = s3_create_bucket()222 key = "test/foo/bar/123"223 s3_client.put_object(Bucket=bucket_name, Key=key, Body=b"content 123")224 response = s3_client.list_objects(225 Bucket=bucket_name, Prefix="test/", Delimiter=delimiter, MaxKeys=1, EncodingType="url"226 )227 sub_dict = {228 "Delimiter": delimiter,229 "EncodingType": "url",230 "IsTruncated": False,231 "Marker": "",232 "MaxKeys": 1,233 "Name": bucket_name,234 "Prefix": "test/",235 }236 if delimiter == "/":237 # if delimiter is "/", then common prefixes are returned238 sub_dict["CommonPrefixes"] = [{"Prefix": "test/foo/"}]239 else:240 # if delimiter is "%2F" (or other non-contained character), then the actual keys are returned in Contents241 assert len(response["Contents"]) == 1242 assert response["Contents"][0]["Key"] == key243 sub_dict["Delimiter"] = "%252F"244 assert is_sub_dict(sub_dict, response)245 @pytest.mark.aws_validated246 @pytest.mark.skip_snapshot_verify(path="$..Error.BucketName")247 def test_get_object_no_such_bucket(self, s3_client, snapshot):248 with pytest.raises(ClientError) as e:249 s3_client.get_object(Bucket=f"does-not-exist-{short_uid()}", Key="foobar")250 snapshot.match("expected_error", e.value.response)251 @pytest.mark.aws_validated252 @pytest.mark.skip_snapshot_verify(path="$..RequestID")253 def test_delete_bucket_no_such_bucket(self, s3_client, snapshot):254 with pytest.raises(ClientError) as e:255 s3_client.delete_bucket(Bucket=f"does-not-exist-{short_uid()}")256 snapshot.match("expected_error", e.value.response)257 @pytest.mark.aws_validated258 @pytest.mark.skip_snapshot_verify(path="$..Error.BucketName")259 def test_get_bucket_notification_configuration_no_such_bucket(self, s3_client, snapshot):260 with pytest.raises(ClientError) as e:261 s3_client.get_bucket_notification_configuration(Bucket=f"doesnotexist-{short_uid()}")262 snapshot.match("expected_error", e.value.response)263 @pytest.mark.aws_validated264 @pytest.mark.xfail(265 reason="currently not implemented in moto, see https://github.com/localstack/localstack/issues/6217"266 )267 # TODO: see also XML issue in https://github.com/localstack/localstack/issues/6422268 def test_get_object_attributes(self, s3_client, s3_bucket, snapshot):269 s3_client.put_object(Bucket=s3_bucket, Key="data.txt", Body=b"69\n420\n")270 response = s3_client.get_object_attributes(271 Bucket=s3_bucket,272 Key="data.txt",273 ObjectAttributes=["StorageClass", "ETag", "ObjectSize"],274 )275 snapshot.match("object-attrs", response)276 @pytest.mark.aws_validated277 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage"])278 def test_put_and_get_object_with_hash_prefix(self, s3_client, s3_bucket, snapshot):279 snapshot.add_transformer(snapshot.transform.s3_api())280 key_name = "#key-with-hash-prefix"281 content = b"test 123"282 response = s3_client.put_object(Bucket=s3_bucket, Key=key_name, Body=content)283 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200284 snapshot.match("put-object", response)285 response = s3_client.get_object(Bucket=s3_bucket, Key=key_name)286 snapshot.match("get-object", response)287 assert response["Body"].read() == content288 @pytest.mark.aws_validated289 @pytest.mark.xfail(reason="error message is different in current implementation")290 def test_invalid_range_error(self, s3_client, s3_bucket):291 key = "my-key"292 s3_client.put_object(Bucket=s3_bucket, Key=key, Body=b"abcdefgh")293 with pytest.raises(ClientError) as e:294 s3_client.get_object(Bucket=s3_bucket, Key=key, Range="bytes=1024-4096")295 e.match("InvalidRange")296 e.match("The requested range is not satisfiable")297 @pytest.mark.aws_validated298 def test_range_key_not_exists(self, s3_client, s3_bucket):299 key = "my-key"300 with pytest.raises(ClientError) as e:301 s3_client.get_object(Bucket=s3_bucket, Key=key, Range="bytes=1024-4096")302 e.match("NoSuchKey")303 e.match("The specified key does not exist.")304 @pytest.mark.aws_validated305 def test_create_bucket_via_host_name(self, s3_vhost_client):306 # TODO check redirection (happens in AWS because of region name), should it happen in LS?307 # https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility308 bucket_name = f"test-{short_uid()}"309 try:310 response = s3_vhost_client.create_bucket(311 Bucket=bucket_name,312 CreateBucketConfiguration={"LocationConstraint": "eu-central-1"},313 )314 assert "Location" in response315 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200316 response = s3_vhost_client.get_bucket_location(Bucket=bucket_name)317 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200318 assert response["LocationConstraint"] == "eu-central-1"319 finally:320 s3_vhost_client.delete_bucket(Bucket=bucket_name)321 @pytest.mark.aws_validated322 def test_put_and_get_bucket_policy(self, s3_client, s3_bucket):323 # put bucket policy324 policy = {325 "Version": "2012-10-17",326 "Statement": [327 {328 "Action": "s3:GetObject",329 "Effect": "Allow",330 "Resource": f"arn:aws:s3:::{s3_bucket}/*",331 "Principal": {"AWS": "*"},332 }333 ],334 }335 response = s3_client.put_bucket_policy(Bucket=s3_bucket, Policy=json.dumps(policy))336 assert response["ResponseMetadata"]["HTTPStatusCode"] == 204337 # retrieve and check policy config338 saved_policy = s3_client.get_bucket_policy(Bucket=s3_bucket)["Policy"]339 assert policy == json.loads(saved_policy)340 @pytest.mark.aws_validated341 @pytest.mark.xfail(reason="see https://github.com/localstack/localstack/issues/5769")342 def test_put_object_tagging_empty_list(self, s3_client, s3_bucket, snapshot):343 key = "my-key"344 s3_client.put_object(Bucket=s3_bucket, Key=key, Body=b"abcdefgh")345 object_tags = s3_client.get_object_tagging(Bucket=s3_bucket, Key=key)346 snapshot.match("created-object-tags", object_tags)347 tag_set = {"TagSet": [{"Key": "tag1", "Value": "tag1"}]}348 s3_client.put_object_tagging(Bucket=s3_bucket, Key=key, Tagging=tag_set)349 object_tags = s3_client.get_object_tagging(Bucket=s3_bucket, Key=key)350 snapshot.match("updated-object-tags", object_tags)351 s3_client.put_object_tagging(Bucket=s3_bucket, Key=key, Tagging={"TagSet": []})352 object_tags = s3_client.get_object_tagging(Bucket=s3_bucket, Key=key)353 snapshot.match("deleted-object-tags", object_tags)354 @pytest.mark.aws_validated355 @pytest.mark.xfail(reason="see https://github.com/localstack/localstack/issues/6218")356 def test_head_object_fields(self, s3_client, s3_bucket, snapshot):357 key = "my-key"358 s3_client.put_object(Bucket=s3_bucket, Key=key, Body=b"abcdefgh")359 response = s3_client.head_object(Bucket=s3_bucket, Key=key)360 # missing AcceptRanges field361 # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html362 # https://stackoverflow.com/questions/58541696/s3-not-returning-accept-ranges-header363 # https://www.keycdn.com/support/frequently-asked-questions#is-byte-range-not-working-in-combination-with-s3364 snapshot.match("head-object", response)365 @pytest.mark.aws_validated366 @pytest.mark.xfail(reason="see https://github.com/localstack/localstack/issues/6553")367 def test_get_object_after_deleted_in_versioned_bucket(368 self, s3_client, s3_bucket, s3_resource, snapshot369 ):370 bucket = s3_resource.Bucket(s3_bucket)371 bucket.Versioning().enable()372 key = "my-key"373 s3_client.put_object(Bucket=s3_bucket, Key=key, Body=b"abcdefgh")374 s3_obj = s3_client.get_object(Bucket=s3_bucket, Key=key)375 snapshot.match("get-object", s3_obj)376 s3_client.delete_object(Bucket=s3_bucket, Key=key)377 with pytest.raises(ClientError) as e:378 s3_client.get_object(Bucket=s3_bucket, Key=key)379 snapshot.match("get-object-after-delete", e.value.response)380 @pytest.mark.aws_validated381 @pytest.mark.parametrize("algorithm", ["CRC32", "CRC32C", "SHA1", "SHA256"])382 def test_put_object_checksum(self, s3_client, s3_create_bucket, algorithm):383 bucket = s3_create_bucket()384 key = f"file-{short_uid()}"385 data = b"test data.."386 params = {387 "Bucket": bucket,388 "Key": key,389 "Body": data,390 "ChecksumAlgorithm": algorithm,391 f"Checksum{algorithm}": short_uid(),392 }393 with pytest.raises(ClientError) as e:394 s3_client.put_object(**params)395 error = e.value.response["Error"]396 assert error["Code"] == "InvalidRequest"397 checksum_header = f"x-amz-checksum-{algorithm.lower()}"398 assert error["Message"] == f"Value for {checksum_header} header is invalid."399 # Test our generated checksums400 match algorithm:401 case "CRC32":402 checksum = checksum_crc32(data)403 case "CRC32C":404 checksum = checksum_crc32c(data)405 case "SHA1":406 checksum = hash_sha1(data)407 case "SHA256":408 checksum = hash_sha256(data)409 case _:410 checksum = ""411 params.update({f"Checksum{algorithm}": checksum})412 response = s3_client.put_object(**params)413 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200414 # Test the autogenerated checksums415 params.pop(f"Checksum{algorithm}")416 response = s3_client.put_object(**params)417 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200418 @pytest.mark.aws_validated419 @pytest.mark.skip_snapshot_verify(paths=["$..AcceptRanges"])420 def test_s3_copy_metadata_replace(self, s3_client, s3_create_bucket, snapshot):421 snapshot.add_transformer(snapshot.transform.s3_api())422 object_key = "source-object"423 bucket_name = s3_create_bucket()424 resp = s3_client.put_object(425 Bucket=bucket_name,426 Key=object_key,427 Body='{"key": "value"}',428 ContentType="application/json",429 Metadata={"key": "value"},430 )431 snapshot.match("put_object", resp)432 head_object = s3_client.head_object(Bucket=bucket_name, Key=object_key)433 snapshot.match("head_object", head_object)434 object_key_copy = f"{object_key}-copy"435 resp = s3_client.copy_object(436 Bucket=bucket_name,437 CopySource=f"{bucket_name}/{object_key}",438 Key=object_key_copy,439 Metadata={"another-key": "value"},440 ContentType="application/javascript",441 MetadataDirective="REPLACE",442 )443 snapshot.match("copy_object", resp)444 head_object = s3_client.head_object(Bucket=bucket_name, Key=object_key_copy)445 snapshot.match("head_object_copy", head_object)446 @pytest.mark.aws_validated447 @pytest.mark.skip_snapshot_verify(paths=["$..AcceptRanges"])448 def test_s3_copy_content_type_and_metadata(self, s3_client, s3_create_bucket, snapshot):449 snapshot.add_transformer(snapshot.transform.s3_api())450 object_key = "source-object"451 bucket_name = s3_create_bucket()452 resp = s3_client.put_object(453 Bucket=bucket_name,454 Key=object_key,455 Body='{"key": "value"}',456 ContentType="application/json",457 Metadata={"key": "value"},458 )459 snapshot.match("put_object", resp)460 head_object = s3_client.head_object(Bucket=bucket_name, Key=object_key)461 snapshot.match("head_object", head_object)462 object_key_copy = f"{object_key}-copy"463 resp = s3_client.copy_object(464 Bucket=bucket_name, CopySource=f"{bucket_name}/{object_key}", Key=object_key_copy465 )466 snapshot.match("copy_object", resp)467 head_object = s3_client.head_object(Bucket=bucket_name, Key=object_key_copy)468 snapshot.match("head_object_copy", head_object)469 s3_client.delete_objects(Bucket=bucket_name, Delete={"Objects": [{"Key": object_key_copy}]})470 # does not set MetadataDirective=REPLACE, so the original metadata should be kept471 object_key_copy = f"{object_key}-second-copy"472 resp = s3_client.copy_object(473 Bucket=bucket_name,474 CopySource=f"{bucket_name}/{object_key}",475 Key=object_key_copy,476 Metadata={"another-key": "value"},477 ContentType="application/javascript",478 )479 snapshot.match("copy_object_second", resp)480 head_object = s3_client.head_object(Bucket=bucket_name, Key=object_key_copy)481 snapshot.match("head_object_second_copy", head_object)482 @pytest.mark.aws_validated483 @pytest.mark.xfail(484 reason="wrong behaviour, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/managing-acls.html"485 )486 def test_s3_multipart_upload_acls(487 self, s3_client, s3_create_bucket, s3_multipart_upload, snapshot488 ):489 # The basis for this test is wrong - see:490 # https://docs.aws.amazon.com/AmazonS3/latest/userguide/managing-acls.html491 # > Bucket and object permissions are independent of each other. An object does not inherit the permissions492 # > from its bucket. For example, if you create a bucket and grant write access to a user, you can't access493 # > that user’s objects unless the user explicitly grants you access.494 snapshot.add_transformer(495 [496 snapshot.transform.key_value("DisplayName"),497 snapshot.transform.key_value("ID", value_replacement="owner-id"),498 ]499 )500 bucket_name = f"test-bucket-{short_uid()}"501 s3_create_bucket(Bucket=bucket_name, ACL="public-read")502 response = s3_client.get_bucket_acl(Bucket=bucket_name)503 snapshot.match("bucket-acl", response)504 def check_permissions(key):505 acl_response = s3_client.get_object_acl(Bucket=bucket_name, Key=key)506 snapshot.match(f"permission-{key}", acl_response)507 # perform uploads (multipart and regular) and check ACLs508 s3_client.put_object(Bucket=bucket_name, Key="acl-key0", Body="something")509 check_permissions("acl-key0")510 s3_multipart_upload(bucket=bucket_name, key="acl-key1")511 check_permissions("acl-key1")512 s3_multipart_upload(bucket=bucket_name, key="acl-key2", acl="public-read-write")513 check_permissions("acl-key2")514 @pytest.mark.only_localstack515 @pytest.mark.parametrize("case_sensitive_headers", [True, False])516 def test_s3_get_response_case_sensitive_headers(517 self, s3_client, s3_bucket, case_sensitive_headers518 ):519 # Test that RETURN_CASE_SENSITIVE_HEADERS is respected520 object_key = "key-by-hostname"521 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")522 # get object and assert headers523 case_sensitive_before = http2_server.RETURN_CASE_SENSITIVE_HEADERS524 try:525 url = s3_client.generate_presigned_url(526 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}527 )528 http2_server.RETURN_CASE_SENSITIVE_HEADERS = case_sensitive_headers529 response = requests.get(url, verify=False)530 # expect that Etag is contained531 header_names = list(response.headers.keys())532 expected_etag = "ETag" if case_sensitive_headers else "etag"533 assert expected_etag in header_names534 finally:535 http2_server.RETURN_CASE_SENSITIVE_HEADERS = case_sensitive_before536 @pytest.mark.aws_validated537 @pytest.mark.skip_snapshot_verify(538 paths=[539 "$..AcceptRanges",540 "$..ContentLanguage",541 "$..VersionId",542 "$..Restore",543 ]544 )545 def test_s3_object_expiry(self, s3_client, s3_bucket, snapshot):546 # AWS only cleans up S3 expired object once a day usually547 # the object stays accessible for quite a while after being expired548 # https://stackoverflow.com/questions/38851456/aws-s3-object-expiration-less-than-24-hours549 # handle s3 object expiry550 # https://github.com/localstack/localstack/issues/1685551 # TODO: should we have a config var to not deleted immediately in the new provider? and schedule it?552 snapshot.add_transformer(snapshot.transform.s3_api())553 # put object554 short_expire = datetime.datetime.now(timezone("GMT")) + datetime.timedelta(seconds=1)555 object_key_expired = "key-object-expired"556 object_key_not_expired = "key-object-not-expired"557 s3_client.put_object(558 Bucket=s3_bucket,559 Key=object_key_expired,560 Body="foo",561 Expires=short_expire,562 )563 # sleep so it expires564 time.sleep(3)565 # head_object does not raise an error for now in LS566 response = s3_client.head_object(Bucket=s3_bucket, Key=object_key_expired)567 assert response["Expires"] < datetime.datetime.now(timezone("GMT"))568 snapshot.match("head-object-expired", response)569 # try to fetch an object which is already expired570 if not is_aws_cloud(): # fixme for now behaviour differs, have a look at it and discuss571 with pytest.raises(Exception) as e: # this does not raise in AWS572 s3_client.get_object(Bucket=s3_bucket, Key=object_key_expired)573 e.match("NoSuchKey")574 s3_client.put_object(575 Bucket=s3_bucket,576 Key=object_key_not_expired,577 Body="foo",578 Expires=datetime.datetime.now(timezone("GMT")) + datetime.timedelta(hours=1),579 )580 # try to fetch has not been expired yet.581 resp = s3_client.get_object(Bucket=s3_bucket, Key=object_key_not_expired)582 assert "Expires" in resp583 assert resp["Expires"] > datetime.datetime.now(timezone("GMT"))584 snapshot.match("get-object-not-yet-expired", resp)585 @pytest.mark.aws_validated586 @pytest.mark.skip_snapshot_verify(587 paths=[588 "$..ContentLanguage",589 "$..VersionId",590 ]591 )592 def test_upload_file_with_xml_preamble(self, s3_client, s3_create_bucket, snapshot):593 snapshot.add_transformer(snapshot.transform.s3_api())594 bucket_name = f"bucket-{short_uid()}"595 object_key = f"key-{short_uid()}"596 body = '<?xml version="1.0" encoding="UTF-8"?><test/>'597 s3_create_bucket(Bucket=bucket_name)598 s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=body)599 response = s3_client.get_object(Bucket=bucket_name, Key=object_key)600 snapshot.match("get_object", response)601 @pytest.mark.aws_validated602 @pytest.mark.xfail(reason="The error format is wrong in s3_listener (is_bucket_available)")603 def test_bucket_availability(self, s3_client, snapshot):604 bucket_name = "test-bucket-lifecycle"605 with pytest.raises(ClientError) as e:606 s3_client.get_bucket_lifecycle(Bucket=bucket_name)607 snapshot.match("bucket-lifecycle", e.value.response)608 with pytest.raises(ClientError) as e:609 s3_client.get_bucket_replication(Bucket=bucket_name)610 snapshot.match("bucket-replication", e.value.response)611 @pytest.mark.aws_validated612 def test_location_path_url(self, s3_client, s3_create_bucket, account_id):613 region = "us-east-2"614 bucket_name = s3_create_bucket(615 CreateBucketConfiguration={"LocationConstraint": region}, ACL="public-read"616 )617 response = s3_client.get_bucket_location(Bucket=bucket_name)618 assert region == response["LocationConstraint"]619 url = _bucket_url(bucket_name, region)620 # https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLocation.html621 # make raw request, assert that newline is contained after XML preamble: <?xml ...>\n622 response = requests.get(f"{url}?location?x-amz-expected-bucket-owner={account_id}")623 assert response.ok624 content = to_str(response.content)625 assert re.match(r"^<\?xml [^>]+>\n<.*", content, flags=re.MULTILINE)626 @pytest.mark.aws_validated627 @pytest.mark.skip_snapshot_verify(paths=["$..Error.RequestID"])628 def test_different_location_constraint(629 self,630 s3_client,631 s3_create_bucket,632 s3_client_for_region,633 s3_create_bucket_with_client,634 snapshot,635 ):636 snapshot.add_transformer(snapshot.transform.s3_api())637 snapshot.add_transformer(638 snapshot.transform.key_value("Location", "<location>", reference_replacement=False)639 )640 bucket_1_name = f"bucket-{short_uid()}"641 s3_create_bucket(Bucket=bucket_1_name)642 response = s3_client.get_bucket_location(Bucket=bucket_1_name)643 snapshot.match("get_bucket_location_bucket_1", response)644 region_2 = "us-east-2"645 client_2 = s3_client_for_region(region_name=region_2)646 bucket_2_name = f"bucket-{short_uid()}"647 s3_create_bucket_with_client(648 client_2,649 Bucket=bucket_2_name,650 CreateBucketConfiguration={"LocationConstraint": region_2},651 )652 response = client_2.get_bucket_location(Bucket=bucket_2_name)653 snapshot.match("get_bucket_location_bucket_2", response)654 # assert creation fails without location constraint for us-east-2 region655 with pytest.raises(Exception) as exc:656 client_2.create_bucket(Bucket=f"bucket-{short_uid()}")657 snapshot.match("create_bucket_constraint_exc", exc.value.response)658 bucket_3_name = f"bucket-{short_uid()}"659 response = s3_create_bucket_with_client(660 client_2,661 Bucket=bucket_3_name,662 CreateBucketConfiguration={"LocationConstraint": region_2},663 )664 snapshot.match("create_bucket_bucket_3", response)665 response = client_2.get_bucket_location(Bucket=bucket_3_name)666 snapshot.match("get_bucket_location_bucket_3", response)667 @pytest.mark.aws_validated668 @pytest.mark.skip_snapshot_verify(669 paths=[670 "$..ContentLanguage",671 "$..VersionId",672 ]673 )674 def test_get_object_with_anon_credentials(self, s3_client, s3_create_bucket, snapshot):675 snapshot.add_transformer(snapshot.transform.s3_api())676 bucket_name = f"bucket-{short_uid()}"677 object_key = f"key-{short_uid()}"678 body = "body data"679 s3_create_bucket(Bucket=bucket_name, ACL="public-read")680 s3_client.put_object(681 Bucket=bucket_name,682 Key=object_key,683 Body=body,684 )685 s3_client.put_object_acl(Bucket=bucket_name, Key=object_key, ACL="public-read")686 s3_anon_client = _anon_client("s3")687 response = s3_anon_client.get_object(Bucket=bucket_name, Key=object_key)688 snapshot.match("get_object", response)689 @pytest.mark.aws_validated690 @pytest.mark.skip_snapshot_verify(691 paths=["$..ContentLanguage", "$..VersionId", "$..AcceptRanges"]692 )693 def test_putobject_with_multiple_keys(self, s3_client, s3_create_bucket, snapshot):694 snapshot.add_transformer(snapshot.transform.s3_api())695 bucket = f"bucket-{short_uid()}"696 key_by_path = "aws/key1/key2/key3"697 s3_create_bucket(Bucket=bucket)698 s3_client.put_object(Body=b"test", Bucket=bucket, Key=key_by_path)699 result = s3_client.get_object(Bucket=bucket, Key=key_by_path)700 snapshot.match("get_object", result)701 @pytest.mark.aws_validated702 def test_delete_bucket_lifecycle_configuration(self, s3_client, s3_bucket, snapshot):703 snapshot.add_transformer(snapshot.transform.key_value("BucketName"))704 lfc = {705 "Rules": [706 {707 "Expiration": {"Days": 7},708 "ID": "wholebucket",709 "Filter": {"Prefix": ""},710 "Status": "Enabled",711 }712 ]713 }714 s3_client.put_bucket_lifecycle_configuration(Bucket=s3_bucket, LifecycleConfiguration=lfc)715 result = s3_client.get_bucket_lifecycle_configuration(Bucket=s3_bucket)716 snapshot.match("get-bucket-lifecycle-conf", result)717 s3_client.delete_bucket_lifecycle(Bucket=s3_bucket)718 with pytest.raises(ClientError) as e:719 s3_client.get_bucket_lifecycle_configuration(Bucket=s3_bucket)720 snapshot.match("get-bucket-lifecycle-exc", e.value.response)721 @pytest.mark.aws_validated722 def test_delete_lifecycle_configuration_on_bucket_deletion(723 self, s3_client, s3_create_bucket, snapshot724 ):725 snapshot.add_transformer(snapshot.transform.key_value("BucketName"))726 bucket_name = f"test-bucket-{short_uid()}" # keep the same name for both bucket727 s3_create_bucket(Bucket=bucket_name)728 lfc = {729 "Rules": [730 {731 "Expiration": {"Days": 7},732 "ID": "wholebucket",733 "Filter": {"Prefix": ""},734 "Status": "Enabled",735 }736 ]737 }738 s3_client.put_bucket_lifecycle_configuration(Bucket=bucket_name, LifecycleConfiguration=lfc)739 result = s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)740 snapshot.match("get-bucket-lifecycle-conf", result)741 s3_client.delete_bucket(Bucket=bucket_name)742 s3_create_bucket(Bucket=bucket_name)743 with pytest.raises(ClientError) as e:744 s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)745 snapshot.match("get-bucket-lifecycle-exc", e.value.response)746 @pytest.mark.aws_validated747 @pytest.mark.skip_snapshot_verify(748 paths=[749 "$..ContentLanguage",750 "$..VersionId",751 "$..ETag", # TODO ETag should be the same?752 ]753 )754 def test_range_header_body_length(self, s3_client, s3_bucket, snapshot):755 # Test for https://github.com/localstack/localstack/issues/1952756 object_key = "sample.bin"757 chunk_size = 1024758 with io.BytesIO() as data:759 data.write(os.urandom(chunk_size * 2))760 data.seek(0)761 s3_client.upload_fileobj(data, s3_bucket, object_key)762 range_header = f"bytes=0-{(chunk_size - 1)}"763 resp = s3_client.get_object(Bucket=s3_bucket, Key=object_key, Range=range_header)764 content = resp["Body"].read()765 assert chunk_size == len(content)766 snapshot.match("get-object", resp)767 @pytest.mark.aws_validated768 def test_get_range_object_headers(self, s3_client, s3_bucket):769 object_key = "sample.bin"770 chunk_size = 1024771 with io.BytesIO() as data:772 data.write(os.urandom(chunk_size * 2))773 data.seek(0)774 s3_client.upload_fileobj(data, s3_bucket, object_key)775 range_header = f"bytes=0-{(chunk_size - 1)}"776 resp = s3_client.get_object(Bucket=s3_bucket, Key=object_key, Range=range_header)777 assert resp.get("AcceptRanges") == "bytes"778 resp_headers = resp["ResponseMetadata"]["HTTPHeaders"]779 assert "x-amz-request-id" in resp_headers780 assert "x-amz-id-2" in resp_headers781 # `content-language` should not be in the response782 if is_aws_cloud(): # fixme parity issue783 assert "content-language" not in resp_headers784 # We used to return `cache-control: no-cache` if the header wasn't set785 # by the client, but this was a bug because s3 doesn't do that. It simply786 # omits it.787 assert "cache-control" not in resp_headers788 # Do not send a content-encoding header as discussed in Issue #3608789 assert "content-encoding" not in resp_headers790 @pytest.mark.only_localstack791 def test_put_object_chunked_newlines(self, s3_client, s3_bucket):792 # Boto still does not support chunk encoding, which means we can't test with the client nor793 # aws_http_client_factory. See open issue: https://github.com/boto/boto3/issues/751794 # Test for https://github.com/localstack/localstack/issues/1571795 object_key = "data"796 body = "Hello\r\n\r\n\r\n\r\n"797 headers = {798 "Authorization": aws_stack.mock_aws_request_headers("s3")["Authorization"],799 "Content-Type": "audio/mpeg",800 "X-Amz-Content-Sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD",801 "X-Amz-Date": "20190918T051509Z",802 "X-Amz-Decoded-Content-Length": str(len(body)),803 }804 data = (805 "d;chunk-signature=af5e6c0a698b0192e9aa5d9083553d4d241d81f69ec62b184d05c509ad5166af\r\n"806 f"{body}\r\n0;chunk-signature=f2a50a8c0ad4d212b579c2489c6d122db88d8a0d0b987ea1f3e9d081074a5937\r\n"807 )808 # put object809 url = f"{config.service_url('s3')}/{s3_bucket}/{object_key}"810 requests.put(url, data, headers=headers, verify=False)811 # get object and assert content length812 downloaded_object = s3_client.get_object(Bucket=s3_bucket, Key=object_key)813 download_file_object = to_str(downloaded_object["Body"].read())814 assert len(body) == len(str(download_file_object))815 assert body == str(download_file_object)816 @pytest.mark.only_localstack817 def test_put_object_with_md5_and_chunk_signature(self, s3_client, s3_bucket):818 # Boto still does not support chunk encoding, which means we can't test with the client nor819 # aws_http_client_factory. See open issue: https://github.com/boto/boto3/issues/751820 # Test for https://github.com/localstack/localstack/issues/4987821 object_key = "test-runtime.properties"822 object_data = (823 "#20211122+0100\n"824 "#Mon Nov 22 20:10:44 CET 2021\n"825 "last.sync.url.test-space-key=2822a50f-4992-425a-b8fb-923735a9ddff317e3479-5907-46cf-b33a-60da9709274f\n"826 )827 object_data_chunked = (828 "93;chunk-signature=5be6b2d473e96bb9f297444da60bdf0ff8f5d2e211e1d551b3cf3646c0946641\r\n"829 f"{object_data}"830 "\r\n0;chunk-signature=bd5c830b94346b57ddc8805ba26c44a122256c207014433bf6579b0985f21df7\r\n\r\n"831 )832 content_md5 = base64.b64encode(hashlib.md5(object_data.encode()).digest()).decode()833 headers = {834 "Content-Md5": content_md5,835 "Content-Type": "application/octet-stream",836 "User-Agent": (837 "aws-sdk-java/1.11.951 Mac_OS_X/10.15.7 OpenJDK_64-Bit_Server_VM/11.0.11+9-LTS "838 "java/11.0.11 scala/2.13.6 kotlin/1.5.31 vendor/Amazon.com_Inc."839 ),840 "X-Amz-Content-Sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD",841 "X-Amz-Date": "20211122T191045Z",842 "X-Amz-Decoded-Content-Length": str(len(object_data)),843 "Content-Length": str(len(object_data_chunked)),844 "Connection": "Keep-Alive",845 "Expect": "100-continue",846 }847 url = s3_client.generate_presigned_url(848 "put_object",849 Params={850 "Bucket": s3_bucket,851 "Key": object_key,852 "ContentType": "application/octet-stream",853 "ContentMD5": content_md5,854 },855 )856 result = requests.put(url, data=object_data_chunked, headers=headers)857 assert result.status_code == 200, (result, result.content)858 @pytest.mark.aws_validated859 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage"])860 def test_delete_object_tagging(self, s3_client, s3_bucket, snapshot):861 object_key = "test-key-tagging"862 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")863 # get object and assert response864 s3_obj = s3_client.get_object(Bucket=s3_bucket, Key=object_key)865 snapshot.match("get-obj", s3_obj)866 # delete object tagging867 s3_client.delete_object_tagging(Bucket=s3_bucket, Key=object_key)868 # assert that the object still exists869 s3_obj = s3_client.get_object(Bucket=s3_bucket, Key=object_key)870 snapshot.match("get-obj-after-tag-deletion", s3_obj)871 @pytest.mark.aws_validated872 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId"])873 def test_delete_non_existing_keys(self, s3_client, s3_bucket, snapshot):874 object_key = "test-key-nonexistent"875 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")876 response = s3_client.delete_objects(877 Bucket=s3_bucket,878 Delete={"Objects": [{"Key": object_key}, {"Key": "dummy1"}, {"Key": "dummy2"}]},879 )880 response["Deleted"].sort(key=itemgetter("Key"))881 snapshot.match("deleted-resp", response)882 assert len(response["Deleted"]) == 3883 assert "Errors" not in response884 @pytest.mark.aws_validated885 @pytest.mark.skip_snapshot_verify(886 paths=["$..Error.RequestID"]887 ) # fixme RequestID not in AWS response888 def test_delete_non_existing_keys_in_non_existing_bucket(self, s3_client, snapshot):889 with pytest.raises(ClientError) as e:890 s3_client.delete_objects(891 Bucket="non-existent-bucket",892 Delete={"Objects": [{"Key": "dummy1"}, {"Key": "dummy2"}]},893 )894 assert "NoSuchBucket" == e.value.response["Error"]["Code"]895 snapshot.match("error-non-existent-bucket", e.value.response)896 @pytest.mark.aws_validated897 def test_s3_request_payer(self, s3_client, s3_bucket, snapshot):898 response = s3_client.put_bucket_request_payment(899 Bucket=s3_bucket, RequestPaymentConfiguration={"Payer": "Requester"}900 )901 snapshot.match("put-bucket-request-payment", response)902 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200903 response = s3_client.get_bucket_request_payment(Bucket=s3_bucket)904 snapshot.match("get-bucket-request-payment", response)905 assert "Requester" == response["Payer"]906 @pytest.mark.aws_validated907 @pytest.mark.skip_snapshot_verify(908 paths=["$..Error.RequestID", "$..Grants..Grantee.DisplayName"]909 )910 def test_bucket_exists(self, s3_client, s3_bucket, snapshot):911 snapshot.add_transformer(912 [913 snapshot.transform.key_value("DisplayName"),914 snapshot.transform.key_value("ID", value_replacement="owner-id"),915 ]916 )917 s3_client.put_bucket_cors(918 Bucket=s3_bucket,919 CORSConfiguration={920 "CORSRules": [921 {922 "AllowedMethods": ["GET", "POST", "PUT", "DELETE"],923 "AllowedOrigins": ["localhost"],924 }925 ]926 },927 )928 response = s3_client.get_bucket_cors(Bucket=s3_bucket)929 snapshot.match("get-bucket-cors", response)930 result = s3_client.get_bucket_acl(Bucket=s3_bucket)931 snapshot.match("get-bucket-acl", result)932 with pytest.raises(ClientError) as e:933 s3_client.get_bucket_acl(Bucket="bucket-not-exists")934 snapshot.match("get-bucket-not-exists", e.value.response)935 @pytest.mark.aws_validated936 @pytest.mark.skip_snapshot_verify(937 paths=["$..VersionId", "$..ContentLanguage", "$..Error.RequestID"]938 )939 def test_s3_uppercase_key_names(self, s3_client, s3_create_bucket, snapshot):940 # bucket name should be case-sensitive941 bucket_name = f"testuppercase-{short_uid()}"942 s3_create_bucket(Bucket=bucket_name)943 # key name should be case-sensitive944 object_key = "camelCaseKey"945 s3_client.put_object(Bucket=bucket_name, Key=object_key, Body="something")946 res = s3_client.get_object(Bucket=bucket_name, Key=object_key)947 snapshot.match("response", res)948 with pytest.raises(ClientError) as e:949 s3_client.get_object(Bucket=bucket_name, Key="camelcasekey")950 snapshot.match("wrong-case-key", e.value.response)951 @pytest.mark.aws_validated952 def test_s3_download_object_with_lambda(953 self,954 s3_client,955 s3_create_bucket,956 create_lambda_function,957 lambda_client,958 lambda_su_role,959 logs_client,960 ):961 bucket_name = f"bucket-{short_uid()}"962 function_name = f"func-{short_uid()}"963 key = f"key-{short_uid()}"964 s3_create_bucket(Bucket=bucket_name)965 s3_client.put_object(Bucket=bucket_name, Key=key, Body="something..")966 create_lambda_function(967 handler_file=os.path.join(968 os.path.dirname(__file__),969 "../awslambda",970 "functions",971 "lambda_triggered_by_sqs_download_s3_file.py",972 ),973 func_name=function_name,974 role=lambda_su_role,975 runtime=LAMBDA_RUNTIME_PYTHON39,976 envvars=dict(977 {978 "BUCKET_NAME": bucket_name,979 "OBJECT_NAME": key,980 "LOCAL_FILE_NAME": "/tmp/" + key,981 }982 ),983 )984 lambda_client.invoke(FunctionName=function_name, InvocationType="Event")985 # TODO maybe this check can be improved (do not rely on logs)986 retry(987 check_expected_lambda_log_events_length,988 retries=10,989 sleep=1,990 function_name=function_name,991 regex_filter="success",992 expected_length=1,993 logs_client=logs_client,994 )995 @pytest.mark.aws_validated996 # TODO LocalStack adds this RequestID to the error response997 @pytest.mark.skip_snapshot_verify(paths=["$..Error.RequestID"])998 def test_precondition_failed_error(self, s3_client, s3_create_bucket, snapshot):999 bucket = f"bucket-{short_uid()}"1000 s3_create_bucket(Bucket=bucket)1001 s3_client.put_object(Bucket=bucket, Key="foo", Body=b'{"foo": "bar"}')1002 with pytest.raises(ClientError) as e:1003 s3_client.get_object(Bucket=bucket, Key="foo", IfMatch='"not good etag"')1004 snapshot.match("get-object-if-match", e.value.response)1005 @pytest.mark.aws_validated1006 @pytest.mark.xfail(reason="Error format is wrong and missing keys")1007 def test_s3_invalid_content_md5(self, s3_client, s3_bucket, snapshot):1008 # put object with invalid content MD51009 hashes = ["__invalid__", "000", "not base64 encoded checksum", "MTIz"]1010 for index, md5hash in enumerate(hashes):1011 with pytest.raises(ClientError) as e:1012 s3_client.put_object(1013 Bucket=s3_bucket,1014 Key="test-key",1015 Body="something",1016 ContentMD5=md5hash,1017 )1018 snapshot.match(f"md5-error-{index}", e.value.response)1019 @pytest.mark.aws_validated1020 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage", "$..ETag"])1021 def test_s3_upload_download_gzip(self, s3_client, s3_bucket, snapshot):1022 data = "1234567890 " * 1001023 # Write contents to memory rather than a file.1024 upload_file_object = BytesIO()1025 with gzip.GzipFile(fileobj=upload_file_object, mode="w") as filestream:1026 filestream.write(data.encode("utf-8"))1027 # Upload gzip1028 response = s3_client.put_object(1029 Bucket=s3_bucket,1030 Key="test.gz",1031 ContentEncoding="gzip",1032 Body=upload_file_object.getvalue(),1033 )1034 snapshot.match("put-object", response)1035 # TODO: check why ETag is different1036 # Download gzip1037 downloaded_object = s3_client.get_object(Bucket=s3_bucket, Key="test.gz")1038 snapshot.match("get-object", downloaded_object)1039 download_file_object = BytesIO(downloaded_object["Body"].read())1040 with gzip.GzipFile(fileobj=download_file_object, mode="rb") as filestream:1041 downloaded_data = filestream.read().decode("utf-8")1042 assert downloaded_data == data1043 @pytest.mark.aws_validated1044 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId"])1045 def test_multipart_copy_object_etag(self, s3_client, s3_bucket, s3_multipart_upload, snapshot):1046 snapshot.add_transformer(1047 [1048 snapshot.transform.key_value("Location"),1049 snapshot.transform.key_value("Bucket"),1050 ]1051 )1052 key = "test.file"1053 copy_key = "copy.file"1054 src_object_path = f"{s3_bucket}/{key}"1055 content = "test content 123"1056 response = s3_multipart_upload(bucket=s3_bucket, key=key, data=content)1057 snapshot.match("multipart-upload", response)1058 multipart_etag = response["ETag"]1059 response = s3_client.copy_object(Bucket=s3_bucket, CopySource=src_object_path, Key=copy_key)1060 snapshot.match("copy-object", response)1061 copy_etag = response["CopyObjectResult"]["ETag"]1062 # etags should be different1063 assert copy_etag != multipart_etag1064 @pytest.mark.aws_validated1065 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId"])1066 def test_set_external_hostname(1067 self, s3_client, s3_bucket, s3_multipart_upload, monkeypatch, snapshot1068 ):1069 snapshot.add_transformer(1070 [1071 snapshot.transform.key_value("Location"),1072 snapshot.transform.key_value("Bucket"),1073 ]1074 )1075 monkeypatch.setattr(config, "HOSTNAME_EXTERNAL", "foobar")1076 key = "test.file"1077 content = "test content 123"1078 acl = "public-read"1079 # upload file1080 response = s3_multipart_upload(bucket=s3_bucket, key=key, data=content, acl=acl)1081 snapshot.match("multipart-upload", response)1082 if is_aws_cloud(): # TODO: default addressing is vhost for AWS1083 expected_url = f"{_bucket_url_vhost(bucket_name=s3_bucket)}/{key}"1084 else: # LS default is path addressing1085 expected_url = f"{_bucket_url(bucket_name=s3_bucket, localstack_host=config.HOSTNAME_EXTERNAL)}/{key}"1086 assert response["Location"] == expected_url1087 # download object via API1088 downloaded_object = s3_client.get_object(Bucket=s3_bucket, Key=key)1089 snapshot.match("get-object", response)1090 assert content == to_str(downloaded_object["Body"].read())1091 # download object directly from download link1092 download_url = response["Location"].replace(f"{config.HOSTNAME_EXTERNAL}:", "localhost:")1093 response = requests.get(download_url)1094 assert response.status_code == 2001095 assert to_str(response.content) == content1096 @pytest.mark.skip_offline1097 @pytest.mark.aws_validated1098 @pytest.mark.skip_snapshot_verify(paths=["$..AcceptRanges"])1099 def test_s3_lambda_integration(1100 self,1101 lambda_client,1102 create_lambda_function,1103 lambda_su_role,1104 s3_client,1105 s3_create_bucket,1106 create_tmp_folder_lambda,1107 snapshot,1108 ):1109 snapshot.add_transformer(snapshot.transform.s3_api())1110 handler_file = os.path.join(1111 os.path.dirname(__file__), "../awslambda", "functions", "lambda_s3_integration.js"1112 )1113 temp_folder = create_tmp_folder_lambda(1114 handler_file,1115 run_command="npm i @aws-sdk/client-s3; npm i @aws-sdk/s3-request-presigner",1116 )1117 function_name = f"func-integration-{short_uid()}"1118 create_lambda_function(1119 func_name=function_name,1120 zip_file=testutil.create_zip_file(temp_folder, get_content=True),1121 runtime=LAMBDA_RUNTIME_NODEJS14X,1122 handler="lambda_s3_integration.handler",1123 role=lambda_su_role,1124 )1125 s3_create_bucket(Bucket=function_name)1126 response = lambda_client.invoke(FunctionName=function_name)1127 presigned_url = response["Payload"].read()1128 presigned_url = json.loads(to_str(presigned_url))["body"].strip('"')1129 response = requests.put(presigned_url, verify=False)1130 assert 200 == response.status_code1131 response = s3_client.head_object(Bucket=function_name, Key="key.png")1132 snapshot.match("head_object", response)1133class TestS3TerraformRawRequests:1134 @pytest.mark.only_localstack1135 def test_terraform_request_sequence(self):1136 reqs = load_file(os.path.join(os.path.dirname(__file__), "../files", "s3.requests.txt"))1137 reqs = reqs.split("---")1138 for req in reqs:1139 header, _, body = req.strip().partition("\n\n")1140 req, _, headers = header.strip().partition("\n")1141 headers = {h.split(":")[0]: h.partition(":")[2].strip() for h in headers.split("\n")}1142 method, path, _ = req.split(" ")1143 url = f"{config.get_edge_url()}{path}"1144 result = getattr(requests, method.lower())(url, data=body, headers=headers)1145 assert result.status_code < 4001146class TestS3PresignedUrl:1147 """1148 These tests pertain to S3's presigned URL feature.1149 """1150 @pytest.mark.aws_validated1151 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage", "$..Expires"])1152 def test_put_object(self, s3_client, s3_bucket, snapshot):1153 snapshot.add_transformer(snapshot.transform.s3_api())1154 key = "my-key"1155 url = s3_client.generate_presigned_url(1156 "put_object", Params={"Bucket": s3_bucket, "Key": key}1157 )1158 requests.put(url, data="something", verify=False)1159 response = s3_client.get_object(Bucket=s3_bucket, Key=key)1160 assert response["Body"].read() == b"something"1161 snapshot.match("get_object", response)1162 @pytest.mark.aws_validated1163 @pytest.mark.xfail(1164 condition=not config.LEGACY_EDGE_PROXY, reason="failing with new HTTP gateway (only in CI)"1165 )1166 def test_post_object_with_files(self, s3_client, s3_bucket):1167 object_key = "test-presigned-post-key"1168 body = b"something body"1169 presigned_request = s3_client.generate_presigned_post(1170 Bucket=s3_bucket, Key=object_key, ExpiresIn=601171 )1172 # put object1173 response = requests.post(1174 presigned_request["url"],1175 data=presigned_request["fields"],1176 files={"file": body},1177 verify=False,1178 )1179 assert response.status_code == 2041180 # get object and compare results1181 downloaded_object = s3_client.get_object(Bucket=s3_bucket, Key=object_key)1182 assert downloaded_object["Body"].read() == body1183 @pytest.mark.aws_validated1184 def test_post_request_expires(self, s3_client, s3_bucket):1185 # presign a post with a short expiry time1186 object_key = "test-presigned-post-key"1187 presigned_request = s3_client.generate_presigned_post(1188 Bucket=s3_bucket, Key=object_key, ExpiresIn=21189 )1190 # sleep so it expires1191 time.sleep(3)1192 # attempt to use the presigned request1193 response = requests.post(1194 presigned_request["url"],1195 data=presigned_request["fields"],1196 files={"file": "file content"},1197 verify=False,1198 )1199 # FIXME: localstack returns 400 but aws returns 4031200 assert response.status_code in [400, 403]1201 assert "ExpiredToken" in response.text1202 @pytest.mark.aws_validated1203 def test_delete_has_empty_content_length_header(self, s3_client, s3_bucket):1204 for encoding in None, "gzip":1205 # put object1206 object_key = "key-by-hostname"1207 s3_client.put_object(1208 Bucket=s3_bucket,1209 Key=object_key,1210 Body="something",1211 ContentType="text/html; charset=utf-8",1212 )1213 url = s3_client.generate_presigned_url(1214 "delete_object", Params={"Bucket": s3_bucket, "Key": object_key}1215 )1216 # get object and assert headers1217 headers = {}1218 if encoding:1219 headers["Accept-Encoding"] = encoding1220 response = requests.delete(url, headers=headers, verify=False)1221 assert response.status_code == 2041222 assert not response.text1223 # AWS does not send a content-length header at all, legacy localstack sends a 0 length header1224 assert response.headers.get("content-length") in [1225 "0",1226 None,1227 ], f"Unexpected content-length in headers {response.headers}"1228 @pytest.mark.aws_validated1229 def test_head_has_correct_content_length_header(self, s3_client, s3_bucket):1230 body = "something body \n \n\r"1231 # put object1232 object_key = "key-by-hostname"1233 s3_client.put_object(1234 Bucket=s3_bucket,1235 Key=object_key,1236 Body=body,1237 ContentType="text/html; charset=utf-8",1238 )1239 url = s3_client.generate_presigned_url(1240 "head_object", Params={"Bucket": s3_bucket, "Key": object_key}1241 )1242 # get object and assert headers1243 response = requests.head(url, verify=False)1244 assert response.headers.get("content-length") == str(len(body))1245 @pytest.mark.aws_validated1246 @pytest.mark.skip_snapshot_verify(paths=["$..Expires", "$..AcceptRanges"])1247 def test_put_url_metadata(self, s3_client, s3_bucket, snapshot):1248 snapshot.add_transformer(snapshot.transform.s3_api())1249 # Object metadata should be passed as query params via presigned URL1250 # https://github.com/localstack/localstack/issues/5441251 metadata = {"foo": "bar"}1252 object_key = "key-by-hostname"1253 # put object via presigned URL1254 url = s3_client.generate_presigned_url(1255 "put_object",1256 Params={"Bucket": s3_bucket, "Key": object_key, "Metadata": metadata},1257 )1258 assert "x-amz-meta-foo=bar" in url1259 response = requests.put(url, data="content 123", verify=False)1260 assert response.ok, f"response returned {response.status_code}: {response.text}"1261 # response body should be empty, see https://github.com/localstack/localstack/issues/13171262 assert not response.text1263 # assert metadata is present1264 response = s3_client.head_object(Bucket=s3_bucket, Key=object_key)1265 assert response.get("Metadata", {}).get("foo") == "bar"1266 snapshot.match("head_object", response)1267 @pytest.mark.aws_validated1268 def test_get_object_ignores_request_body(self, s3_client, s3_bucket):1269 key = "foo-key"1270 body = "foobar"1271 s3_client.put_object(Bucket=s3_bucket, Key=key, Body=body)1272 url = s3_client.generate_presigned_url(1273 "get_object", Params={"Bucket": s3_bucket, "Key": key}1274 )1275 response = requests.get(url, data=b"get body is ignored by AWS")1276 assert response.status_code == 2001277 assert response.text == body1278 @pytest.mark.aws_validated1279 def test_put_object_with_md5_and_chunk_signature_bad_headers(1280 self,1281 s3_client,1282 s3_create_bucket,1283 ):1284 bucket_name = f"bucket-{short_uid()}"1285 object_key = "test-runtime.properties"1286 content_md5 = "pX8KKuGXS1f2VTcuJpqjkw=="1287 headers = {1288 "Content-Md5": content_md5,1289 "Content-Type": "application/octet-stream",1290 "X-Amz-Content-Sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD",1291 "X-Amz-Date": "20211122T191045Z",1292 "X-Amz-Decoded-Content-Length": "test", # string instead of int1293 "Content-Length": "10",1294 "Connection": "Keep-Alive",1295 "Expect": "100-continue",1296 }1297 s3_create_bucket(Bucket=bucket_name)1298 url = s3_client.generate_presigned_url(1299 "put_object",1300 Params={1301 "Bucket": bucket_name,1302 "Key": object_key,1303 "ContentType": "application/octet-stream",1304 "ContentMD5": content_md5,1305 },1306 )1307 result = requests.put(url, data="test", headers=headers)1308 assert result.status_code == 4031309 assert b"SignatureDoesNotMatch" in result.content1310 # check also no X-Amz-Decoded-Content-Length1311 headers.pop("X-Amz-Decoded-Content-Length")1312 result = requests.put(url, data="test", headers=headers)1313 assert result.status_code == 403, (result, result.content)1314 assert b"SignatureDoesNotMatch" in result.content1315 @pytest.mark.aws_validated1316 def test_s3_get_response_default_content_type(self, s3_client, s3_bucket):1317 # When no content type is provided by a PUT request1318 # 'binary/octet-stream' should be used1319 # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html1320 # put object1321 object_key = "key-by-hostname"1322 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")1323 # get object and assert headers1324 url = s3_client.generate_presigned_url(1325 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}1326 )1327 response = requests.get(url, verify=False)1328 assert response.headers["content-type"] == "binary/octet-stream"1329 @pytest.mark.aws_validated1330 def test_s3_presigned_url_expired(self, s3_presigned_client, s3_bucket, monkeypatch):1331 if not is_aws_cloud():1332 monkeypatch.setattr(config, "S3_SKIP_SIGNATURE_VALIDATION", False)1333 object_key = "key-expires-in-2"1334 s3_presigned_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")1335 # get object and assert headers1336 url = s3_presigned_client.generate_presigned_url(1337 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}, ExpiresIn=21338 )1339 # retrieving it before expiry1340 resp = requests.get(url, verify=False)1341 assert resp.status_code == 2001342 assert to_str(resp.content) == "something"1343 time.sleep(3) # wait for the URL to expire1344 resp = requests.get(url, verify=False)1345 resp_content = to_str(resp.content)1346 assert resp.status_code == 4031347 assert "<Code>AccessDenied</Code>" in resp_content1348 assert "<Message>Request has expired</Message>" in resp_content1349 url = s3_presigned_client.generate_presigned_url(1350 "get_object",1351 Params={"Bucket": s3_bucket, "Key": object_key},1352 ExpiresIn=120,1353 )1354 resp = requests.get(url, verify=False)1355 assert resp.status_code == 2001356 assert to_str(resp.content) == "something"1357 @pytest.mark.aws_validated1358 def test_s3_get_response_content_type_same_as_upload_and_range(self, s3_client, s3_bucket):1359 # put object1360 object_key = "foo/bar/key-by-hostname"1361 content_type = "foo/bar; charset=utf-8"1362 s3_client.put_object(1363 Bucket=s3_bucket,1364 Key=object_key,1365 Body="something " * 20,1366 ContentType=content_type,1367 )1368 url = s3_client.generate_presigned_url(1369 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}1370 )1371 # get object and assert headers1372 response = requests.get(url, verify=False)1373 assert content_type == response.headers["content-type"]1374 # get object using range query and assert headers1375 response = requests.get(url, headers={"Range": "bytes=0-18"}, verify=False)1376 assert content_type == response.headers["content-type"]1377 # test we only get the first 18 bytes from the object1378 assert "something something" == to_str(response.content)1379 @pytest.mark.aws_validated1380 def test_s3_presigned_post_success_action_status_201_response(self, s3_client, s3_bucket):1381 # a security policy is required if the bucket is not publicly writable1382 # see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html#RESTObjectPOST-requests-form-fields1383 body = "something body"1384 # get presigned URL1385 object_key = "key-${filename}"1386 presigned_request = s3_client.generate_presigned_post(1387 Bucket=s3_bucket,1388 Key=object_key,1389 Fields={"success_action_status": "201"},1390 Conditions=[{"bucket": s3_bucket}, ["eq", "$success_action_status", "201"]],1391 ExpiresIn=60,1392 )1393 files = {"file": ("my-file", body)}1394 response = requests.post(1395 presigned_request["url"],1396 data=presigned_request["fields"],1397 files=files,1398 verify=False,1399 )1400 # test1401 assert 201 == response.status_code1402 json_response = xmltodict.parse(response.content)1403 assert "PostResponse" in json_response1404 json_response = json_response["PostResponse"]1405 # fixme 201 response is hardcoded1406 # see localstack.services.s3.s3_listener.ProxyListenerS3.get_201_response1407 if is_aws_cloud():1408 location = f"{_bucket_url_vhost(s3_bucket, aws_stack.get_region())}/key-my-file"1409 etag = '"43281e21fce675ac3bcb3524b38ca4ed"' # TODO check quoting of etag1410 else:1411 location = "http://localhost/key-my-file"1412 etag = "d41d8cd98f00b204e9800998ecf8427f"1413 assert json_response["Location"] == location1414 assert json_response["Bucket"] == s3_bucket1415 assert json_response["Key"] == "key-my-file"1416 assert json_response["ETag"] == etag1417 @pytest.mark.aws_validated1418 @pytest.mark.xfail(reason="Access-Control-Allow-Origin returns Origin value in LS")1419 def test_s3_get_response_headers(self, s3_client, s3_bucket, snapshot):1420 # put object and CORS configuration1421 object_key = "key-by-hostname"1422 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")1423 s3_client.put_bucket_cors(1424 Bucket=s3_bucket,1425 CORSConfiguration={1426 "CORSRules": [1427 {1428 "AllowedMethods": ["GET", "PUT", "POST"],1429 "AllowedOrigins": ["*"],1430 "ExposeHeaders": ["ETag", "x-amz-version-id"],1431 }1432 ]1433 },1434 )1435 bucket_cors_res = s3_client.get_bucket_cors(Bucket=s3_bucket)1436 snapshot.match("bucket-cors-response", bucket_cors_res)1437 # get object and assert headers1438 url = s3_client.generate_presigned_url(1439 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}1440 )1441 # need to add Origin headers for S3 to send back the Access-Control-* headers1442 # as CORS is made for browsers1443 response = requests.get(url, verify=False, headers={"Origin": "http://localhost"})1444 assert response.headers["Access-Control-Expose-Headers"] == "ETag, x-amz-version-id"1445 assert response.headers["Access-Control-Allow-Methods"] == "GET, PUT, POST"1446 assert (1447 response.headers["Access-Control-Allow-Origin"] == "*"1448 ) # returns http://localhost in LS1449 @pytest.mark.aws_validated1450 @pytest.mark.xfail(reason="Behaviour diverges from AWS, Access-Control-* headers always added")1451 def test_s3_get_response_headers_without_origin(self, s3_client, s3_bucket):1452 # put object and CORS configuration1453 object_key = "key-by-hostname"1454 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")1455 s3_client.put_bucket_cors(1456 Bucket=s3_bucket,1457 CORSConfiguration={1458 "CORSRules": [1459 {1460 "AllowedMethods": ["GET", "PUT", "POST"],1461 "AllowedOrigins": ["*"],1462 "ExposeHeaders": ["ETag", "x-amz-version-id"],1463 }1464 ]1465 },1466 )1467 # get object and assert headers1468 url = s3_client.generate_presigned_url(1469 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}1470 )1471 response = requests.get(url, verify=False)1472 assert "Access-Control-Expose-Headers" not in response.headers1473 assert "Access-Control-Allow-Methods" not in response.headers1474 assert "Access-Control-Allow-Origin" not in response.headers1475 @pytest.mark.aws_validated1476 def test_presigned_url_with_session_token(self, s3_create_bucket_with_client, sts_client):1477 bucket_name = f"bucket-{short_uid()}"1478 key_name = "key"1479 response = sts_client.get_session_token()1480 client = boto3.client(1481 "s3",1482 config=Config(signature_version="s3v4"),1483 endpoint_url=None1484 if os.environ.get("TEST_TARGET") == "AWS_CLOUD"1485 else "http://127.0.0.1:4566",1486 aws_access_key_id=response["Credentials"]["AccessKeyId"],1487 aws_secret_access_key=response["Credentials"]["SecretAccessKey"],1488 aws_session_token=response["Credentials"]["SessionToken"],1489 )1490 s3_create_bucket_with_client(s3_client=client, Bucket=bucket_name)1491 client.put_object(Body="test-value", Bucket=bucket_name, Key=key_name)1492 presigned_url = client.generate_presigned_url(1493 ClientMethod="get_object",1494 Params={"Bucket": bucket_name, "Key": key_name},1495 ExpiresIn=600,1496 )1497 response = requests.get(presigned_url)1498 assert response._content == b"test-value"1499 @pytest.mark.aws_validated1500 def test_s3_get_response_header_overrides(self, s3_client, s3_bucket):1501 # Signed requests may include certain header overrides in the querystring1502 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html1503 object_key = "key-header-overrides"1504 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful