How to use the complete_multipart_upload method in LocalStack

Best Python code snippet using localstack_python
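complete_multipart_upload is the last step of S3's multipart-upload flow: create the upload, send each part, collect the ETag returned for every part, then hand the whole list back as MultipartUpload={'Parts': [...]}. The sketch below shows that flow with boto3 pointed at a LocalStack S3 endpoint; the endpoint URL, credentials, bucket and key are placeholder assumptions, not values taken from the snippets that follow.

# Minimal sketch: multipart upload against LocalStack's S3 API with boto3.
# Assumes LocalStack's default edge endpoint (http://localhost:4566); the
# bucket/key names and credentials are placeholders.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",  # assumed LocalStack edge endpoint
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
)

bucket, key = "demo-bucket", "demo-object"
s3.create_bucket(Bucket=bucket)

# 1. Start the upload and remember its UploadId.
upload_id = s3.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]

# 2. Upload the parts (each at least 5 MiB except the last) and keep their ETags.
parts = []
for part_number, chunk in enumerate([b"a" * 5 * 1024 * 1024, b"tail"], start=1):
    resp = s3.upload_part(Bucket=bucket, Key=key, UploadId=upload_id,
                          PartNumber=part_number, Body=chunk)
    parts.append({"ETag": resp["ETag"], "PartNumber": part_number})

# 3. Complete the upload by handing the collected parts back to S3.
s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id,
                             MultipartUpload={"Parts": parts})

If the final call is given missing parts, wrong ETags, or non-final parts under 5 MiB, botocore raises a ClientError carrying codes such as MalformedXML, InvalidPart, or EntityTooSmall; those are exactly the failure cases the first test suite below exercises.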

test_s3_multipart_upload_v.py

Source: test_s3_multipart_upload_v.py (GitHub)

...
            config=config,
            bucket_name=bucket_name, key=key, size=obj_len,
            content_type=content_type, metadata=metadata,
            resend_parts=resend_parts)
        client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        response = client.get_object(Bucket=bucket_name, Key=key)
        self.eq(response['ContentType'], content_type)
        self.eq(response['Metadata'], metadata)
        body = self.get_body(response)
        self.eq(len(body), response['ContentLength'])
        self.eq(body, data)
        self.check_content_using_range(client, key, bucket_name, data, 1000000)
        self.check_content_using_range(client, key, bucket_name, data, 10000000)


class TestObjectMultipartUpload(TestMultipartBase):

    @pytest.mark.ess
    def test_multipart_upload_empty(self, s3cfg_global_unique):
        """
        Test: complete a multipart upload without providing Parts;
        expect 400, MalformedXML.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key1 = "mymultipart"
        obj_len = 0
        upload_id, data, parts = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key1, size=obj_len)
        e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket_name, Key=key1, UploadId=upload_id)
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 400)
        self.eq(error_code, 'MalformedXML')

    @pytest.mark.ess
    def test_multipart_upload_small(self, s3cfg_global_unique):
        """
        Test: upload a single part whose object size is 1 byte.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key1 = "mymultipart"
        obj_len = 1
        (upload_id, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key1, size=obj_len)
        client.complete_multipart_upload(Bucket=bucket_name, Key=key1, UploadId=upload_id, MultipartUpload={'Parts': parts})
        response = client.get_object(Bucket=bucket_name, Key=key1)
        self.eq(response['ContentLength'], obj_len)

    @pytest.mark.ess
    def test_multipart_copy_small(self, s3cfg_global_unique):
        """
        Test: copy a small part object with the upload_part_copy API.
        """
        client = get_client(s3cfg_global_unique)
        src_key = 'foo'
        src_bucket_name = self.create_key_with_random_content(s3cfg_global_unique, src_key)
        dest_bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        dest_key = "mymultipart"
        size = 1
        upload_id, parts = self.multipart_copy(s3cfg_global_unique, src_bucket_name, src_key, dest_bucket_name, dest_key, size)
        client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        response = client.get_object(Bucket=dest_bucket_name, Key=dest_key)
        self.eq(size, response['ContentLength'])
        self.check_key_content(client, src_key, src_bucket_name, dest_key, dest_bucket_name)

    @pytest.mark.ess
    def test_multipart_copy_invalid_range(self, s3cfg_global_unique):
        """
        Test: call upload_part_copy with an invalid range and check that the response is as expected.
        """
        client = get_client(s3cfg_global_unique)
        src_key = 'source'
        src_bucket_name = self.create_key_with_random_content(s3cfg_global_unique, src_key, size=5)
        response = client.create_multipart_upload(Bucket=src_bucket_name, Key='dest')
        upload_id = response['UploadId']
        copy_source = {'Bucket': src_bucket_name, 'Key': src_key}
        copy_source_range = 'bytes={start}-{end}'.format(start=0, end=21)
        e = assert_raises(ClientError, client.upload_part_copy, Bucket=src_bucket_name, Key='dest',
                          UploadId=upload_id, CopySource=copy_source, CopySourceRange=copy_source_range, PartNumber=1)
        status, error_code = self.get_status_and_error_code(e.response)
        valid_status = [400, 416]
        if status not in valid_status:
            raise AssertionError("Invalid response " + str(status))
        self.eq(error_code, 'InvalidRange')

    @pytest.mark.ess
    @pytest.mark.fails_on_ess
    @pytest.mark.xfail(reason="Expected: invalid CopySourceRange values should return an error response", run=True, strict=True)
    def test_multipart_copy_improper_range(self, s3cfg_global_unique):
        """
        Test: check the responses for various improper CopySourceRange values.
        """
        # TODO: remove fails_on_rgw when https://tracker.ceph.com/issues/40795 is resolved
        client = get_client(s3cfg_global_unique)
        src_key = 'source'
        src_bucket_name = self.create_key_with_random_content(s3cfg_global_unique, src_key, size=5)
        response = client.create_multipart_upload(Bucket=src_bucket_name, Key='dest')
        upload_id = response['UploadId']
        copy_source = {'Bucket': src_bucket_name, 'Key': src_key}
        test_ranges = [
            '{start}-{end}'.format(start=0, end=2),
            'bytes={start}'.format(start=0),
            'bytes=hello-world',  # succeed, so strange.
            'bytes=0-bar',  # succeed, so strange.
            'bytes=hello-',  # succeed, so strange.
            'bytes=0-2,3-5'  # succeed, so strange.
        ]
        """
        CopySourceRange:
            The range of bytes to copy from the source object.
            The range value must use the form bytes=first-last,
            where the first and last are the zero-based byte offsets to copy.
            For example, bytes=0-9 indicates that you want to copy the first 10 bytes of the source.
            You can copy a range only if the source object is greater than 5 MB.
        """
        for test_range in test_ranges:
            e = assert_raises(ClientError, client.upload_part_copy, Bucket=src_bucket_name, Key='dest',
                              UploadId=upload_id, CopySource=copy_source, CopySourceRange=test_range, PartNumber=1)
            status, error_code = self.get_status_and_error_code(e.response)
            self.eq(status, 400)
            self.eq(error_code, 'InvalidArgument')

    @pytest.mark.ess
    def test_multipart_copy_without_range(self, s3cfg_global_unique):
        """
        Test: check multipart copies without x-amz-copy-source-range.
        """
        client = get_client(s3cfg_global_unique)
        src_key = 'source'
        src_bucket_name = self.create_key_with_random_content(s3cfg_global_unique, src_key, size=10)
        dest_bucket_name = self.get_new_bucket_name(s3cfg_global_unique)
        self.get_new_bucket(client, s3cfg_global_unique, name=dest_bucket_name)
        dest_key = "mymultipartcopy"
        response = client.create_multipart_upload(Bucket=dest_bucket_name, Key=dest_key)
        upload_id = response['UploadId']
        parts = []
        copy_source = {'Bucket': src_bucket_name, 'Key': src_key}
        part_num = 1
        response = client.upload_part_copy(Bucket=dest_bucket_name, Key=dest_key, CopySource=copy_source, PartNumber=part_num, UploadId=upload_id)
        parts.append({'ETag': response['CopyPartResult']['ETag'], 'PartNumber': part_num})
        client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        response = client.get_object(Bucket=dest_bucket_name, Key=dest_key)
        self.eq(response['ContentLength'], 10)
        self.check_key_content(client, src_key, src_bucket_name, dest_key, dest_bucket_name)

    @pytest.mark.ess
    def test_multipart_copy_special_names(self, s3cfg_global_unique):
        """
        Test: verify the multipart copy API with a single small part (size=10 bytes).
        """
        client = get_client(s3cfg_global_unique)
        src_bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        dest_bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        dest_key = "mymultipart"
        size = 1
        for src_key in (' ', '_', '__', '?versionId'):
            self.create_key_with_random_content(s3cfg_global_unique, src_key, bucket_name=src_bucket_name, size=10)  # add size=10 to save time.
            (upload_id, parts) = self.multipart_copy(s3cfg_global_unique, src_bucket_name, src_key, dest_bucket_name, dest_key, size)
            client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
            print(client.list_objects(Bucket=dest_bucket_name))
            response = client.get_object(Bucket=dest_bucket_name, Key=dest_key)
            self.eq(size, response['ContentLength'])
            self.check_key_content(client, src_key, src_bucket_name, dest_key, dest_bucket_name)

    @pytest.mark.ess
    def test_multipart_upload(self, s3cfg_global_unique):
        """
        Test: complete a multipart upload and verify the result is correct,
        including the bytes-used and object-count headers and the body.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key = "mymultipart"
        content_type = 'text/bla'
        obj_len = 30 * 1024 * 1024
        metadata = {'foo': 'bar'}
        (upload_id, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=obj_len, content_type=content_type, metadata=metadata)
        client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        response = client.head_bucket(Bucket=bucket_name)
        rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-bytes-used', obj_len))
        self.eq(rgw_bytes_used, obj_len)
        rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-object-count', 1))
        self.eq(rgw_object_count, 1)
        response = client.get_object(Bucket=bucket_name, Key=key)
        self.eq(response['ContentType'], content_type)
        self.eq(response['Metadata'], metadata)
        body = self.get_body(response)
        self.eq(len(body), response['ContentLength'])
        self.eq(body, data)
        self.check_content_using_range(client, key, bucket_name, data, 1000000)
        self.check_content_using_range(client, key, bucket_name, data, 10000000)

    @pytest.mark.ess
    def test_multipart_upload_resend_part(self, s3cfg_global_unique):
        """
        Test: verify that completing the multipart upload succeeds when individual parts are re-sent.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key = "mymultipart"
        obj_len = 30 * 1024 * 1024
        self.check_upload_multipart_resend(s3cfg_global_unique, bucket_name, key, obj_len, [0])
        self.check_upload_multipart_resend(s3cfg_global_unique, bucket_name, key, obj_len, [1])
        self.check_upload_multipart_resend(s3cfg_global_unique, bucket_name, key, obj_len, [2])
        self.check_upload_multipart_resend(s3cfg_global_unique, bucket_name, key, obj_len, [1, 2])
        self.check_upload_multipart_resend(s3cfg_global_unique, bucket_name, key, obj_len, [0, 1, 2, 3, 4, 5])

    @pytest.mark.ess
    def test_multipart_upload_multiple_sizes(self, s3cfg_global_unique):
        """
        Test: verify that completing the multipart upload succeeds for different object sizes.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key = "mymultipart"
        obj_len = 5 * 1024 * 1024
        (upload_id, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=obj_len)
        client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        obj_len = 5 * 1024 * 1024 + 100 * 1024
        (upload_id, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=obj_len)
        client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        obj_len = 5 * 1024 * 1024 + 600 * 1024
        (upload_id, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=obj_len)
        client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        obj_len = 10 * 1024 * 1024 + 100 * 1024
        (upload_id, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=obj_len)
        client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        obj_len = 10 * 1024 * 1024 + 600 * 1024
        (upload_id, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=obj_len)
        client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        obj_len = 10 * 1024 * 1024
        (upload_id, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=obj_len)
        client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})

    @pytest.mark.ess
    def test_multipart_copy_multiple_sizes(self, s3cfg_global_unique):
        """
        Test: verify upload_part_copy succeeds for different object sizes.
        """
        client = get_client(s3cfg_global_unique)
        src_key = 'foo'
        src_bucket_name = self.create_key_with_random_content(s3cfg_global_unique, src_key, 12 * 1024 * 1024)
        dest_bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        dest_key = "mymultipart"
        size = 5 * 1024 * 1024
        (upload_id, parts) = self.multipart_copy(s3cfg_global_unique, src_bucket_name, src_key, dest_bucket_name, dest_key, size)
        client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        self.check_key_content(client, src_key, src_bucket_name, dest_key, dest_bucket_name)
        size = 5 * 1024 * 1024 + 100 * 1024
        (upload_id, parts) = self.multipart_copy(s3cfg_global_unique, src_bucket_name, src_key, dest_bucket_name, dest_key, size)
        client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        self.check_key_content(client, src_key, src_bucket_name, dest_key, dest_bucket_name)
        size = 5 * 1024 * 1024 + 600 * 1024
        (upload_id, parts) = self.multipart_copy(s3cfg_global_unique, src_bucket_name, src_key, dest_bucket_name, dest_key, size)
        client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        self.check_key_content(client, src_key, src_bucket_name, dest_key, dest_bucket_name)
        size = 10 * 1024 * 1024 + 100 * 1024
        (upload_id, parts) = self.multipart_copy(s3cfg_global_unique, src_bucket_name, src_key, dest_bucket_name, dest_key, size)
        client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        self.check_key_content(client, src_key, src_bucket_name, dest_key, dest_bucket_name)
        size = 10 * 1024 * 1024 + 600 * 1024
        (upload_id, parts) = self.multipart_copy(s3cfg_global_unique, src_bucket_name, src_key, dest_bucket_name, dest_key, size)
        client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        self.check_key_content(client, src_key, src_bucket_name, dest_key, dest_bucket_name)
        size = 10 * 1024 * 1024
        (upload_id, parts) = self.multipart_copy(s3cfg_global_unique, src_bucket_name, src_key, dest_bucket_name, dest_key, size)
        client.complete_multipart_upload(Bucket=dest_bucket_name, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        self.check_key_content(client, src_key, src_bucket_name, dest_key, dest_bucket_name)

    @pytest.mark.ess
    def test_multipart_upload_size_too_small(self, s3cfg_global_unique):
        """
        Test: completing a multipart upload fails when a part (other than the last)
        is smaller than 5 MiB; expect 400, EntityTooSmall.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key = "mymultipart"
        size = 100 * 1024
        (upload_id, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=size, part_size=10 * 1024)
        e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 400)
        self.eq(error_code, 'EntityTooSmall')

    @pytest.mark.ess
    def test_multipart_upload_contents(self, s3cfg_global_unique):
        """
        Test: verify the body of a multipart-uploaded object matches the uploaded source data.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        self.do_test_multipart_upload_contents(client, bucket_name, 'mymultipart', 3)

    @pytest.mark.ess
    def test_multipart_upload_overwrite_existing_object(self, s3cfg_global_unique):
        """
        Test: overwrite an existing object using a multipart upload.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key = 'mymultipart'
        payload = '12345' * 1024 * 1024
        num_parts = 2
        client.put_object(Bucket=bucket_name, Key=key, Body=payload)
        response = client.create_multipart_upload(Bucket=bucket_name, Key=key)
        upload_id = response['UploadId']
        parts = []
        for part_num in range(0, num_parts):
            response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=part_num + 1, Body=payload)
            parts.append({'ETag': response['ETag'].strip('"'), 'PartNumber': part_num + 1})
        client.complete_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        response = client.get_object(Bucket=bucket_name, Key=key)
        test_string = self.get_body(response)
        assert test_string == payload * num_parts

    @pytest.mark.ess
    def test_abort_multipart_upload(self, s3cfg_global_unique):
        """
        Test: abort a multipart upload.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key = "mymultipart"
        obj_len = 10 * 1024 * 1024
        (upload_id, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=obj_len)
        client.abort_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id)
        response = client.head_bucket(Bucket=bucket_name)
        rgw_bytes_used = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-bytes-used', 0))
        self.eq(rgw_bytes_used, 0)
        rgw_object_count = int(response['ResponseMetadata']['HTTPHeaders'].get('x-rgw-object-count', 0))
        self.eq(rgw_object_count, 0)

    @pytest.mark.ess
    def test_abort_multipart_upload_not_found(self, s3cfg_global_unique):
        """
        Test: abort a multipart upload that does not exist;
        expect 404, NoSuchUpload.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key = "mymultipart"
        client.put_object(Bucket=bucket_name, Key=key)
        e = assert_raises(ClientError, client.abort_multipart_upload, Bucket=bucket_name, Key=key, UploadId='56788')
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 404)
        self.eq(error_code, 'NoSuchUpload')

    @pytest.mark.ess
    def test_list_multipart_upload(self, s3cfg_global_unique):
        """
        Test: verify list_multipart_uploads returns the correct result,
        including multiple multipart uploads of the same object.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key = "mymultipart"
        mb = 1024 * 1024
        upload_ids = []
        (upload_id1, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=5 * mb)
        upload_ids.append(upload_id1)
        (upload_id2, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key, size=6 * mb)
        upload_ids.append(upload_id2)
        key2 = "mymultipart2"
        (upload_id3, data, parts) = self.multipart_upload(s3cfg_global_unique, bucket_name=bucket_name, key=key2, size=5 * mb)
        upload_ids.append(upload_id3)
        response = client.list_multipart_uploads(Bucket=bucket_name)
        uploads = response['Uploads']
        resp_uploadids = []
        for i in range(0, len(uploads)):
            resp_uploadids.append(uploads[i]['UploadId'])
        for i in range(0, len(upload_ids)):
            self.eq(True, (upload_ids[i] in resp_uploadids))
        client.abort_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id1)
        client.abort_multipart_upload(Bucket=bucket_name, Key=key, UploadId=upload_id2)
        client.abort_multipart_upload(Bucket=bucket_name, Key=key2, UploadId=upload_id3)

    @pytest.mark.ess
    @pytest.mark.fails_on_ess  # TODO: ObjectOwnership parameter is not suitable.
    @pytest.mark.xfail(reason="Expected: the owner in list_multipart_uploads is shown according to ObjectOwnership", run=True, strict=True)
    def test_list_multipart_upload_owner(self, s3cfg_global_unique):
        """
        Test: run list_multipart_uploads on a public-read-write bucket with different users.
        """
        # https://docs.aws.amazon.com/AmazonS3/latest/userguide/acls.html
        # https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/userguide/about-object-ownership.html
        client1 = get_client(s3cfg_global_unique)
        user1 = s3cfg_global_unique.main_user_id
        name1 = s3cfg_global_unique.main_display_name
        client2 = get_alt_client(s3cfg_global_unique)
        user2 = s3cfg_global_unique.alt_user_id
        name2 = s3cfg_global_unique.alt_display_name
        bucket_name = self.get_new_bucket(client1, s3cfg_global_unique)
        # ObjectOwnership: 'BucketOwnerPreferred'|'ObjectWriter'|'BucketOwnerEnforced'
        # bucket_name = self.get_new_bucket(client1, s3cfg_global_unique, ObjectOwnership='ObjectWriter')
        # add bucket acl for public read/write access
        client1.put_bucket_acl(Bucket=bucket_name, ACL='public-read-write')
        key1 = 'multipart1'
        key2 = 'multipart2'
        upload1 = client1.create_multipart_upload(Bucket=bucket_name, Key=key1)['UploadId']
        try:
            upload2 = client2.create_multipart_upload(Bucket=bucket_name, Key=key2)['UploadId']
            try:
                # match fields of an Upload from ListMultipartUploadsResult
                def match(upload, key, uploadid, userid, username):
                    self.eq(upload['Key'], key)
                    self.eq(upload['UploadId'], uploadid)
                    self.eq(upload['Initiator']['ID'], userid)
                    self.eq(upload['Initiator']['DisplayName'], username)
                    self.eq(upload['Owner']['ID'], userid)
                    self.eq(upload['Owner']['DisplayName'], username)
                # list uploads with client1
                uploads1 = client1.list_multipart_uploads(Bucket=bucket_name)['Uploads']
                self.eq(len(uploads1), 2)
                match(uploads1[0], key1, upload1, user1, name1)
                match(uploads1[1], key2, upload2, user2, name2)
                # list uploads with client2
                uploads2 = client2.list_multipart_uploads(Bucket=bucket_name)['Uploads']
                self.eq(len(uploads2), 2)
                match(uploads2[0], key1, upload1, user1, name1)
                match(uploads2[1], key2, upload2, user2, name2)
            finally:
                client2.abort_multipart_upload(Bucket=bucket_name, Key=key2, UploadId=upload2)
        finally:
            client1.abort_multipart_upload(Bucket=bucket_name, Key=key1, UploadId=upload1)

    @pytest.mark.ess
    def test_multipart_upload_missing_part(self, s3cfg_global_unique):
        """
        Test: complete a multipart upload using a wrong PartNumber;
        expect 400, InvalidPart.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key = "mymultipart"
        response = client.create_multipart_upload(Bucket=bucket_name, Key=key)
        upload_id = response['UploadId']
        parts = []
        response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=1, Body=bytes('\x00', 'utf-8'))
        # 'PartNumber should be 1'
        parts.append({'ETag': response['ETag'].strip('"'), 'PartNumber': 9999})
        e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 400)
        self.eq(error_code, 'InvalidPart')

    @pytest.mark.ess
    def test_multipart_upload_incorrect_etag(self, s3cfg_global_unique):
        """
        Test: complete a multipart upload using a wrong ETag;
        expect 400, InvalidPart.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key = "mymultipart"
        response = client.create_multipart_upload(Bucket=bucket_name, Key=key)
        upload_id = response['UploadId']
        parts = []
        client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key, PartNumber=1, Body=bytes('\x00', 'utf-8'))
        # 'ETag' should be "93b885adfe0da089cdf634904fd59f71"
        parts.append({'ETag': "ffffffffffffffffffffffffffffffff", 'PartNumber': 1})
        e = assert_raises(ClientError, client.complete_multipart_upload, Bucket=bucket_name, Key=key, UploadId=upload_id, MultipartUpload={'Parts': parts})
        status, error_code = self.get_status_and_error_code(e.response)
        self.eq(status, 400)
        self.eq(error_code, 'InvalidPart')

    @pytest.mark.ess
    def test_atomic_multipart_upload_write(self, s3cfg_global_unique):
        """
        Test: create and then abort a multipart upload on an existing object
        and verify the object is not affected.
        """
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        client.put_object(Bucket=bucket_name, Key='foo', Body='bar')
        response = client.create_multipart_upload(Bucket=bucket_name, Key='foo')
        upload_id = response['UploadId']
        response = client.get_object(Bucket=bucket_name, Key='foo')
        body = self.get_body(response)
        self.eq(body, 'bar')
        client.abort_multipart_upload(Bucket=bucket_name, Key='foo', UploadId=upload_id)
        response = client.get_object(Bucket=bucket_name, Key='foo')
        body = self.get_body(response)
        self.eq(body, 'bar')

    @pytest.mark.ess
    def test_multipart_resend_first_finishes_last(self, s3cfg_global_unique):
        """
        Test: overwrite the same part and verify the result is as expected;
        when the ETag of the overwriting part is used to complete the upload,
        the object content is the overwritten data.
        """
        # TODO: maybe add a step: after completing, check whether the overwritten part still exists and is reclaimed
        client = get_client(s3cfg_global_unique)
        bucket_name = self.get_new_bucket(client, s3cfg_global_unique)
        key_name = "mymultipart"
        response = client.create_multipart_upload(Bucket=bucket_name, Key=key_name)
        upload_id = response['UploadId']
        # file_size = 8*1024*1024
        file_size = 8
        counter = Counter(0)
        # upload_part might read multiple times from the object
        # first time when it calculates md5, second time when it writes data
        # out. We want to interject only on the last time, but we can't be
        # sure how many times it's going to read, so let's have a test run
        # and count the number of reads
        fp_dry_run = FakeWriteFile(file_size, 'C', lambda: counter.inc())
        parts = []
        response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key_name, PartNumber=1, Body=fp_dry_run)
        parts.append({'ETag': response['ETag'].strip('"'), 'PartNumber': 1})
        client.complete_multipart_upload(Bucket=bucket_name, Key=key_name, UploadId=upload_id, MultipartUpload={'Parts': parts})
        client.delete_object(Bucket=bucket_name, Key=key_name)
        # clear parts
        parts[:] = []
        # ok, now for the actual test
        fp_b = FakeWriteFile(file_size, 'B')

        def upload_fp_b():
            res = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key_name, Body=fp_b, PartNumber=1)
            parts.append({'ETag': res['ETag'].strip('"'), 'PartNumber': 1})

        action = ActionOnCount(counter.val, lambda: upload_fp_b())
        response = client.create_multipart_upload(Bucket=bucket_name, Key=key_name)
        upload_id = response['UploadId']
        fp_a = FakeWriteFile(file_size, 'A', lambda: action.trigger())
        response = client.upload_part(UploadId=upload_id, Bucket=bucket_name, Key=key_name, PartNumber=1, Body=fp_a)
        parts.append({'ETag': response['ETag'].strip('"'), 'PartNumber': 1})
        client.complete_multipart_upload(Bucket=bucket_name, Key=key_name, UploadId=upload_id, MultipartUpload={'Parts': parts})
...
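The suite's assert_raises and get_status_and_error_code helpers boil down to inspecting the ClientError that boto3 raises when complete_multipart_upload is rejected. A hedged sketch of the same check with plain botocore, reusing the s3 client, bucket, key and upload_id from the intro sketch (the helper-free structure here is illustrative, not the suite's code):

# Sketch: checking a 400/InvalidPart rejection without the suite's helpers.
from botocore.exceptions import ClientError

bogus_parts = [{"ETag": "f" * 32, "PartNumber": 1}]  # deliberately wrong ETag
try:
    s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id,
                                 MultipartUpload={"Parts": bogus_parts})
except ClientError as e:
    status = e.response["ResponseMetadata"]["HTTPStatusCode"]
    error_code = e.response["Error"]["Code"]
    assert (status, error_code) == (400, "InvalidPart")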

export_csv.py

Source: export_csv.py (GitHub)

...
    s3 = boto3.client('s3')
    upload_part(s3, part_data.getvalue(), upload_info)
    if is_end:
        update_status_file(upload_info, True, s3)
        complete_multipart_upload(s3, upload_info)
    else:
        update_status_file(upload_info, False)
        trigger_next_upload(upload_info, context)

def upload_part(s3, part_data, upload_info):
    upload = s3.upload_part(Bucket=upload_info['bucket_name'], Key=upload_info['export_file_key'],
                            PartNumber=upload_info['part_num'], UploadId=upload_info['upload_id'], Body=part_data)
    upload_info['parts_info']['Parts'].append({
        'PartNumber': upload_info['part_num'],
        'ETag': upload['ETag']
    })

def complete_multipart_upload(s3, upload_info):
    s3.complete_multipart_upload(Bucket=upload_info['bucket_name'], Key=upload_info['export_file_key'],
                                 UploadId=upload_info['upload_id'], MultipartUpload=upload_info['parts_info'])

def trigger_next_upload(upload_info, context):
    upload_info['part_num'] += 1
    boto3.client('lambda').invoke_async(
        FunctionName=context.invoked_function_arn,
        InvokeArgs=json.dumps(upload_info)
    )

def update_status_file(upload_info, finished, s3=None):
    bucket = survey_utils.get_answer_submissions_export_s3_bucket()
    status = {
        "num_submissions_exported": upload_info['num_submissions_exported'],
        "finished": finished
    }
    if finished:
...
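The export_csv.py excerpt starts mid-handler, so it never shows how upload_info is first assembled before the Lambda re-invokes itself part by part. A possible bootstrap, using only the keys the snippet itself reads, might look like the following (the function name and its arguments are hypothetical, not from the project):

# Hypothetical bootstrap for the Lambda export above: starts the multipart upload
# and builds the upload_info dict that upload_part()/complete_multipart_upload() expect.
import boto3

def start_export(bucket_name, export_file_key):
    s3 = boto3.client('s3')
    upload_id = s3.create_multipart_upload(Bucket=bucket_name, Key=export_file_key)['UploadId']
    return {
        'bucket_name': bucket_name,
        'export_file_key': export_file_key,
        'upload_id': upload_id,
        'part_num': 1,                    # incremented by trigger_next_upload()
        'parts_info': {'Parts': []},      # appended to by upload_part()
        'num_submissions_exported': 0,
    }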

urls.py

Source: urls.py (GitHub)

from django.conf.urls import patterns, include, url
from django.contrib import admin
from indee import views

urlpatterns = patterns(
    url(r'^admin/', include(admin.site.urls)),
    url(r'^home/', views.home),
    url(r'^initiate_upload/', views.initiate_upload),
    url(r'^get_presigned_url/', views.get_presigned_url_for_part),
    url(r'^complete_multipart_upload.', views.complete_multipart_upload),
...
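The urls.py above only wires up the routes for a presigned multipart-upload workflow (initiate, get a presigned URL per part, complete); the view behind the complete_multipart_upload route is not shown. A hedged sketch of what such a view could look like (the request JSON shape and the response field are assumptions, not taken from the project):

# Hypothetical Django view behind the complete_multipart_upload route above.
# The request JSON shape (bucket, key, upload_id, parts) is assumed.
import json
import boto3
from django.http import JsonResponse

def complete_multipart_upload(request):
    payload = json.loads(request.body)
    s3 = boto3.client('s3')
    result = s3.complete_multipart_upload(
        Bucket=payload['bucket'],
        Key=payload['key'],
        UploadId=payload['upload_id'],
        MultipartUpload={'Parts': payload['parts']},
    )
    return JsonResponse({'location': result.get('Location', '')})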
