How to use put_object_tagging method in localstack

Best Python code snippet using localstack_python

test_s3_tagging_policy_mixin_v.py

Source:test_s3_tagging_policy_mixin_v.py Github

...
        resource = self.make_arn_resource("{}/{}".format(bucket_name, key))
        policy_document = make_json_policy("s3:GetObjectTagging", resource)
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        input_tag_set = self.create_simple_tag_set(10)
        response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        alt_client = get_alt_client(s3cfg_global_unique)
        response = alt_client.get_object_tagging(Bucket=bucket_name, Key=key)
        self.eq(response['TagSet'], input_tag_set['TagSet'])

    @pytest.mark.ess
    def test_put_tags_acl_public(self, s3cfg_global_unique):
        """
        Test: verify that object tagging can be set when the policy allows PutObjectTagging
        """
        key = 'testputtagsacl'
        client = get_client(s3cfg_global_unique)
        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)
        resource = self.make_arn_resource("{}/{}".format(bucket_name, key))
        policy_document = make_json_policy("s3:PutObjectTagging", resource)
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        input_tag_set = self.create_simple_tag_set(10)
        alt_client = get_alt_client(s3cfg_global_unique)
        response = alt_client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        response = client.get_object_tagging(Bucket=bucket_name, Key=key)
        self.eq(response['TagSet'], input_tag_set['TagSet'])

    @pytest.mark.ess
    def test_delete_tags_obj_public(self, s3cfg_global_unique):
        """
        Test: verify that object tagging can be deleted when the policy allows DeleteObjectTagging
        """
        key = 'testputtagsacl'
        client = get_client(s3cfg_global_unique)
        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)
        resource = self.make_arn_resource("{}/{}".format(bucket_name, key))
        policy_document = make_json_policy("s3:DeleteObjectTagging",
                                           resource)
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        input_tag_set = self.create_simple_tag_set(10)
        response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        alt_client = get_alt_client(s3cfg_global_unique)
        response = alt_client.delete_object_tagging(Bucket=bucket_name, Key=key)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 204)
        response = client.get_object_tagging(Bucket=bucket_name, Key=key)
        self.eq(len(response['TagSet']), 0)

    @pytest.mark.ess
    def test_bucket_policy_get_obj_existing_tag(self, s3cfg_global_unique):
        """
        Test: verify the ExistingObjectTag conditional on get object
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.create_objects(s3cfg_global_unique, keys=['publictag', 'privatetag', 'invalidtag'])
        tag_conditional = {"StringEquals": {
            "s3:ExistingObjectTag/security": "public"
        }}
        resource = self.make_arn_resource("{}/{}".format(bucket_name, "*"))
        policy_document = make_json_policy("s3:GetObject",
                                           resource,
                                           conditions=tag_conditional)
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        tag_set = [{'Key': 'security', 'Value': 'public'}, {'Key': 'foo', 'Value': 'bar'}]
        input_tag_set = {'TagSet': tag_set}
        response = client.put_object_tagging(Bucket=bucket_name, Key='publictag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        tag_set2 = [{'Key': 'security', 'Value': 'private'}]
        input_tag_set = {'TagSet': tag_set2}
        response = client.put_object_tagging(Bucket=bucket_name, Key='privatetag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        tag_set3 = [{'Key': 'security1', 'Value': 'public'}]
        input_tag_set = {'TagSet': tag_set3}
        response = client.put_object_tagging(Bucket=bucket_name, Key='invalidtag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        alt_client = get_alt_client(s3cfg_global_unique)
        response = alt_client.get_object(Bucket=bucket_name, Key='publictag')
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        e = assert_raises(ClientError, alt_client.get_object, Bucket=bucket_name, Key='privatetag')
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)
        e = assert_raises(ClientError, alt_client.get_object, Bucket=bucket_name, Key='invalidtag')
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)

    @pytest.mark.ess
    def test_bucket_policy_get_obj_tagging_existing_tag(self, s3cfg_global_unique):
        """
        Test: verify the ExistingObjectTag conditional on get object tagging
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.create_objects(s3cfg_global_unique, keys=['publictag', 'privatetag', 'invalidtag'])
        tag_conditional = {"StringEquals": {
            "s3:ExistingObjectTag/security": "public"
        }}
        resource = self.make_arn_resource("{}/{}".format(bucket_name, "*"))
        policy_document = make_json_policy("s3:GetObjectTagging",
                                           resource,
                                           conditions=tag_conditional)
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        tag_set = [{'Key': 'security', 'Value': 'public'}, {'Key': 'foo', 'Value': 'bar'}]
        input_tag_set = {'TagSet': tag_set}
        response = client.put_object_tagging(Bucket=bucket_name, Key='publictag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        tag_set2 = [{'Key': 'security', 'Value': 'private'}]
        input_tag_set = {'TagSet': tag_set2}
        response = client.put_object_tagging(Bucket=bucket_name, Key='privatetag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        tag_set3 = [{'Key': 'security1', 'Value': 'public'}]
        input_tag_set = {'TagSet': tag_set3}
        response = client.put_object_tagging(Bucket=bucket_name, Key='invalidtag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        alt_client = get_alt_client(s3cfg_global_unique)
        response = alt_client.get_object_tagging(Bucket=bucket_name, Key='publictag')
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        # A get object itself should fail since we allowed only GetObjectTagging
        e = assert_raises(ClientError, alt_client.get_object, Bucket=bucket_name, Key='publictag')
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)
        e = assert_raises(ClientError, alt_client.get_object_tagging, Bucket=bucket_name, Key='privatetag')
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)
        e = assert_raises(ClientError, alt_client.get_object_tagging, Bucket=bucket_name, Key='invalidtag')
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)

    @pytest.mark.ess
    def test_bucket_policy_put_obj_tagging_existing_tag(self, s3cfg_global_unique):
        """
        Test: verify the ExistingObjectTag conditional on put object tagging
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.create_objects(s3cfg_global_unique, keys=['publictag', 'privatetag', 'invalidtag'])
        tag_conditional = {"StringEquals": {
            "s3:ExistingObjectTag/security": "public"
        }}
        resource = self.make_arn_resource("{}/{}".format(bucket_name, "*"))
        policy_document = make_json_policy("s3:PutObjectTagging",
                                           resource,
                                           conditions=tag_conditional)
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        tag_set = [{'Key': 'security', 'Value': 'public'}, {'Key': 'foo', 'Value': 'bar'}]
        input_tag_set = {'TagSet': tag_set}
        response = client.put_object_tagging(Bucket=bucket_name, Key='publictag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        tag_set2 = [{'Key': 'security', 'Value': 'private'}]
        input_tag_set = {'TagSet': tag_set2}
        response = client.put_object_tagging(Bucket=bucket_name, Key='privatetag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        alt_client = get_alt_client(s3cfg_global_unique)
        # PUT requests with object tagging are a bit weird: if you forget to
        # include the tag that the policy expects to exist, subsequent
        # put requests will fail
        test_tag_set1 = [{'Key': 'security', 'Value': 'public'}, {'Key': 'foo', 'Value': 'bar'}]
        input_tag_set = {'TagSet': test_tag_set1}
        response = alt_client.put_object_tagging(Bucket=bucket_name, Key='publictag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        e = assert_raises(ClientError, alt_client.put_object_tagging, Bucket=bucket_name, Key='privatetag',
                          Tagging=input_tag_set)
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)
        test_tag_set2 = [{'Key': 'security', 'Value': 'private'}]
        input_tag_set = {'TagSet': test_tag_set2}
        response = alt_client.put_object_tagging(Bucket=bucket_name, Key='publictag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        # Now try putting the original tags again, this should fail
        input_tag_set = {'TagSet': test_tag_set1}
        e = assert_raises(ClientError, alt_client.put_object_tagging, Bucket=bucket_name, Key='publictag',
                          Tagging=input_tag_set)
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)

    @pytest.mark.ess
    def test_bucket_policy_put_obj_copy_source(self, s3cfg_global_unique):
        """
        Test: verify the copy-source conditional on put obj
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.create_objects(s3cfg_global_unique, keys=['public/foo', 'public/bar', 'private/foo'])
        src_resource = self.make_arn_resource("{}/{}".format(bucket_name, "*"))
        policy_document = make_json_policy("s3:GetObject",
                                           src_resource)
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        bucket_name2 = self.get_new_bucket(client, s3cfg_global_unique)
        tag_conditional = {"StringLike": {
            "s3:x-amz-copy-source": bucket_name + "/public/*"
        }}
        resource = self.make_arn_resource("{}/{}".format(bucket_name2, "*"))
        policy_document = make_json_policy("s3:PutObject",
                                           resource,
                                           conditions=tag_conditional)
        client.put_bucket_policy(Bucket=bucket_name2, Policy=policy_document)
        alt_client = get_alt_client(s3cfg_global_unique)
        copy_source = {'Bucket': bucket_name, 'Key': 'public/foo'}
        alt_client.copy_object(Bucket=bucket_name2, CopySource=copy_source, Key='new_foo')
        # This is possible because we are still the owner, see the grants with
        # policy on how to do this right
        response = alt_client.get_object(Bucket=bucket_name2, Key='new_foo')
        body = self.get_body(response)
        self.eq(body, 'public/foo')
        copy_source = {'Bucket': bucket_name, 'Key': 'public/bar'}
        alt_client.copy_object(Bucket=bucket_name2, CopySource=copy_source, Key='new_foo2')
        response = alt_client.get_object(Bucket=bucket_name2, Key='new_foo2')
        body = self.get_body(response)
        self.eq(body, 'public/bar')
        copy_source = {'Bucket': bucket_name, 'Key': 'private/foo'}
        self.check_access_denied(alt_client.copy_object, Bucket=bucket_name2, CopySource=copy_source, Key='new_foo2')

    @pytest.mark.ess
    def test_bucket_policy_put_obj_copy_source_meta(self, s3cfg_global_unique):
        """
        Test: verify the copy-source conditional on put obj
        """
        client = get_client(s3cfg_global_unique)
        src_bucket_name = self.create_objects(s3cfg_global_unique, keys=['public/foo', 'public/bar'])
        src_resource = self.make_arn_resource("{}/{}".format(src_bucket_name, "*"))
        policy_document = make_json_policy("s3:GetObject",
                                           src_resource)
        client.put_bucket_policy(Bucket=src_bucket_name, Policy=policy_document)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        tag_conditional = {"StringEquals": {
            "s3:x-amz-metadata-directive": "COPY"
        }}
        resource = self.make_arn_resource("{}/{}".format(bucket_name, "*"))
        policy_document = make_json_policy("s3:PutObject",
                                           resource,
                                           conditions=tag_conditional)
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        alt_client = get_alt_client(s3cfg_global_unique)
        lf = (lambda **kwargs: kwargs['params']['headers'].update({"x-amz-metadata-directive": "COPY"}))
        alt_client.meta.events.register('before-call.s3.CopyObject', lf)
        copy_source = {'Bucket': src_bucket_name, 'Key': 'public/foo'}
        alt_client.copy_object(Bucket=bucket_name, CopySource=copy_source, Key='new_foo')
        # This is possible because we are still the owner, see the grants with
        # policy on how to do this right
        response = alt_client.get_object(Bucket=bucket_name, Key='new_foo')
        body = self.get_body(response)
        self.eq(body, 'public/foo')

        # remove the x-amz-metadata-directive header
        def remove_header(**kwargs):
            if "x-amz-metadata-directive" in kwargs['params']['headers']:
                del kwargs['params']['headers']["x-amz-metadata-directive"]

        alt_client.meta.events.register('before-call.s3.CopyObject', remove_header)
        copy_source = {'Bucket': src_bucket_name, 'Key': 'public/bar'}
        self.check_access_denied(alt_client.copy_object, Bucket=bucket_name, CopySource=copy_source, Key='new_foo2',
                                 Metadata={"foo": "bar"})

    @pytest.mark.ess
    def test_bucket_policy_put_obj_acl(self, s3cfg_global_unique):
        """
        Test: verify that put obj with a canned ACL is not allowed to be public
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        # An Allow conditional would require at least the presence of an x-amz-acl
        # attribute; a Deny conditional negates any request that tries to set a
        # public-read/write ACL
        conditional = {"StringLike": {
            "s3:x-amz-acl": "public*"
        }}
        p = Policy()
        resource = self.make_arn_resource("{}/{}".format(bucket_name, "*"))
        s1 = Statement("s3:PutObject", resource)
        s2 = Statement("s3:PutObject", resource, effect="Deny", condition=conditional)
        policy_document = p.add_statement(s1).add_statement(s2).to_json()
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        alt_client = get_alt_client(s3cfg_global_unique)
        key1 = 'private-key'
        # if we want to be really pedantic, we should check that this doesn't raise
        # and mark a failure, however if this does raise nosetests would mark this
        # as an ERROR anyway
        response = alt_client.put_object(Bucket=bucket_name, Key=key1, Body=key1)
        # response = alt_client.put_object_acl(Bucket=bucket_name, Key=key1, ACL='private')
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        key2 = 'public-key'
        lf = (lambda **kwargs: kwargs['params']['headers'].update({"x-amz-acl": "public-read"}))
        alt_client.meta.events.register('before-call.s3.PutObject', lf)
        e = assert_raises(ClientError, alt_client.put_object, Bucket=bucket_name, Key=key2, Body=key2)
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)

    @pytest.mark.ess
    @pytest.mark.fails_on_ess  # TODO: remove this fails_on_rgw when I fix it
    @pytest.mark.xfail(reason="Expected: the x-amz-tagging parameter can be set in the headers when uploading an object", run=True, strict=True)
    def test_bucket_policy_put_obj_request_obj_tag(self, s3cfg_global_unique):
        """
        Test: verify put obj with RequestObjectTag
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        tag_conditional = {"StringEquals": {
            "s3:RequestObjectTag/security": "public"
        }}
        p = Policy()
        resource = self.make_arn_resource("{}/{}".format(bucket_name, "*"))
        s1 = Statement("s3:PutObject", resource, effect="Allow", condition=tag_conditional)
        policy_document = p.add_statement(s1).to_json()
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        alt_client = get_alt_client(s3cfg_global_unique)
        key1_str = 'testobj'
        self.check_access_denied(alt_client.put_object, Bucket=bucket_name, Key=key1_str, Body=key1_str)
        headers = {"x-amz-tagging": "security=public"}
        lf = (lambda **kwargs: kwargs['params']['headers'].update(headers))
        client.meta.events.register('before-call.s3.PutObject', lf)
        # TODO: why is this a 400 and not passing
        alt_client.put_object(Bucket=bucket_name, Key=key1_str, Body=key1_str)

    @pytest.mark.ess
    def test_bucket_policy_get_obj_acl_existing_tag(self, s3cfg_global_unique):
        """
        Test: verify the ExistingObjectTag conditional on get object acl
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.create_objects(s3cfg_global_unique, keys=['publictag', 'privatetag', 'invalidtag'])
        tag_conditional = {"StringEquals": {
            "s3:ExistingObjectTag/security": "public"
        }}
        resource = self.make_arn_resource("{}/{}".format(bucket_name, "*"))
        policy_document = make_json_policy("s3:GetObjectAcl",
                                           resource,
                                           conditions=tag_conditional)
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
        tag_set = [{'Key': 'security', 'Value': 'public'}, {'Key': 'foo', 'Value': 'bar'}]
        input_tag_set = {'TagSet': tag_set}
        response = client.put_object_tagging(Bucket=bucket_name, Key='publictag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        tag_set2 = [{'Key': 'security', 'Value': 'private'}]
        input_tag_set = {'TagSet': tag_set2}
        response = client.put_object_tagging(Bucket=bucket_name, Key='privatetag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        tag_set3 = [{'Key': 'security1', 'Value': 'public'}]
        input_tag_set = {'TagSet': tag_set3}
        response = client.put_object_tagging(Bucket=bucket_name, Key='invalidtag', Tagging=input_tag_set)
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        alt_client = get_alt_client(s3cfg_global_unique)
        response = alt_client.get_object_acl(Bucket=bucket_name, Key='publictag')
        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)
        # A get object itself should fail since we allowed only GetObjectAcl
        e = assert_raises(ClientError, alt_client.get_object, Bucket=bucket_name, Key='publictag')
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)
        e = assert_raises(ClientError, alt_client.get_object_tagging, Bucket=bucket_name, Key='privatetag')
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)
        e = assert_raises(ClientError, alt_client.get_object_tagging, Bucket=bucket_name, Key='invalidtag')
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 403)
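The tests above build their bucket policies with a make_json_policy helper that the snippet does not show. A minimal sketch of what such a helper might look like follows. The function name make_json_policy_sketch, its body, the wildcard principal, and the example bucket name are all assumptions made for illustration, not the helper's actual source.

```python
import json
from typing import Any, Dict, Optional


def make_json_policy_sketch(
    action: str,
    resource: str,
    conditions: Optional[Dict[str, Any]] = None,
) -> str:
    """Hypothetical stand-in for the make_json_policy helper used in the tests.

    Returns a bucket policy as a JSON string with a single Allow statement.
    """
    statement: Dict[str, Any] = {
        "Effect": "Allow",
        "Principal": {"AWS": "*"},  # assumption: the policy applies to every principal
        "Action": action,
        "Resource": [resource],
    }
    # Only attach a Condition block when the caller supplies one.
    if conditions is not None:
        statement["Condition"] = conditions
    return json.dumps({"Version": "2012-10-17", "Statement": [statement]})


# Example: allow PutObjectTagging only on objects already tagged security=public.
policy_document = make_json_policy_sketch(
    "s3:PutObjectTagging",
    "arn:aws:s3:::example-bucket/*",  # hypothetical bucket name
    conditions={"StringEquals": {"s3:ExistingObjectTag/security": "public"}},
)
```

The resulting string is the kind of value the tests pass as Policy to client.put_bucket_policy.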

test_put_object_tagging.py

Source:test_put_object_tagging.py Github

from typing import Any, Dict

import pytest
from pytest_mock import MockerFixture

from functions.put_object_tagging import app


@pytest.fixture
def event() -> Dict[str, Any]:
    return {
        "md5_hash": "myhash",
        "bucket": "foo",
        "key": "bar.baz",
    }


def test_put_object_tagging_existing_tags(
    mocker: MockerFixture, event: Dict[str, Any]
) -> None:
    stub_client = mocker.MagicMock()
    stub_client_instance = mocker.MagicMock()
    stub_client.return_value = stub_client_instance
    stub_client_instance.get_object_tagging.return_value = {
        "TagSet": [{"Key": "existingtag", "Value": "alreadythere"}]
    }
    mocker.patch("functions.put_object_tagging.app.TaggingClient", stub_client)
    context = mocker.Mock()
    app.lambda_handler(event, context)
    stub_client_instance.put_object_tagging.assert_called_once_with(
        bucket="foo",
        key="bar.baz",
        tagging={
            "TagSet": [
                {"Key": "existingtag", "Value": "alreadythere"},
                {"Key": "md5sum", "Value": "myhash"},
            ]
        },
    )


def test_put_object_tagging_no_existing_tags(
    mocker: MockerFixture, event: Dict[str, Any]
) -> None:
    stub_client = mocker.MagicMock()
    stub_client_instance = mocker.MagicMock()
    stub_client.return_value = stub_client_instance
    stub_client_instance.get_object_tagging.return_value = {"TagSet": []}
    mocker.patch("functions.put_object_tagging.app.TaggingClient", stub_client)
    context = mocker.Mock()
    app.lambda_handler(event, context)
    stub_client_instance.put_object_tagging.assert_called_once_with(
        bucket="foo",
        key="bar.baz",
        tagging={
            "TagSet": [
                {"Key": "md5sum", "Value": "myhash"},
            ]
        },
...
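The handler under test is not shown, but both tests expect it to read the object's current tags and write them back together with an md5sum tag. That read-then-write step matters because put_object_tagging replaces the whole tag set rather than appending to it. A minimal sketch of the merge step follows. The function name merge_md5_tag is hypothetical, and dropping a stale md5sum tag is an added assumption that the tests do not exercise.

```python
from typing import Dict, List

Tag = Dict[str, str]


def merge_md5_tag(existing: List[Tag], md5_hash: str) -> Dict[str, List[Tag]]:
    """Return a Tagging payload holding the existing tags plus an md5sum tag."""
    # Assumption: drop any earlier md5sum tag so the key appears only once,
    # since S3 does not accept duplicate tag keys in one tag set.
    kept = [tag for tag in existing if tag["Key"] != "md5sum"]
    return {"TagSet": kept + [{"Key": "md5sum", "Value": md5_hash}]}


# Mirrors the first test: one existing tag, then the md5sum tag appended.
tagging = merge_md5_tag(
    [{"Key": "existingtag", "Value": "alreadythere"}],
    "myhash",
)
```

A handler could pass the returned dictionary as the tagging argument that the tests assert on.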

add_tags.py

Source:add_tags.py Github

Option 1: Using the boto3 API "put_object_tagging" method. [1]

Example:
The following example adds tags to an existing object.

response = client.put_object_tagging(
    Bucket='examplebucket',
    Key='HappyFace.jpg',
    Tagging={
        'TagSet': [
            {
                'Key': 'Key3',
                'Value': 'Value3',
            },
            {
                'Key': 'Key4',
                'Value': 'Value4',
            },
        ],
    },
...
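The call above can be wrapped in a small helper that builds the Tagging argument from a plain dictionary. The sketch below is illustrative only: the names build_tagging and tag_object are hypothetical, and the helper accepts any object exposing put_object_tagging, so it does not import boto3 itself. The LocalStack endpoint mentioned in the comments is an assumption about a default local setup.

```python
from typing import Any, Dict, List

MAX_TAGS_PER_OBJECT = 10  # S3 allows at most 10 tags on a single object


def build_tagging(tags: Dict[str, str]) -> Dict[str, List[Dict[str, str]]]:
    """Convert {'key': 'value'} pairs into the TagSet structure S3 expects."""
    if len(tags) > MAX_TAGS_PER_OBJECT:
        raise ValueError(
            f"S3 objects accept at most {MAX_TAGS_PER_OBJECT} tags, got {len(tags)}"
        )
    return {"TagSet": [{"Key": k, "Value": v} for k, v in tags.items()]}


def tag_object(client: Any, bucket: str, key: str, tags: Dict[str, str]) -> Any:
    """Replace the tag set of an existing object and return the raw response.

    `client` is expected to behave like a boto3 S3 client, for example
    (assumption: LocalStack listening on its usual local port):
        boto3.client("s3", endpoint_url="http://localhost:4566")
    """
    return client.put_object_tagging(
        Bucket=bucket, Key=key, Tagging=build_tagging(tags)
    )


# Building the payload needs no network access.
example_tagging = build_tagging({"Key3": "Value3", "Key4": "Value4"})
```

Keeping the payload construction separate from the network call makes the tag limit easy to check in unit tests without a running S3 endpoint.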
