How to use abort_multipart_upload method in localstack

Best Python code snippet using localstack_python

xml_utils.py

Source:xml_utils.py Github

copy

Full Screen

...259 expiration.days = _find_int(expiration_node, 'Days')260 elif expiration_node.find('Date') is not None:261 expiration.date = iso8601_to_date(_find_tag(expiration_node, 'Date'))262 return expiration263def parse_lifecycle_abort_multipart_upload(abort_multipart_upload_node):264 if abort_multipart_upload_node is None:265 return None266 abort_multipart_upload = AbortMultipartUpload()267 if abort_multipart_upload_node.find('Days') is not None:268 abort_multipart_upload.days = _find_int(abort_multipart_upload_node, 'Days')269 elif abort_multipart_upload_node.find('CreatedBeforeDate') is not None:270 abort_multipart_upload.created_before_date = iso8601_to_date(_find_tag(abort_multipart_upload_node,271 'CreatedBeforeDate'))272 return abort_multipart_upload273def parse_lifecycle_storage_transitions(storage_transition_nodes):274 storage_transitions = []275 for storage_transition_node in storage_transition_nodes:276 storage_class = _find_tag(storage_transition_node, 'StorageClass')277 storage_transition = StorageTransition(storage_class=storage_class)278 if storage_transition_node.find('Days') is not None:279 storage_transition.days = _find_int(storage_transition_node, 'Days')280 elif storage_transition_node.find('CreatedBeforeDate') is not None:281 storage_transition.created_before_date = iso8601_to_date(_find_tag(storage_transition_node,282 'CreatedBeforeDate'))283 storage_transitions.append(storage_transition)284 return storage_transitions285def parse_get_bucket_lifecycle(result, body):286 root = ElementTree.fromstring(body)287 for rule_node in root.findall('Rule'):288 expiration = parse_lifecycle_expiration(rule_node.find('Expiration'))289 abort_multipart_upload = parse_lifecycle_abort_multipart_upload(rule_node.find('AbortMultipartUpload'))290 storage_transitions = parse_lifecycle_storage_transitions(rule_node.findall('Transition'))291 rule = LifecycleRule(292 _find_tag(rule_node, 'ID'),293 _find_tag(rule_node, 'Prefix'),294 status=_find_tag(rule_node, 'Status'),295 expiration=expiration,296 abort_multipart_upload=abort_multipart_upload,297 storage_transitions=storage_transitions298 )299 result.rules.append(rule)300 return result301def parse_get_bucket_cors(result, body):302 root = ElementTree.fromstring(body)303 for rule_node in root.findall('CORSRule'):...

Full Screen

Full Screen

oci_object.py

Source:oci_object.py Github

copy

Full Screen

...316 except ServiceError as ex:317 module.fail_json(msg=ex.message)318 result["changed"] = changed319 return result320def abort_multipart_upload(object_storage_client, module):321 result = dict(changed=False)322 multipart_upload_exists = False323 # oci sdk does not fail even if the upload id does not exist. So check and set changed to False if324 # the upload id does not exist325 multipart_uploads = oci_utils.list_all_resources(326 object_storage_client.list_multipart_uploads,327 namespace_name=module.params.get("namespace_name"),328 bucket_name=module.params.get("bucket_name"),329 )330 for multipart_upload in multipart_uploads:331 if multipart_upload.object == module.params.get(332 "object_name"333 ) and multipart_upload.upload_id == module.params.get("upload_id"):334 multipart_upload_exists = True335 break336 if not multipart_upload_exists:337 result["msg"] = "upload id {0} does not exist.".format(338 module.params.get("upload_id")339 )340 return result341 oci_utils.call_with_backoff(342 object_storage_client.abort_multipart_upload,343 namespace_name=module.params.get("namespace_name"),344 bucket_name=module.params.get("bucket_name"),345 object_name=module.params.get("object_name"),346 upload_id=module.params.get("upload_id"),347 )348 result["changed"] = True349 return result350def src_is_valid(module, src):351 if not os.path.isfile(to_bytes(src)):352 module.fail_json(msg="The source path %s must be a file." % src)353 if not os.access(to_bytes(src), os.R_OK):354 module.fail_json(355 msg="Failed to access %s. Make sure the file exists and that you have "356 "read access." % src357 )358 return True359def main():360 module_args = oci_utils.get_common_arg_spec()361 module_args.update(362 dict(363 namespace_name=dict(type="str", required=True, aliases=["namespace"]),364 bucket_name=dict(type="str", required=True, aliases=["bucket"]),365 object_name=dict(type="str", required=True, aliases=["object", "name"]),366 src=dict(type="str", required=False),367 dest=dict(type="str", required=False),368 state=dict(369 type="str",370 required=False,371 default="present",372 choices=["absent", "present", "abort_multipart_upload"],373 ),374 force=dict(375 type="bool", required=False, default=True, aliases=["overwrite"]376 ),377 content_length=dict(type="str", required=False),378 opc_client_request_id=dict(type="str", required=False),379 content_md5=dict(type="str", required=False),380 content_type=dict(381 type="str", required=False, default="application/octet-stream"382 ),383 content_language=dict(type="str", required=False),384 content_encoding=dict(type="str", required=False),385 upload_id=dict(type="str", required=False),386 opc_meta=dict(type=dict, required=False, aliases=["metadata"]),387 multipart_upload=dict(type=bool, required=False, default=True),388 parallel_uploads=dict(type=bool, required=False, default=True),389 )390 )391 module = AnsibleModule(392 argument_spec=module_args,393 supports_check_mode=False,394 mutually_exclusive=[("src", "dest")],395 )396 if not HAS_OCI_PY_SDK:397 module.fail_json(msg="oci python sdk required for this module")398 object_storage_client = oci_utils.create_service_client(module, ObjectStorageClient)399 state = module.params["state"]400 src = module.params["src"]401 dest = module.params["dest"]402 force = module.params["force"]403 obj = module.params["object_name"]404 result = dict(changed=False)405 if state == "present" and dest is not None:406 if force is True or not os.path.isfile(to_bytes(dest)):407 result = get_object(object_storage_client, module)408 else:409 result["msg"] = (410 "Destination %s already exists. Use force option to overwrite." % dest411 )412 elif state == "present" and src is not None:413 if src_is_valid(module, src):414 if force is True or head_object(object_storage_client, module) is None:415 result = put_object(object_storage_client, module)416 else:417 result["msg"] = (418 "Object %s already present in bucket. Use force option to overwrite."419 % obj420 )421 elif state == "absent":422 result = delete_object(object_storage_client, module)423 elif state == "abort_multipart_upload":424 if not module.params.get("upload_id"):425 module.fail_json(426 msg="upload_id required for aborting the multipart upload."427 )428 result = abort_multipart_upload(object_storage_client, module)429 else:430 module.fail_json(msg="Missing either src or dest option.")431 module.exit_json(**result)432if __name__ == "__main__":...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful