Best Python code snippet using localstack_python
s3_starter.py
Source:s3_starter.py  
...50    if TMP_STATE.get(PATCHES_APPLIED, False):51        return52    TMP_STATE[PATCHES_APPLIED] = True53    s3_models.DEFAULT_KEY_BUFFER_SIZE = S3_MAX_FILE_SIZE_MB * 1024 * 102454    def s3_update_acls(self, request, query, bucket_name, key_name):55        # fix for - https://github.com/localstack/localstack/issues/173356        #         - https://github.com/localstack/localstack/issues/117057        acl_key = 'acl|%s|%s' % (bucket_name, key_name)58        acl = self._acl_from_headers(request.headers)59        if acl:60            TMP_STATE[acl_key] = acl61        if not query.get('uploadId'):62            return63        bucket = self.backend.get_bucket(bucket_name)64        key = bucket and self.backend.get_object(bucket_name, key_name)65        if not key:66            return67        acl = acl or TMP_STATE.pop(acl_key, None) or bucket.acl68        if acl:69            key.set_acl(acl)70    # patch Bucket.create_from_cloudformation_json in moto71    @classmethod72    def Bucket_create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):73        result = create_from_cloudformation_json_orig(resource_name, cloudformation_json, region_name)74        # remove the bucket from the backend, as our template_deployer will take care of creating the resource75        resource_name = s3_listener.normalize_bucket_name(resource_name)76        s3_models.s3_backend.buckets.pop(resource_name)77        return result78    create_from_cloudformation_json_orig = s3_models.FakeBucket.create_from_cloudformation_json79    s3_models.FakeBucket.create_from_cloudformation_json = Bucket_create_from_cloudformation_json80    # patch S3Bucket.create_bucket(..)81    def create_bucket(self, bucket_name, region_name, *args, **kwargs):82        bucket_name = s3_listener.normalize_bucket_name(bucket_name)83        return create_bucket_orig(bucket_name, region_name, *args, **kwargs)84    create_bucket_orig = s3_models.s3_backend.create_bucket85    s3_models.s3_backend.create_bucket = types.MethodType(create_bucket, s3_models.s3_backend)86    # patch S3Bucket.get_bucket(..)87    def get_bucket(self, bucket_name, *args, **kwargs):88        bucket_name = s3_listener.normalize_bucket_name(bucket_name)89        if bucket_name == BUCKET_MARKER_LOCAL:90            return None91        return get_bucket_orig(bucket_name, *args, **kwargs)92    get_bucket_orig = s3_models.s3_backend.get_bucket93    s3_models.s3_backend.get_bucket = types.MethodType(get_bucket, s3_models.s3_backend)94    # patch S3Bucket.get_bucket(..)95    def delete_bucket(self, bucket_name, *args, **kwargs):96        bucket_name = s3_listener.normalize_bucket_name(bucket_name)97        try:98            return delete_bucket_orig(bucket_name, *args, **kwargs)99        except s3_exceptions.MissingBucket:100            pass101    delete_bucket_orig = s3_models.s3_backend.delete_bucket102    s3_models.s3_backend.delete_bucket = types.MethodType(delete_bucket, s3_models.s3_backend)103    # patch _key_response_post(..)104    def s3_key_response_post(self, request, body, bucket_name, query, key_name, *args, **kwargs):105        result = s3_key_response_post_orig(request, body, bucket_name, query, key_name, *args, **kwargs)106        s3_update_acls(self, request, query, bucket_name, key_name)107        return result108    s3_key_response_post_orig = s3_responses.S3ResponseInstance._key_response_post109    s3_responses.S3ResponseInstance._key_response_post = types.MethodType(110        s3_key_response_post, s3_responses.S3ResponseInstance)111    # patch _key_response_put(..)112    def s3_key_response_put(self, request, body, bucket_name, query, key_name, headers, *args, **kwargs):113        result = s3_key_response_put_orig(request, body, bucket_name, query, key_name, headers, *args, **kwargs)114        s3_update_acls(self, request, query, bucket_name, key_name)115        return result116    s3_key_response_put_orig = s3_responses.S3ResponseInstance._key_response_put117    s3_responses.S3ResponseInstance._key_response_put = types.MethodType(118        s3_key_response_put, s3_responses.S3ResponseInstance)119    # patch DeleteObjectTagging120    def s3_key_response_delete(self, bucket_name, query, key_name, *args, **kwargs):121        # Fixes https://github.com/localstack/localstack/issues/1083122        if query.get('tagging'):123            self._set_action('KEY', 'DELETE', query)124            self._authenticate_and_authorize_s3_action()125            key = self.backend.get_object(bucket_name, key_name)126            key.tags = {}127            self.backend.tagger.delete_all_tags_for_resource(key.arn)128            return 204, {}, ''...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
