Best Python code snippet using localstack_python
test_s3.py
Source: test_s3.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import gzip as gz
import os
import tempfile
from unittest import mock
from unittest.mock import Mock

import boto3
import pytest
from botocore.exceptions import ClientError, NoCredentialsError

from airflow.exceptions import AirflowException
from airflow.models import Connection
from airflow.providers.amazon.aws.hooks.s3 import S3Hook, provide_bucket_name, unify_bucket_name_and_key

try:
    from moto import mock_s3
except ImportError:
    mock_s3 = None


@pytest.mark.skipif(mock_s3 is None, reason='moto package not present')
class TestAwsS3Hook:
    @mock_s3
    def test_get_conn(self):
        hook = S3Hook()
        assert hook.get_conn() is not None

    def test_parse_s3_url(self):
        parsed = S3Hook.parse_s3_url("s3://test/this/is/not/a-real-key.txt")
        assert parsed == ("test", "this/is/not/a-real-key.txt"), "Incorrect parsing of the s3 url"

    def test_check_for_bucket(self, s3_bucket):
        hook = S3Hook()
        assert hook.check_for_bucket(s3_bucket) is True
        assert hook.check_for_bucket('not-a-bucket') is False

    def test_check_for_bucket_raises_error_with_invalid_conn_id(self, s3_bucket, monkeypatch):
        monkeypatch.delenv('AWS_PROFILE', raising=False)
        monkeypatch.delenv('AWS_ACCESS_KEY_ID', raising=False)
        monkeypatch.delenv('AWS_SECRET_ACCESS_KEY', raising=False)
        hook = S3Hook(aws_conn_id="does_not_exist")
        with pytest.raises(NoCredentialsError):
            hook.check_for_bucket(s3_bucket)

    @mock_s3
    def test_get_bucket(self):
        hook = S3Hook()
        assert hook.get_bucket('bucket') is not None

    @mock_s3
    def test_create_bucket_default_region(self):
        hook = S3Hook()
        hook.create_bucket(bucket_name='new_bucket')
        assert hook.get_bucket('new_bucket') is not None

    @mock_s3
    def test_create_bucket_us_standard_region(self, monkeypatch):
        monkeypatch.delenv('AWS_DEFAULT_REGION', raising=False)
        hook = S3Hook()
        hook.create_bucket(bucket_name='new_bucket', region_name='us-east-1')
        bucket = hook.get_bucket('new_bucket')
        assert bucket is not None
        region = bucket.meta.client.get_bucket_location(Bucket=bucket.name).get('LocationConstraint')
        # https://github.com/spulec/moto/pull/1961
        # If location is "us-east-1", LocationConstraint should be None
        assert region is None

    @mock_s3
    def test_create_bucket_other_region(self):
        hook = S3Hook()
        hook.create_bucket(bucket_name='new_bucket', region_name='us-east-2')
        bucket = hook.get_bucket('new_bucket')
        assert bucket is not None
        region = bucket.meta.client.get_bucket_location(Bucket=bucket.name).get('LocationConstraint')
        assert region == 'us-east-2'

    def test_check_for_prefix(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='a', Body=b'a')
        bucket.put_object(Key='dir/b', Body=b'b')
        assert hook.check_for_prefix(bucket_name=s3_bucket, prefix='dir/', delimiter='/') is True
        assert hook.check_for_prefix(bucket_name=s3_bucket, prefix='a', delimiter='/') is False

    def test_list_prefixes(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='a', Body=b'a')
        bucket.put_object(Key='dir/b', Body=b'b')
        assert [] == hook.list_prefixes(s3_bucket, prefix='non-existent/')
        assert ['dir/'] == hook.list_prefixes(s3_bucket, delimiter='/')
        assert ['a'] == hook.list_keys(s3_bucket, delimiter='/')
        assert ['dir/b'] == hook.list_keys(s3_bucket, prefix='dir/')

    def test_list_prefixes_paged(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        # we don't need to test the paginator, that's covered by boto tests
        keys = ["%s/b" % i for i in range(2)]
        dirs = ["%s/" % i for i in range(2)]
        for key in keys:
            bucket.put_object(Key=key, Body=b'a')
        assert sorted(dirs) == sorted(hook.list_prefixes(s3_bucket, delimiter='/', page_size=1))

    def test_list_keys(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='a', Body=b'a')
        bucket.put_object(Key='dir/b', Body=b'b')
        assert [] == hook.list_keys(s3_bucket, prefix='non-existent/')
        assert ['a', 'dir/b'] == hook.list_keys(s3_bucket)
        assert ['a'] == hook.list_keys(s3_bucket, delimiter='/')
        assert ['dir/b'] == hook.list_keys(s3_bucket, prefix='dir/')

    def test_list_keys_paged(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        keys = [str(i) for i in range(2)]
        for key in keys:
            bucket.put_object(Key=key, Body=b'a')
        assert sorted(keys) == sorted(hook.list_keys(s3_bucket, delimiter='/', page_size=1))

    def test_check_for_key(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='a', Body=b'a')
        assert hook.check_for_key('a', s3_bucket) is True
        assert hook.check_for_key(f's3://{s3_bucket}//a') is True
        assert hook.check_for_key('b', s3_bucket) is False
        assert hook.check_for_key(f's3://{s3_bucket}//b') is False

    def test_check_for_key_raises_error_with_invalid_conn_id(self, monkeypatch, s3_bucket):
        monkeypatch.delenv('AWS_PROFILE', raising=False)
        monkeypatch.delenv('AWS_ACCESS_KEY_ID', raising=False)
        monkeypatch.delenv('AWS_SECRET_ACCESS_KEY', raising=False)
        hook = S3Hook(aws_conn_id="does_not_exist")
        with pytest.raises(NoCredentialsError):
            hook.check_for_key('a', s3_bucket)

    def test_get_key(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='a', Body=b'a')
        assert hook.get_key('a', s3_bucket).key == 'a'
        assert hook.get_key(f's3://{s3_bucket}/a').key == 'a'

    def test_read_key(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='my_key', Body=b'Cont\xC3\xA9nt')
        assert hook.read_key('my_key', s3_bucket) == 'Contént'

    # As of 1.3.2, Moto doesn't support select_object_content yet.
    @mock.patch('airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.get_client_type')
    def test_select_key(self, mock_get_client_type, s3_bucket):
        mock_get_client_type.return_value.select_object_content.return_value = {
            'Payload': [{'Records': {'Payload': b'Cont\xC3\xA9nt'}}]
        }
        hook = S3Hook()
        assert hook.select_key('my_key', s3_bucket) == 'Contént'

    def test_check_for_wildcard_key(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='abc', Body=b'a')
        bucket.put_object(Key='a/b', Body=b'a')
        assert hook.check_for_wildcard_key('a*', s3_bucket) is True
        assert hook.check_for_wildcard_key('abc', s3_bucket) is True
        assert hook.check_for_wildcard_key(f's3://{s3_bucket}//a*') is True
        assert hook.check_for_wildcard_key(f's3://{s3_bucket}//abc') is True
        assert hook.check_for_wildcard_key('a', s3_bucket) is False
        assert hook.check_for_wildcard_key('b', s3_bucket) is False
        assert hook.check_for_wildcard_key(f's3://{s3_bucket}//a') is False
        assert hook.check_for_wildcard_key(f's3://{s3_bucket}//b') is False

    def test_get_wildcard_key(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='abc', Body=b'a')
        bucket.put_object(Key='a/b', Body=b'a')
        # The boto3 Class API is _odd_, and we can't do an isinstance check as
        # each instance is a different class, so let's just check one property
        # on S3.Object. Not great but...
        assert hook.get_wildcard_key('a*', s3_bucket).key == 'a/b'
        assert hook.get_wildcard_key('a*', s3_bucket, delimiter='/').key == 'abc'
        assert hook.get_wildcard_key('abc', s3_bucket, delimiter='/').key == 'abc'
        assert hook.get_wildcard_key(f's3://{s3_bucket}/a*').key == 'a/b'
        assert hook.get_wildcard_key(f's3://{s3_bucket}/a*', delimiter='/').key == 'abc'
        assert hook.get_wildcard_key(f's3://{s3_bucket}/abc', delimiter='/').key == 'abc'
        assert hook.get_wildcard_key('a', s3_bucket) is None
        assert hook.get_wildcard_key('b', s3_bucket) is None
        assert hook.get_wildcard_key(f's3://{s3_bucket}/a') is None
        assert hook.get_wildcard_key(f's3://{s3_bucket}/b') is None

    def test_load_string(self, s3_bucket):
        hook = S3Hook()
        hook.load_string("Contént", "my_key", s3_bucket)
        resource = boto3.resource('s3').Object(s3_bucket, 'my_key')  # pylint: disable=no-member
        assert resource.get()['Body'].read() == b'Cont\xC3\xA9nt'

    def test_load_string_acl(self, s3_bucket):
        hook = S3Hook()
        hook.load_string("Contént", "my_key", s3_bucket, acl_policy='public-read')
        response = boto3.client('s3').get_object_acl(Bucket=s3_bucket, Key="my_key", RequestPayer='requester')
        assert (response['Grants'][1]['Permission'] == 'READ') and (
            response['Grants'][0]['Permission'] == 'FULL_CONTROL'
        )

    def test_load_bytes(self, s3_bucket):
        hook = S3Hook()
        hook.load_bytes(b"Content", "my_key", s3_bucket)
        resource = boto3.resource('s3').Object(s3_bucket, 'my_key')  # pylint: disable=no-member
        assert resource.get()['Body'].read() == b'Content'

    def test_load_bytes_acl(self, s3_bucket):
        hook = S3Hook()
        hook.load_bytes(b"Content", "my_key", s3_bucket, acl_policy='public-read')
        response = boto3.client('s3').get_object_acl(Bucket=s3_bucket, Key="my_key", RequestPayer='requester')
        assert (response['Grants'][1]['Permission'] == 'READ') and (
            response['Grants'][0]['Permission'] == 'FULL_CONTROL'
        )

    def test_load_fileobj(self, s3_bucket):
        hook = S3Hook()
        with tempfile.TemporaryFile() as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file_obj(temp_file, "my_key", s3_bucket)
            resource = boto3.resource('s3').Object(s3_bucket, 'my_key')  # pylint: disable=no-member
            assert resource.get()['Body'].read() == b'Content'

    def test_load_fileobj_acl(self, s3_bucket):
        hook = S3Hook()
        with tempfile.TemporaryFile() as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file_obj(temp_file, "my_key", s3_bucket, acl_policy='public-read')
            response = boto3.client('s3').get_object_acl(
                Bucket=s3_bucket, Key="my_key", RequestPayer='requester'
            )  # pylint: disable=no-member # noqa: E501 # pylint: disable=C0301
            assert (response['Grants'][1]['Permission'] == 'READ') and (
                response['Grants'][0]['Permission'] == 'FULL_CONTROL'
            )

    def test_load_file_gzip(self, s3_bucket):
        hook = S3Hook()
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file(temp_file.name, "my_key", s3_bucket, gzip=True)
            resource = boto3.resource('s3').Object(s3_bucket, 'my_key')  # pylint: disable=no-member
            assert gz.decompress(resource.get()['Body'].read()) == b'Content'
            os.unlink(temp_file.name)

    def test_load_file_acl(self, s3_bucket):
        hook = S3Hook()
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file(temp_file.name, "my_key", s3_bucket, gzip=True, acl_policy='public-read')
            response = boto3.client('s3').get_object_acl(
                Bucket=s3_bucket, Key="my_key", RequestPayer='requester'
            )  # pylint: disable=no-member # noqa: E501 # pylint: disable=C0301
            assert (response['Grants'][1]['Permission'] == 'READ') and (
                response['Grants'][0]['Permission'] == 'FULL_CONTROL'
            )
            os.unlink(temp_file.name)

    def test_copy_object_acl(self, s3_bucket):
        hook = S3Hook()
        with tempfile.NamedTemporaryFile() as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file_obj(temp_file, "my_key", s3_bucket)
            hook.copy_object("my_key", "my_key", s3_bucket, s3_bucket)
            response = boto3.client('s3').get_object_acl(
                Bucket=s3_bucket, Key="my_key", RequestPayer='requester'
            )  # pylint: disable=no-member # noqa: E501 # pylint: disable=C0301
            assert (response['Grants'][0]['Permission'] == 'FULL_CONTROL') and (len(response['Grants']) == 1)

    @mock_s3
    def test_delete_bucket_if_bucket_exist(self, s3_bucket):
        # assert if the bucket is created
        mock_hook = S3Hook()
        mock_hook.create_bucket(bucket_name=s3_bucket)
        assert mock_hook.check_for_bucket(bucket_name=s3_bucket)
        mock_hook.delete_bucket(bucket_name=s3_bucket, force_delete=True)
        assert not mock_hook.check_for_bucket(s3_bucket)

    @mock_s3
    def test_delete_bucket_if_not_bucket_exist(self, s3_bucket):
        # assert if exception is raised if bucket not present
        mock_hook = S3Hook()
        with pytest.raises(ClientError) as error:
            # assert error
            assert mock_hook.delete_bucket(bucket_name=s3_bucket, force_delete=True)
        assert error.value.response['Error']['Code'] == 'NoSuchBucket'

    @mock.patch.object(S3Hook, 'get_connection', return_value=Connection(schema='test_bucket'))
    def test_provide_bucket_name(self, mock_get_connection):
        class FakeS3Hook(S3Hook):
            @provide_bucket_name
            def test_function(self, bucket_name=None):
                return bucket_name

        fake_s3_hook = FakeS3Hook()
        test_bucket_name = fake_s3_hook.test_function()
        assert test_bucket_name == mock_get_connection.return_value.schema
        test_bucket_name = fake_s3_hook.test_function(bucket_name='bucket')
        assert test_bucket_name == 'bucket'

    def test_delete_objects_key_does_not_exist(self, s3_bucket):
        hook = S3Hook()
        with pytest.raises(AirflowException) as err:
            hook.delete_objects(bucket=s3_bucket, keys=['key-1'])
        assert isinstance(err.value, AirflowException)
        assert str(err.value) == "Errors when deleting: ['key-1']"

    def test_delete_objects_one_key(self, mocked_s3_res, s3_bucket):
        key = 'key-1'
        mocked_s3_res.Object(s3_bucket, key).put(Body=b'Data')
        hook = S3Hook()
        hook.delete_objects(bucket=s3_bucket, keys=[key])
        assert [o.key for o in mocked_s3_res.Bucket(s3_bucket).objects.all()] == []

    def test_delete_objects_many_keys(self, mocked_s3_res, s3_bucket):
        num_keys_to_remove = 1001
        keys = []
        for index in range(num_keys_to_remove):
            key = f'key-{index}'
            mocked_s3_res.Object(s3_bucket, key).put(Body=b'Data')
            keys.append(key)
        assert sum(1 for _ in mocked_s3_res.Bucket(s3_bucket).objects.all()) == num_keys_to_remove
        hook = S3Hook()
        hook.delete_objects(bucket=s3_bucket, keys=keys)
        assert [o.key for o in mocked_s3_res.Bucket(s3_bucket).objects.all()] == []

    def test_unify_bucket_name_and_key(self):
        class FakeS3Hook(S3Hook):
            @unify_bucket_name_and_key
            def test_function_with_wildcard_key(self, wildcard_key, bucket_name=None):
                return bucket_name, wildcard_key

            @unify_bucket_name_and_key
            def test_function_with_key(self, key, bucket_name=None):
                return bucket_name, key

            @unify_bucket_name_and_key
            def test_function_with_test_key(self, test_key, bucket_name=None):
                return bucket_name, test_key

        fake_s3_hook = FakeS3Hook()
        test_bucket_name_with_wildcard_key = fake_s3_hook.test_function_with_wildcard_key('s3://foo/bar*.csv')
        assert ('foo', 'bar*.csv') == test_bucket_name_with_wildcard_key
        test_bucket_name_with_key = fake_s3_hook.test_function_with_key('s3://foo/bar.csv')
        assert ('foo', 'bar.csv') == test_bucket_name_with_key
        with pytest.raises(ValueError) as err:
            fake_s3_hook.test_function_with_test_key('s3://foo/bar.csv')
        assert isinstance(err.value, ValueError)

    @mock.patch('airflow.providers.amazon.aws.hooks.s3.NamedTemporaryFile')
    def test_download_file(self, mock_temp_file):
        mock_temp_file.return_value.__enter__ = Mock(return_value=mock_temp_file)
        s3_hook = S3Hook(aws_conn_id='s3_test')
        s3_hook.check_for_key = Mock(return_value=True)
        s3_obj = Mock()
        s3_obj.download_fileobj = Mock(return_value=None)
        s3_hook.get_key = Mock(return_value=s3_obj)
        key = 'test_key'
        bucket = 'test_bucket'
        s3_hook.download_file(key=key, bucket_name=bucket)
        s3_hook.check_for_key.assert_called_once_with(key, bucket)
        s3_hook.get_key.assert_called_once_with(key, bucket)
        s3_obj.download_fileobj.assert_called_once_with(mock_temp_file)

    def test_generate_presigned_url(self, s3_bucket):
        hook = S3Hook()
        presigned_url = hook.generate_presigned_url(
            client_method="get_object", params={'Bucket': s3_bucket, 'Key': "my_key"}
        )
        url = presigned_url.split("?")[1]
        params = {x[0]: x[1] for x in [x.split("=") for x in url[0:].split("&")]}
        assert {"AWSAccessKeyId", "Signature", "Expires"}.issubset(set(params.keys()))

    def test_should_throw_error_if_extra_args_is_not_dict(self):
        with pytest.raises(ValueError):
            S3Hook(extra_args=1)

    def test_should_throw_error_if_extra_args_contains_unknown_arg(self, s3_bucket):
        hook = S3Hook(extra_args={"unknown_s3_args": "value"})
        with tempfile.TemporaryFile() as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            with pytest.raises(ValueError):
                hook.load_file_obj(temp_file, "my_key", s3_bucket, acl_policy='public-read')

    def test_should_pass_extra_args(self, s3_bucket):
        hook = S3Hook(extra_args={"ContentLanguage": "value"})
        with tempfile.TemporaryFile() as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file_obj(temp_file, "my_key", s3_bucket, acl_policy='public-read')
            resource = boto3.resource('s3').Object(s3_bucket, 'my_key')  # pylint: disable=no-member
...
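Note that many of the tests above take s3_bucket and mocked_s3_res pytest fixtures that are defined outside this listing, most likely in a neighbouring conftest.py. A minimal sketch of what such fixtures could look like, assuming moto is installed; the fixture bodies and the bucket name are illustrative, not the project's actual conftest:

# Hypothetical conftest.py sketch for the fixtures the tests above rely on.
import boto3
import pytest
from moto import mock_s3


@pytest.fixture
def mocked_s3_res():
    # moto intercepts all S3 API calls made inside this context,
    # so no real AWS credentials or network access are needed
    with mock_s3():
        yield boto3.resource("s3", region_name="us-east-1")


@pytest.fixture
def s3_bucket(mocked_s3_res):
    bucket = "airflow-test-s3-bucket"  # illustrative name
    mocked_s3_res.create_bucket(Bucket=bucket)
    return bucket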
common.py
Source: common.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from io import BufferedReader, BufferedWriter

import boto3
import paramiko
from boto3.exceptions import S3UploadFailedError
from botocore.config import Config
from botocore.exceptions import NoRegionError, ClientError

from dataeng.utils.data_type import represent_int


def get_s3_client(logger, session, config=None):
    """
    Gets an instance of S3 client.
    :param logger: logger
    :param session: AWS session
    :param config: Boto config
    :type logger: logging.Logger
    :type session: boto3.session.Session
    :type config: botocore.config.Config
    :returns: S3 client
    :rtype: botocore.client.S3
    """
    if not config:
        config = Config(retries=dict(max_attempts=10))
    try:
        logger.debug("Getting S3 client.")
        return session.client("s3", config=config)
    except NoRegionError as e:
        logger.error("Unable to get S3 client - {error}".format(error=str(e)))


def get_s3_resource(logger, session, config=None):
    """
    Gets an instance of S3 resource.
    :param logger: logger
    :param session: AWS session
    :param config: Boto config
    :type logger: logging.Logger
    :type session: boto3.session.Session
    :type config: botocore.config.Config
    :returns: S3 resource
    :rtype: boto3.resources.factory.s3.ServiceResource
    """
    if not config:
        config = Config(retries=dict(max_attempts=10))
    try:
        logger.debug("Getting S3 resource.")
        return session.resource("s3", config=config)
    except NoRegionError as e:
        logger.error("Unable to get S3 resource - {error}".format(error=str(e)))


def is_s3_key_exists(logger, s3_resource, s3_bucket, s3_key):
    """
    Checks the existence of an S3 key.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :param s3_key: S3 key
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :type s3_key: str
    :returns: True if the key exists, False if it does not, None if unable to determine
    :rtype: bool
    """
    try:
        logger.debug("Checking the existence of s3://{s3_bucket}/{s3_key}".format(s3_bucket=s3_bucket, s3_key=s3_key))
        objs = list(s3_resource.Bucket(s3_bucket).objects.filter(Prefix=s3_key))
        return len(objs) > 0 and objs[0].key == s3_key
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug("Bucket {s3_bucket} is forbidden".format(s3_bucket=s3_bucket))
                return False
        logger.warning("Unable to determine S3 key {s3_key} - {error}".format(s3_key=s3_key, error=str(e)))


def is_s3_bucket_exists(logger, s3_resource, s3_bucket):
    """
    Checks the existence of an S3 bucket.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :returns: True if the bucket exists, False if it does not exist or is forbidden, None if unable to determine
    :rtype: bool
    """
    try:
        logger.debug("Checking the existence of s3://{s3_bucket}".format(s3_bucket=s3_bucket))
        s3_resource.meta.client.head_bucket(Bucket=s3_bucket)
        return True
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug("Bucket {s3_bucket} is forbidden".format(s3_bucket=s3_bucket))
                return False
            elif error_code == 404:
                logger.debug("Bucket {s3_bucket} does not exist".format(s3_bucket=s3_bucket))
                return False
        logger.warning(
            "Unable to determine S3 bucket {s3_bucket} - {error}".format(s3_bucket=s3_bucket, error=str(e)))


def upload_s3_object(logger, s3_resource, f, s3_bucket, s3_key, callback=None):
    """
    Uploads a file or file object to S3.
    :param logger: logger
    :param s3_resource: S3 resource
    :param f: file path or file object
    :param s3_bucket: S3 bucket
    :param s3_key: S3 key
    :param callback: callback for monitoring progress
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type f: str | io.BufferedReader | paramiko.SFTPFile
    :type s3_bucket: str
    :type s3_key: str
    :type callback: typing.Callable
    """
    try:
        if isinstance(f, str):
            logger.debug(
                "Uploading {f} to s3://{s3_bucket}/{s3_key}".format(f=f, s3_bucket=s3_bucket, s3_key=s3_key))
            s3_resource.meta.client.upload_file(f, s3_bucket, s3_key, Callback=callback)
        elif isinstance(f, (BufferedReader, paramiko.SFTPFile)):
            logger.debug(
                "Uploading a file object to s3://{s3_bucket}/{s3_key}".format(s3_bucket=s3_bucket, s3_key=s3_key))
            s3_resource.meta.client.upload_fileobj(f, s3_bucket, s3_key, Callback=callback)
        else:
            logger.error("Invalid input type for upload - {input_type}".format(input_type=type(f).__name__))
    except S3UploadFailedError as e:
        logger.warning(
            "Unable to upload file to s3://{s3_bucket}/{s3_key} - {error}".format(s3_bucket=s3_bucket, s3_key=s3_key,
                                                                                  error=str(e)))


def download_s3_object(logger, s3_resource, f, s3_bucket, s3_key, callback=None):
    """
    Downloads a file or file object from S3.
    :param logger: logger
    :param s3_resource: S3 resource
    :param f: file path or file object
    :param s3_bucket: S3 bucket
    :param s3_key: S3 key
    :param callback: callback for monitoring progress
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type f: str | io.BufferedWriter
    :type s3_bucket: str
    :type s3_key: str
    :type callback: typing.Callable
    """
    try:
        if isinstance(f, str):
            logger.debug("Downloading s3://{s3_bucket}/{s3_key} to {f}".format(s3_bucket=s3_bucket, s3_key=s3_key, f=f))
            s3_resource.meta.client.download_file(s3_bucket, s3_key, f, Callback=callback)
        elif isinstance(f, BufferedWriter):
            logger.debug(
                "Downloading s3://{s3_bucket}/{s3_key} to a file object".format(s3_bucket=s3_bucket, s3_key=s3_key))
            s3_resource.meta.client.download_fileobj(s3_bucket, s3_key, f, Callback=callback)
        else:
            logger.error("Invalid input type for download - {input_type}".format(input_type=type(f).__name__))
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug(
                    "S3 object s3://{s3_bucket}/{s3_key} is forbidden".format(s3_bucket=s3_bucket, s3_key=s3_key))
                return
            elif error_code == 404:
                logger.debug(
                    "S3 object s3://{s3_bucket}/{s3_key} does not exist".format(s3_bucket=s3_bucket, s3_key=s3_key))
                return
        logger.warning(
            "Unable to download s3://{s3_bucket}/{s3_key} - {error}".format(s3_bucket=s3_bucket, s3_key=s3_key,
                                                                            error=str(e)))


def delete_s3_object(logger, s3_resource, s3_bucket, s3_key):
    """
    Deletes an S3 object.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :param s3_key: S3 key
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :type s3_key: str
    """
    try:
        logger.debug("Deleting s3://{s3_bucket}/{s3_key}".format(s3_bucket=s3_bucket, s3_key=s3_key))
        s3_resource.Object(s3_bucket, s3_key).delete()
    except ClientError as e:
        logger.warning("Unable to delete S3 key {s3_key} - {error}".format(s3_key=s3_key, error=str(e)))


def delete_s3_prefix(logger, s3_resource, s3_bucket, s3_prefix):
    """
    Deletes an S3 prefix.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :param s3_prefix: S3 prefix
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :type s3_prefix: str
    """
    try:
        logger.debug("Deleting s3://{s3_bucket}/{s3_prefix}/".format(s3_bucket=s3_bucket, s3_prefix=s3_prefix))
        s3_resource.Bucket(s3_bucket).objects.filter(Prefix=s3_prefix).delete()
        s3_resource.Object(bucket_name=s3_bucket, key=s3_prefix).delete()
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug(
                    "s3://{s3_bucket}/{s3_prefix}/ is forbidden".format(s3_bucket=s3_bucket, s3_prefix=s3_prefix))
                return
            elif error_code == 404:
                logger.debug(
                    "s3://{s3_bucket}/{s3_prefix}/ does not exist".format(s3_bucket=s3_bucket, s3_prefix=s3_prefix))
                return
        logger.warning("Unable to delete S3 prefix s3://{s3_bucket}/{s3_prefix} - {error}".format(s3_bucket=s3_bucket,
                                                                                                  s3_prefix=s3_prefix,
                                                                                                  error=str(e)))


def list_s3_keys(logger, s3_resource, s3_bucket, s3_prefix="", search="Contents"):
    """
    Lists S3 keys for a given S3 bucket and S3 prefix.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :param s3_prefix: S3 prefix
    :param search: JMESPath search string
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :type s3_prefix: str
    :type search: str
    :returns: list of S3 keys
    :rtype: list
    """
    try:
        if s3_prefix and s3_prefix[0] == "/":
            s3_prefix = s3_prefix[1:]
        if s3_prefix and s3_prefix[len(s3_prefix) - 1] != "/":
            s3_prefix = "{s3_prefix}/".format(s3_prefix=s3_prefix)
        paginator = s3_resource.meta.client.get_paginator("list_objects_v2")
        page_iterator = paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix)
        if search is not None:
            page_iterator = page_iterator.search(search)
        s3_keys = []
        for key_data in page_iterator:
            if key_data is not None:
                if not key_data["Key"].endswith("/"):
                    s3_keys.append(key_data["Key"])
        return s3_keys
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug(
                    "s3://{s3_bucket}/{s3_prefix} is forbidden".format(s3_bucket=s3_bucket, s3_prefix=s3_prefix))
                return
            elif error_code == 404:
                logger.debug(
                    "s3://{s3_bucket}/{s3_prefix} does not exist".format(s3_bucket=s3_bucket, s3_prefix=s3_prefix))
                return
        logger.warning("Unable to list S3 keys from s3://{s3_bucket}/{s3_prefix} - {error}".format(s3_bucket=s3_bucket,
                                                                                                   s3_prefix=s3_prefix,
                                                                                                   error=str(e)))


def get_s3_object_size(logger, s3_resource, s3_bucket, s3_key):
    """
    Gets S3 object size.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :param s3_key: S3 key
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :type s3_key: str
    :returns: size of S3 object in bytes
    :rtype: int
    """
    try:
        return s3_resource.meta.client.head_object(Bucket=s3_bucket, Key=s3_key)["ContentLength"]
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug("s3://{s3_bucket}/{s3_key} is forbidden".format(s3_bucket=s3_bucket, s3_key=s3_key))
                return
            elif error_code == 404:
                logger.debug("s3://{s3_bucket}/{s3_key} does not exist".format(s3_bucket=s3_bucket, s3_key=s3_key))
                return
        logger.warning("Unable to determine the size of s3://{s3_bucket}/{s3_key} - {error}".format(s3_bucket=s3_bucket,
                                                                                                     s3_key=s3_key,
...
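The helpers above take any logging.Logger plus a boto3 Session, so a short driver is enough to exercise them. Two sketches follow, both illustrative: a stand-in for represent_int (imported from an internal dataeng package not shown on this page, its behavior inferred from how the module parses e.response["Error"]["Code"]), and a hypothetical call sequence where the bucket, key, and file path are placeholders.

import logging

import boto3

# assuming: from common import get_s3_resource, upload_s3_object, is_s3_key_exists, get_s3_object_size


def represent_int(value):
    # Assumed stand-in for dataeng.utils.data_type.represent_int:
    # returns True if value can be parsed as an int.
    try:
        int(value)
        return True
    except (TypeError, ValueError):
        return False


logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("s3_common_demo")
session = boto3.session.Session(region_name="us-east-1")

s3_resource = get_s3_resource(logger, session)
upload_s3_object(logger, s3_resource, "/tmp/report.csv", "my-bucket", "reports/report.csv")
if is_s3_key_exists(logger, s3_resource, "my-bucket", "reports/report.csv"):
    print(get_s3_object_size(logger, s3_resource, "my-bucket", "reports/report.csv"))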
table_quality_check.py
Source: table_quality_check.py
import boto3
import configparser
import os
import pyspark.sql.functions as F
from pyspark.sql import types as T
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col

config = configparser.ConfigParser()
config.read('/home/workspace/dwh.cfg')
os.environ["AWS_ACCESS_KEY_ID"] = config.get("AWS_CREDENTIALS", "AWS_ACCESS_KEY_ID")
os.environ["AWS_SECRET_ACCESS_KEY"] = config.get("AWS_CREDENTIALS", "AWS_SECRET_ACCESS_KEY")
os.environ["s3_bucket"] = config.get("S3", "s3_bucket")


def check(path, table, spark):
    """Row-count quality check: the parquet table at path + table must contain at least one row."""
    print("======================================")
    checkvar = path + table
    print("Check Activated :", checkvar)

    temp_table = spark.read.parquet(checkvar)
    temp_table.createOrReplaceTempView("temp_table")

    temp_table = spark.sql("SELECT count(*) count FROM temp_table").first()
    print(table, " count :", temp_table[0])
    if temp_table[0] > 0:
        print("PASSED")
    else:
        print("FAILED")

    print("======================================")
    print("")


def create_spark_session():
    """
    Create a Spark session for processing.
    """
    print("Create Spark Session")
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark


def main():
    """
    Main function: run row-count quality checks against the data lake tables on S3.
    """
    # The original listing used `spark` here without creating it first;
    # a session is required for the checks below.
    spark = create_spark_session()

    # Print S3 bucket location
    s3_bucket = os.environ["s3_bucket"]
    s3_bucket = s3_bucket.replace("'", "")
    print(s3_bucket)

    # Invoke functions to check data
    check(s3_bucket + "datalake/", "country_table", spark)
    check(s3_bucket + "datalake/", "airport_table", spark)
    check(s3_bucket + "datalake/", "immigration_table", spark)
    check(s3_bucket + "datalake/", "immigrant_table", spark)
    check(s3_bucket + "datalake/", "weather_table", spark)
    check(s3_bucket + "datalake/", "city_state_table", spark)
    check(s3_bucket + "datalake/", "city_weather_table", spark)
    check(s3_bucket + "datalake/", "demographics_city_table", spark)
    check(s3_bucket + "datalake/", "immigration_demographic_table", spark)
    check(s3_bucket + "datalake/", "airports_weather_table", spark)


if __name__ == "__main__":
...
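All three snippets above talk to S3 through boto3, which makes them straightforward to run against a locally running LocalStack container instead of real AWS (the localstack_python theme of this page). A minimal sketch, assuming LocalStack is listening on its default edge endpoint at localhost:4566; the bucket name is a placeholder:

import boto3

# LocalStack accepts arbitrary credentials; only the endpoint_url matters.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:4566",  # LocalStack default edge port
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
)
s3.create_bucket(Bucket="demo-bucket")
print([b["Name"] for b in s3.list_buckets()["Buckets"]])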
