Best Python code snippet using localstack_python
test_s3_tagging_v.py
Source:test_s3_tagging_v.py  
...23        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)24        input_tag_set = self.create_simple_tag_set(2)25        response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)26        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)27        response = client.get_object_tagging(Bucket=bucket_name, Key=key)28        self.eq(response['TagSet'], input_tag_set['TagSet'])29    def test_get_obj_head_tagging(self, s3cfg_global_unique):30        """31        æµè¯-éªè¯head-objectéå«æè®¾ç½®çtag32        """33        key = 'testputtags'34        client = get_client(s3cfg_global_unique)35        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)36        count = 237        input_tag_set = self.create_simple_tag_set(count)38        response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)39        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)40        response = client.head_object(Bucket=bucket_name, Key=key)41        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)42        self.eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-tagging-count'], str(count))43    def test_put_max_tags(self, s3cfg_global_unique):44        """45        æµè¯-éªè¯æå¤§å
许设置çtagsï¼10个ï¼46        """47        key = 'testputmaxtags'48        client = get_client(s3cfg_global_unique)49        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)50        input_tag_set = self.create_simple_tag_set(10)51        response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)52        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)53        response = client.get_object_tagging(Bucket=bucket_name, Key=key)54        self.eq(response['TagSet'], input_tag_set['TagSet'])55    def test_put_excess_tags(self, s3cfg_global_unique):56        """57        æµè¯-éªè¯æå¤§å
许设置çtagsï¼11个ï¼ï¼ failed58        """59        key = 'testputmaxtags'60        client = get_client(s3cfg_global_unique)61        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)62        input_tag_set = self.create_simple_tag_set(11)63        e = assert_raises(64            ClientError, client.put_object_tagging, Bucket=bucket_name, Key=key, Tagging=input_tag_set)65        status, error_code = self.get_status_and_error_code(e.response)66        self.eq(status, 400)67        self.eq(error_code, 'InvalidTag')68        response = client.get_object_tagging(Bucket=bucket_name, Key=key)69        self.eq(len(response['TagSet']), 0)70    def test_put_max_kvsize_tags(self, s3cfg_global_unique):71        """72        æµè¯-éªè¯è®¾ç½®tagæ¯keyåvalueçæå¤§å符æ°ï¼key为128ï¼value为25673        """74        key = 'testputmaxkeysize'75        client = get_client(s3cfg_global_unique)76        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)77        tag_set = []78        for i in range(10):79            k = self.make_random_string(128)80            v = self.make_random_string(256)81            tag_set.append({'Key': k, 'Value': v})82        input_tag_set = {'TagSet': tag_set}83        response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)84        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)85        response = client.get_object_tagging(Bucket=bucket_name, Key=key)86        for kv_pair in response['TagSet']:87            self.eq((kv_pair in input_tag_set['TagSet']), True)88    def test_put_excess_key_tags(self, s3cfg_global_unique):89        """90        æµè¯-éªè¯è®¾ç½®tagæ¯keyåvalueçæå¤§å符æ°ï¼key为129ï¼value为256ï¼failed91        """92        key = 'testputexcesskeytags'93        client = get_client(s3cfg_global_unique)94        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)95        tag_set = []96        for i in range(10):97            k = self.make_random_string(129)98            v = self.make_random_string(256)99            tag_set.append({'Key': k, 'Value': v})100        input_tag_set = {'TagSet': tag_set}101        e = assert_raises(ClientError, client.put_object_tagging, Bucket=bucket_name, Key=key, Tagging=input_tag_set)102        status, error_code = self.get_status_and_error_code(e.response)103        self.eq(status, 400)104        self.eq(error_code, 'InvalidTag')105        response = client.get_object_tagging(Bucket=bucket_name, Key=key)106        self.eq(len(response['TagSet']), 0)107    def test_put_excess_val_tags(self, s3cfg_global_unique):108        """109        æµè¯-éªè¯è®¾ç½®tagæ¯keyåvalueçæå¤§å符æ°ï¼key为128ï¼value为257ï¼failed110        """111        key = 'testputexcesskeytags'112        client = get_client(s3cfg_global_unique)113        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)114        tag_set = []115        for i in range(10):116            k = self.make_random_string(128)117            v = self.make_random_string(257)118            tag_set.append({'Key': k, 'Value': v})119        input_tag_set = {'TagSet': tag_set}120        e = assert_raises(ClientError, client.put_object_tagging, Bucket=bucket_name, Key=key, Tagging=input_tag_set)121        status, error_code = self.get_status_and_error_code(e.response)122        self.eq(status, 400)123        self.eq(error_code, 'InvalidTag')124        response = client.get_object_tagging(Bucket=bucket_name, Key=key)125        self.eq(len(response['TagSet']), 0)126    def test_put_modify_tags(self, s3cfg_global_unique):127        """128        æµè¯-éªè¯ä¿®æ¹å·²åå¨çtags129        """130        key = 'testputmodifytags'131        client = get_client(s3cfg_global_unique)132        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)133        tag_set = [{'Key': 'key', 'Value': 'val'}, {'Key': 'key2', 'Value': 'val2'}]134        input_tag_set = {'TagSet': tag_set}135        response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)136        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)137        response = client.get_object_tagging(Bucket=bucket_name, Key=key)138        self.eq(response['TagSet'], input_tag_set['TagSet'])139        tag_set2 = [{'Key': 'key3', 'Value': 'val3'}]140        input_tag_set2 = {'TagSet': tag_set2}141        response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set2)142        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)143        response = client.get_object_tagging(Bucket=bucket_name, Key=key)144        self.eq(response['TagSet'], input_tag_set2['TagSet'])145    def test_put_delete_tags(self, s3cfg_global_unique):146        """147        æµè¯-éªè¯å é¤tags148        """149        key = 'testputmodifytags'150        client = get_client(s3cfg_global_unique)151        bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)152        input_tag_set = self.create_simple_tag_set(2)153        response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)154        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)155        response = client.get_object_tagging(Bucket=bucket_name, Key=key)156        self.eq(response['TagSet'], input_tag_set['TagSet'])157        response = client.delete_object_tagging(Bucket=bucket_name, Key=key)158        self.eq(response['ResponseMetadata']['HTTPStatusCode'], 204)159        response = client.get_object_tagging(Bucket=bucket_name, Key=key)160        self.eq(len(response['TagSet']), 0)161    def test_post_object_tags_anonymous_request(self, s3cfg_global_unique):162        """163        æµè¯-éªè¯è®¾ç½®å¯¹è±¡tagsï¼éè¿browser based via POST request164        """165        client = get_client(s3cfg_global_unique)166        bucket_name = self.get_new_bucket_name(s3cfg_global_unique)167        url = self.get_post_url(s3cfg_global_unique, bucket_name)168        client.create_bucket(ACL='public-read-write', Bucket=bucket_name)169        key_name = "foo.txt"170        input_tag_set = self.create_simple_tag_set(2)171        # xml_input_tag_set is the same as input_tag_set in xml.172        # There is not a simple way to change input_tag_set to xml like there is in the boto2 tetss173        xml_input_tag_set = "<Tagging><TagSet><Tag><Key>0</Key><Value>0</Value></Tag><Tag><Key>1</Key><Value>1</Value></Tag></TagSet></Tagging>"174        payload = OrderedDict([175            ("key", key_name),176            ("acl", "public-read"),177            ("Content-Type", "text/plain"),178            ("tagging", xml_input_tag_set),179            ('file', 'bar'),180        ])181        r = requests.post(url, files=payload, verify=s3cfg_global_unique.default_ssl_verify)182        self.eq(r.status_code, 204)183        response = client.get_object(Bucket=bucket_name, Key=key_name)184        body = self.get_body(response)185        self.eq(body, 'bar')186        response = client.get_object_tagging(Bucket=bucket_name, Key=key_name)187        self.eq(response['TagSet'], input_tag_set['TagSet'])188    def test_post_object_tags_authenticated_request(self, s3cfg_global_unique):189        """190        æµè¯-éªè¯191        (operation='authenticated browser based upload via POST request')192        (assertion='succeeds and returns written data')193        """194        client = get_client(s3cfg_global_unique)195        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)196        url = self.get_post_url(s3cfg_global_unique, bucket_name)197        utc = pytz.utc198        expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)199        policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),200                           "conditions": [201                               {"bucket": bucket_name},202                               ["starts-with", "$key", "foo"],203                               {"acl": "private"},204                               ["starts-with", "$Content-Type", "text/plain"],205                               ["content-length-range", 0, 1024],206                               ["starts-with", "$tagging", ""]207                           ]}208        # xml_input_tag_set is the same as `input_tag_set = self.create_simple_tag_set(2)` in xml209        # There is not a simple way to change input_tag_set to xml like there is in the boto2 tetss210        xml_input_tag_set = "<Tagging><TagSet><Tag><Key>0</Key><Value>0</Value></Tag><Tag><Key>1</Key><Value>1</Value></Tag></TagSet></Tagging>"211        json_policy_document = json.JSONEncoder().encode(policy_document)212        bytes_json_policy_document = bytes(json_policy_document, 'utf-8')213        policy = base64.b64encode(bytes_json_policy_document)214        aws_secret_access_key = s3cfg_global_unique.main_secret_key215        aws_access_key_id = s3cfg_global_unique.main_access_key216        signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())217        payload = OrderedDict([218            ("key", "foo.txt"),219            ("AWSAccessKeyId", aws_access_key_id),220            ("acl", "private"), ("signature", signature), ("policy", policy),221            ("tagging", xml_input_tag_set),222            ("Content-Type", "text/plain"),223            ('file', 'bar')])224        r = requests.post(url, files=payload, verify=s3cfg_global_unique.default_ssl_verify)225        self.eq(r.status_code, 204)226        response = client.get_object(Bucket=bucket_name, Key='foo.txt')227        body = self.get_body(response)228        self.eq(body, 'bar')229    def test_put_obj_with_tags(self, s3cfg_global_unique):230        """231        (operation='Test PutObj with tagging headers')232        (assertion='success')233        """234        client = get_client(s3cfg_global_unique)235        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)236        key = 'testtagobj1'237        data = 'A' * 100238        tag_set = [{'Key': 'bar', 'Value': ''}, {'Key': 'foo', 'Value': 'bar'}]239        put_obj_tag_headers = {240            'x-amz-tagging': 'foo=bar&bar'241        }242        lf = (lambda **kwargs: kwargs['params']['headers'].update(put_obj_tag_headers))243        client.meta.events.register('before-call.s3.PutObject', lf)244        client.put_object(Bucket=bucket_name, Key=key, Body=data)245        response = client.get_object(Bucket=bucket_name, Key=key)246        body = self.get_body(response)247        self.eq(body, data)248        response = client.get_object_tagging(Bucket=bucket_name, Key=key)249        response_tag_set = response['TagSet']250        tag_set = tag_set251        self.eq(response_tag_set, tag_set)252class TestBucketTagging(TestTaggingBase):253    @pytest.mark.ess254    @pytest.mark.fails_on_ess255    @pytest.mark.xfail(reason="颿ï¼å½æ²¡è®¾ç½®æ¡¶æ ç¾çæ¶åï¼è·åæ ç¾è¿åNoSuchTagSetError", run=True, strict=True)256    def test_set_bucket_tagging(self, s3cfg_global_unique):257        """258        æµè¯-éªè¯è®¾ç½®å卿¡¶çtags259        """260        client = get_client(s3cfg_global_unique)261        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)262        e = assert_raises(ClientError, client.get_bucket_tagging, Bucket=bucket_name)  # won't raise ClientError...test_s3.py
Source:test_s3.py  
1# Licensed under the Apache License, Version 2.0 (the "License"); you may2# not use this file except in compliance with the License. You may obtain3# a copy of the License at4#5#      http://www.apache.org/licenses/LICENSE-2.06#7# Unless required by applicable law or agreed to in writing, software8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the10# License for the specific language governing permissions and limitations11# under the License.12from botocore import exceptions as boto_exc13import mock14import testtools15from s3uploader import exceptions16from s3uploader import s317class TestAssetManager(testtools.TestCase):18    def setUp(self):19        super(TestAssetManager, self).setUp()20        self.storage_mock = mock.Mock()21        self.manager = s3.AssetManager(storage_manager=self.storage_mock)22        self.addCleanup(mock.patch.stopall)23    def test_create_asset(self):24        self.storage_mock.get_url_for_upload.return_value = 'foo_url'25        asset = self.manager.create_asset()26        self.assertIsNotNone(asset)27        self.assertEqual('foo_url', asset.url)28    def test_update_asset(self):29        self.manager.update_asset('foo_asset_id')30        self.storage_mock.update_upload_status.assert_called_once_with(31            'foo_asset_id')32    def test_get_asset(self):33        self.storage_mock.get_url_for_download.return_value = 'foo_url'34        asset = self.manager.get_asset('foo_asset_id', 50)35        self.assertIsNotNone(asset)36        self.assertEqual('foo_url', asset.url)37class TestS3Manager(testtools.TestCase):38    def setUp(self):39        super(TestS3Manager, self).setUp()40        self.manager = s3.S3Manager(config=mock.Mock(), client=mock.Mock())41        self.addCleanup(mock.patch.stopall)42    def test_get_url_for_upload(self):43        self.manager.get_url_for_upload('foo_asset_id')44        self.manager.client.generate_presigned_post.assert_called_once()45    def test_update_upload_status(self):46        self.manager.update_upload_status('foo_asset_id')47        self.manager.client.put_object_tagging.assert_called_once()48    def test_update_upload_status_raise_not_found(self):49        exc = boto_exc.ClientError(50            error_response={}, operation_name='foo')51        exc.response['Error'] = {'Code': 'NoSuchKey'}52        self.manager.client.put_object_tagging.side_effect = exc53        self.assertRaises(54            exceptions.AssetNotFoundError,55            self.manager.update_upload_status, 'foo_asset_id')56    def test_get_url_for_download_ready_for_download(self):57        self.manager.client.get_object_tagging.return_value = (58            {'TagSet': [{'Key': 'Status', 'Value': 'Uploaded'}]})59        self.manager.get_url_for_download('foo_asset_id', 50)60        self.manager.client.get_object_tagging.assert_called_once()61        self.manager.client.generate_presigned_url.assert_called_once()62    def test_get_url_for_download_not_ready_for_download(self):63        self.manager.client.get_object_tagging.return_value = None64        self.manager.get_url_for_download('foo_asset_id', 50)65        self.manager.client.get_object_tagging.assert_called_once()66        self.assertFalse(self.manager.client.generate_presigned_url.call_count)67    def test_get_url_for_download_not_found(self):68        exc = boto_exc.ClientError(69            error_response={}, operation_name='foo')70        exc.response['Error'] = {'Code': 'NoSuchKey'}71        self.manager.client.get_object_tagging.side_effect = exc72        self.assertRaises(73            exceptions.AssetNotFoundError,74            self.manager.get_url_for_download, 'foo_asset_id', 50)75    def test_get_url_for_download_unknown_error(self):76        exc = boto_exc.ClientError(77            error_response={}, operation_name='foo')78        exc.response['Error'] = {'Code': 'UncaughtError'}79        self.manager.client.get_object_tagging.side_effect = exc80        self.assertRaises(81            exceptions.AssetError,82            self.manager.get_url_for_download, 'foo_asset_id', 50)83    def test_get_url_for_upload_raises(self):84        self.manager.client.generate_presigned_post.side_effect = (85            boto_exc.BotoCoreError)86        self.assertRaises(exceptions.AssetError,87                          self.manager.get_url_for_upload, 'foo_asset_id')88    def test_update_upload_status_raises(self):89        self.manager.client.put_object_tagging.side_effect = (90            boto_exc.ClientError(error_response={}, operation_name='foo'))91        self.assertRaises(exceptions.AssetError,...boto3_tagging.py
Source:boto3_tagging.py  
...17                         Bucket='s3-obj-tagging-test',18                         Key='put-obj-test',19                         Tagging='two=2&Project=X'20                         )21# response = client.get_object_tagging(Bucket='s3-obj-tagging-test',22#                                      Key='test-s3-file',23#                                      VersionId='eVQkyDiNOUF.ipmMi7Tawtk5GQslpHtK')24# print (response)25# response = client.delete_object_tagging(Bucket='s3-obj-tagging-test',26#                                         Key='test-s3-file',27#                                         VersionId='jk.ArjftY0Htz7weCNXNhX34YmSzJo73')28# print response29# response = client.put_object_tagging(Bucket='s3-obj-tagging-test',30#                                      Key='test-s3-file',31#                                      Tagging={32#                                         'TagSet': [33#                                             { 'Key': 'Owner1', 'Value': '1'}34#                                         ]35#                                      })36# print response37# response = client.get_object_tagging(Bucket='s3-obj-tagging-test',38#                                      Key='test-s3-file')39# print (response)40# response = client.put_object_tagging(Bucket='s3-obj-tagging-test',41#                                      Key='test',42#                                      Tagging={43#                                         'TagSet': [44#                                             { 'Key': 'first1', 'Value': '1'},45#                                             { 'Key': 'second2', 'Value': '2'}46#                                         ]47#                                      })48# print response49#50# response = client.get_object_tagging(Bucket='s3-obj-tagging-test', Key='test')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
