Best Python code snippet using localstack_python
test_s3fs.py
Source:test_s3fs.py  
1# -*- coding: utf-8 -*-2import datetime3from contextlib import contextmanager4import errno5import json6from concurrent.futures import ProcessPoolExecutor7import io8import time9import sys10import pytest11from itertools import chain12import fsspec.core13from s3fs.core import S3FileSystem14from s3fs.utils import ignoring, SSEParams15import moto16import botocore17from unittest import mock18from botocore.exceptions import NoCredentialsError19test_bucket_name = 'test'20secure_bucket_name = 'test-secure'21versioned_bucket_name = 'test-versioned'22files = {'test/accounts.1.json': (b'{"amount": 100, "name": "Alice"}\n'23                                  b'{"amount": 200, "name": "Bob"}\n'24                                  b'{"amount": 300, "name": "Charlie"}\n'25                                  b'{"amount": 400, "name": "Dennis"}\n'),26         'test/accounts.2.json': (b'{"amount": 500, "name": "Alice"}\n'27                                  b'{"amount": 600, "name": "Bob"}\n'28                                  b'{"amount": 700, "name": "Charlie"}\n'29                                  b'{"amount": 800, "name": "Dennis"}\n')}30csv_files = {'2014-01-01.csv': (b'name,amount,id\n'31                                b'Alice,100,1\n'32                                b'Bob,200,2\n'33                                b'Charlie,300,3\n'),34             '2014-01-02.csv': (b'name,amount,id\n'),35             '2014-01-03.csv': (b'name,amount,id\n'36                                b'Dennis,400,4\n'37                                b'Edith,500,5\n'38                                b'Frank,600,6\n')}39text_files = {'nested/file1': b'hello\n',40              'nested/file2': b'world',41              'nested/nested2/file1': b'hello\n',42              'nested/nested2/file2': b'world'}43glob_files = {'file.dat': b'',44              'filexdat': b''}45a = test_bucket_name + '/tmp/test/a'46b = test_bucket_name + '/tmp/test/b'47c = test_bucket_name + '/tmp/test/c'48d = test_bucket_name + '/tmp/test/d'49py35 = sys.version_info[:2] == (3, 5)50@pytest.yield_fixture51def s3():52    # writable local S3 system53    with moto.mock_s3():54        from botocore.session import Session55        session = Session()56        client = session.create_client('s3')57        client.create_bucket(Bucket=test_bucket_name, ACL='public-read')58        client.create_bucket(59            Bucket=versioned_bucket_name, ACL='public-read')60        client.put_bucket_versioning(61            Bucket=versioned_bucket_name,62            VersioningConfiguration={63                'Status': 'Enabled'64            }65        )66        # initialize secure bucket67        client.create_bucket(68            Bucket=secure_bucket_name, ACL='public-read')69        policy = json.dumps({70            "Version": "2012-10-17",71            "Id": "PutObjPolicy",72            "Statement": [73                {74                    "Sid": "DenyUnEncryptedObjectUploads",75                    "Effect": "Deny",76                    "Principal": "*",77                    "Action": "s3:PutObject",78                    "Resource": "arn:aws:s3:::{bucket_name}/*".format(79                        bucket_name=secure_bucket_name),80                    "Condition": {81                        "StringNotEquals": {82                            "s3:x-amz-server-side-encryption": "aws:kms"83                        }84                    }85                }86            ]87        })88        client.put_bucket_policy(Bucket=secure_bucket_name, Policy=policy)89        for k in [a, b, c, d]:90            try:91                client.delete_object(Bucket=test_bucket_name, Key=k)92            except:93                pass94        for flist in [files, csv_files, text_files, glob_files]:95            for f, data in flist.items():96                client.put_object(Bucket=test_bucket_name, Key=f, Body=data)97        S3FileSystem.clear_instance_cache()98        s3 = S3FileSystem(anon=False)99        s3.invalidate_cache()100        yield s3101        for flist in [files, csv_files, text_files, glob_files]:102            for f, data in flist.items():103                try:104                    client.delete_object(105                        Bucket=test_bucket_name, Key=f, Body=data)106                    client.delete_object(107                        Bucket=secure_bucket_name, Key=f, Body=data)108                except:109                    pass110        for k in [a, b, c, d]:111            try:112                client.delete_object(Bucket=test_bucket_name, Key=k)113                client.delete_object(Bucket=secure_bucket_name, Key=k)114            except:115                pass116@contextmanager117def expect_errno(expected_errno):118    """Expect an OSError and validate its errno code."""119    with pytest.raises(OSError) as error:120        yield121    assert error.value.errno == expected_errno, 'OSError has wrong error code.'122def test_simple(s3):123    data = b'a' * (10 * 2 ** 20)124    with s3.open(a, 'wb') as f:125        f.write(data)126    with s3.open(a, 'rb') as f:127        out = f.read(len(data))128        assert len(data) == len(out)129        assert out == data130@pytest.mark.parametrize('default_cache_type', ['none', 'bytes', 'mmap'])131def test_default_cache_type(s3, default_cache_type):132    data = b'a' * (10 * 2 ** 20)133    s3 = S3FileSystem(anon=False, default_cache_type=default_cache_type)134    with s3.open(a, 'wb') as f:135        f.write(data)136    with s3.open(a, 'rb') as f:137        assert isinstance(f.cache, fsspec.core.caches[default_cache_type])138        out = f.read(len(data))139        assert len(data) == len(out)140        assert out == data141def test_ssl_off():142    s3 = S3FileSystem(use_ssl=False)143    assert s3.s3.meta.endpoint_url.startswith('http://')144def test_client_kwargs():145    s3 = S3FileSystem(client_kwargs={'endpoint_url': 'http://foo'})146    assert s3.s3.meta.endpoint_url.startswith('http://foo')147def test_config_kwargs():148    s3 = S3FileSystem(config_kwargs={'signature_version': 's3v4'})149    assert s3.connect(refresh=True).meta.config.signature_version == 's3v4'150def test_config_kwargs_class_attributes_default():151    s3 = S3FileSystem()152    assert s3.connect(refresh=True).meta.config.connect_timeout == 5153    assert s3.connect(refresh=True).meta.config.read_timeout == 15154def test_config_kwargs_class_attributes_override():155    s3 = S3FileSystem(156        config_kwargs={157            "connect_timeout": 60,158            "read_timeout": 120,159        }160    )161    assert s3.connect(refresh=True).meta.config.connect_timeout == 60162    assert s3.connect(refresh=True).meta.config.read_timeout == 120163def test_idempotent_connect(s3):164    con1 = s3.connect()165    con2 = s3.connect(refresh=False)166    con3 = s3.connect(refresh=True)167    assert con1 is con2168    assert con1 is not con3169def test_multiple_objects(s3):170    s3.connect()171    assert s3.ls('test')172    s32 = S3FileSystem(anon=False)173    assert s32.session174    assert s3.ls('test') == s32.ls('test')175def test_info(s3):176    s3.touch(a)177    s3.touch(b)178    info = s3.info(a)179    linfo = s3.ls(a, detail=True)[0]180    assert abs(info.pop('LastModified') - linfo.pop('LastModified')).seconds < 1181    info.pop('VersionId')182    assert info == linfo183    parent = a.rsplit('/', 1)[0]184    s3.invalidate_cache()  # remove full path from the cache185    s3.ls(parent)  # fill the cache with parent dir186    assert s3.info(a) == s3.dircache[parent][0]  # correct value187    assert id(s3.info(a)) == id(s3.dircache[parent][0])  # is object from cache188    new_parent = test_bucket_name + '/foo'189    s3.mkdir(new_parent)190    with pytest.raises(FileNotFoundError):191        s3.info(new_parent)192    s3.ls(new_parent)193    with pytest.raises(FileNotFoundError):194        s3.info(new_parent)195def test_info_cached(s3):196    path = test_bucket_name + '/tmp/'197    fqpath = 's3://' + path198    s3.touch(path + '/test')199    info = s3.info(fqpath)200    assert info == s3.info(fqpath)201    assert info == s3.info(path)202def test_checksum(s3):203    bucket = test_bucket_name204    d = "checksum"205    prefix = d+"/e"206    o1 = prefix + "1"207    o2 = prefix + "2"208    path1 = bucket + "/" + o1209    path2 = bucket + "/" + o2210    client=s3.s3211    # init client and files212    client.put_object(Bucket=bucket, Key=o1, Body="")213    client.put_object(Bucket=bucket, Key=o2, Body="")214    # change one file, using cache215    client.put_object(Bucket=bucket, Key=o1, Body="foo")216    checksum = s3.checksum(path1)217    s3.ls(path1) # force caching218    client.put_object(Bucket=bucket, Key=o1, Body="bar")219    # refresh == False => checksum doesn't change220    assert checksum == s3.checksum(path1)221    # change one file, without cache222    client.put_object(Bucket=bucket, Key=o1, Body="foo")223    checksum = s3.checksum(path1, refresh=True)224    s3.ls(path1) # force caching225    client.put_object(Bucket=bucket, Key=o1, Body="bar")226    # refresh == True => checksum changes227    assert checksum != s3.checksum(path1, refresh=True)228    # Test for nonexistent file229    client.put_object(Bucket=bucket, Key=o1, Body="bar")230    s3.ls(path1) # force caching231    client.delete_object(Bucket=bucket, Key=o1)232    with pytest.raises(FileNotFoundError):233        checksum = s3.checksum(o1, refresh=True)234        235test_xattr_sample_metadata = {'test_xattr': '1'}236def test_xattr(s3):237    bucket, key = (test_bucket_name, 'tmp/test/xattr')238    filename = bucket + '/' + key239    body = b'aaaa'240    public_read_acl = {'Permission': 'READ', 'Grantee': {241        'URI': 'http://acs.amazonaws.com/groups/global/AllUsers', 'Type': 'Group'}}242    s3.s3.put_object(Bucket=bucket, Key=key,243                     ACL='public-read',244                     Metadata=test_xattr_sample_metadata,245                     Body=body)246    # save etag for later247    etag = s3.info(filename)['ETag']248    assert public_read_acl in s3.s3.get_object_acl(249        Bucket=bucket, Key=key)['Grants']250    assert s3.getxattr(251        filename, 'test_xattr') == test_xattr_sample_metadata['test_xattr']252    assert s3.metadata(filename) == test_xattr_sample_metadata253    s3file = s3.open(filename)254    assert s3file.getxattr(255        'test_xattr') == test_xattr_sample_metadata['test_xattr']256    assert s3file.metadata() == test_xattr_sample_metadata257    s3file.setxattr(test_xattr='2')258    assert s3file.getxattr('test_xattr') == '2'259    s3file.setxattr(**{'test_xattr': None})260    assert s3file.metadata() == {}261    assert s3.cat(filename) == body262    # check that ACL and ETag are preserved after updating metadata263    assert public_read_acl in s3.s3.get_object_acl(264        Bucket=bucket, Key=key)['Grants']265    assert s3.info(filename)['ETag'] == etag266def test_xattr_setxattr_in_write_mode(s3):267    s3file = s3.open(a, 'wb')268    with pytest.raises(NotImplementedError):269        s3file.setxattr(test_xattr='1')270@pytest.mark.xfail()271def test_delegate(s3):272    out = s3.get_delegated_s3pars()273    assert out274    assert out['token']275    s32 = S3FileSystem(**out)276    assert not s32.anon277    assert out == s32.get_delegated_s3pars()278def test_not_delegate():279    s3 = S3FileSystem(anon=True)280    out = s3.get_delegated_s3pars()281    assert out == {'anon': True}282    s3 = S3FileSystem(anon=False)  # auto credentials283    out = s3.get_delegated_s3pars()284    assert out == {'anon': False}285def test_ls(s3):286    assert set(s3.ls('')) == {test_bucket_name,287                              secure_bucket_name, versioned_bucket_name}288    with pytest.raises(FileNotFoundError):289        s3.ls('nonexistent')290    fn = test_bucket_name + '/test/accounts.1.json'291    assert fn in s3.ls(test_bucket_name + '/test')292def test_pickle(s3):293    import pickle294    s32 = pickle.loads(pickle.dumps(s3))295    assert s3.ls('test') == s32.ls('test')296    s33 = pickle.loads(pickle.dumps(s32))297    assert s3.ls('test') == s33.ls('test')298def test_ls_touch(s3):299    assert not s3.exists(test_bucket_name + '/tmp/test')300    s3.touch(a)301    s3.touch(b)302    L = s3.ls(test_bucket_name + '/tmp/test', True)303    assert {d['Key'] for d in L} == {a, b}304    L = s3.ls(test_bucket_name + '/tmp/test', False)305    assert set(L) == {a, b}306@pytest.mark.parametrize('version_aware', [True, False])307def test_exists_versioned(s3, version_aware):308    """Test to ensure that a prefix exists when using a versioned bucket"""309    import uuid310    n = 3311    s3 = S3FileSystem(anon=False, version_aware=version_aware)312    segments = [versioned_bucket_name] + [str(uuid.uuid4()) for _ in range(n)]313    path = '/'.join(segments)314    for i in range(2, n+1):315        assert not s3.exists('/'.join(segments[:i]))316    s3.touch(path)317    for i in range(2, n+1):318        assert s3.exists('/'.join(segments[:i]))319def test_isfile(s3):320    assert not s3.isfile('')321    assert not s3.isfile('/')322    assert not s3.isfile(test_bucket_name)323    assert not s3.isfile(test_bucket_name + '/test')324    assert not s3.isfile(test_bucket_name + '/test/foo')325    assert s3.isfile(test_bucket_name + '/test/accounts.1.json')326    assert s3.isfile(test_bucket_name + '/test/accounts.2.json')327    assert not s3.isfile(a)328    s3.touch(a)329    assert s3.isfile(a)330    assert not s3.isfile(b)331    assert not s3.isfile(b + '/')332    s3.mkdir(b)333    assert not s3.isfile(b)334    assert not s3.isfile(b + '/')335    assert not s3.isfile(c)336    assert not s3.isfile(c + '/')337    s3.mkdir(c + '/')338    assert not s3.isfile(c)339    assert not s3.isfile(c + '/')340def test_isdir(s3):341    assert s3.isdir('')342    assert s3.isdir('/')343    assert s3.isdir(test_bucket_name)344    assert s3.isdir(test_bucket_name + '/test')345    assert not s3.isdir(test_bucket_name + '/test/foo')346    assert not s3.isdir(test_bucket_name + '/test/accounts.1.json')347    assert not s3.isdir(test_bucket_name + '/test/accounts.2.json')348    assert not s3.isdir(a)349    s3.touch(a)350    assert not s3.isdir(a)351    assert not s3.isdir(b)352    assert not s3.isdir(b + '/')353    assert not s3.isdir(c)354    assert not s3.isdir(c + '/')355    # test cache356    s3.invalidate_cache()357    assert not s3.dircache358    s3.ls(test_bucket_name + '/nested')359    assert test_bucket_name + '/nested' in s3.dircache360    assert not s3.isdir(test_bucket_name + '/nested/file1')361    assert not s3.isdir(test_bucket_name + '/nested/file2')362    assert s3.isdir(test_bucket_name + '/nested/nested2')363    assert s3.isdir(test_bucket_name + '/nested/nested2/')364def test_rm(s3):365    assert not s3.exists(a)366    s3.touch(a)367    assert s3.exists(a)368    s3.rm(a)369    assert not s3.exists(a)370    with pytest.raises(FileNotFoundError):371        s3.rm(test_bucket_name + '/nonexistent')372    with pytest.raises(FileNotFoundError):373        s3.rm('nonexistent')374    s3.rm(test_bucket_name + '/nested', recursive=True)375    assert not s3.exists(test_bucket_name + '/nested/nested2/file1')376    # whole bucket377    s3.rm(test_bucket_name, recursive=True)378    assert not s3.exists(test_bucket_name + '/2014-01-01.csv')379    assert not s3.exists(test_bucket_name)380def test_rmdir(s3):381    bucket = 'test1_bucket'382    s3.mkdir(bucket)383    s3.rmdir(bucket)384    assert bucket not in s3.ls('/')385def test_mkdir(s3):386    bucket = 'test1_bucket'387    s3.mkdir(bucket)388    assert bucket in s3.ls('/')389def test_mkdir_region_name(s3):390    bucket = 'test1_bucket'391    s3.mkdir(bucket, region_name="eu-central-1")392    assert bucket in s3.ls('/')393def test_mkdir_client_region_name():394    bucket = 'test1_bucket'395    try:396        m = moto.mock_s3()397        m.start()398        s3 = S3FileSystem(anon=False, client_kwargs={"region_name":399                                                         "eu-central-1"})400        s3.mkdir(bucket)401        assert bucket in s3.ls('/')402    finally:403        m.stop()404def test_bulk_delete(s3):405    with pytest.raises(FileNotFoundError):406        s3.bulk_delete(['nonexistent/file'])407    with pytest.raises(ValueError):408        s3.bulk_delete(['bucket1/file', 'bucket2/file'])409    filelist = s3.find(test_bucket_name+'/nested')410    s3.bulk_delete(filelist)411    assert not s3.exists(test_bucket_name + '/nested/nested2/file1')412def test_anonymous_access():413    with ignoring(NoCredentialsError):414        s3 = S3FileSystem(anon=True)415        assert s3.ls('') == []416        # TODO: public bucket doesn't work through moto417    with pytest.raises(PermissionError):418        s3.mkdir('newbucket')419def test_s3_file_access(s3):420    fn = test_bucket_name + '/nested/file1'421    data = b'hello\n'422    assert s3.cat(fn) == data423    assert s3.head(fn, 3) == data[:3]424    assert s3.tail(fn, 3) == data[-3:]425    assert s3.tail(fn, 10000) == data426def test_s3_file_info(s3):427    fn = test_bucket_name + '/nested/file1'428    data = b'hello\n'429    assert fn in s3.find(test_bucket_name)430    assert s3.exists(fn)431    assert not s3.exists(fn + 'another')432    assert s3.info(fn)['Size'] == len(data)433    with pytest.raises(FileNotFoundError):434        s3.info(fn + 'another')435def test_bucket_exists(s3):436    assert s3.exists(test_bucket_name)437    assert not s3.exists(test_bucket_name + 'x')438    s3 = S3FileSystem(anon=True)439    assert s3.exists(test_bucket_name)440    assert not s3.exists(test_bucket_name + 'x')441def test_du(s3):442    d = s3.du(test_bucket_name, total=False)443    assert all(isinstance(v, int) and v >= 0 for v in d.values())444    assert test_bucket_name + '/nested/file1' in d445    assert s3.du(test_bucket_name + '/test/', total=True) == \446           sum(map(len, files.values()))447    assert s3.du(test_bucket_name) == s3.du('s3://' + test_bucket_name)448def test_s3_ls(s3):449    fn = test_bucket_name + '/nested/file1'450    assert fn not in s3.ls(test_bucket_name + '/')451    assert fn in s3.ls(test_bucket_name + '/nested/')452    assert fn in s3.ls(test_bucket_name + '/nested')453    assert s3.ls('s3://' + test_bucket_name +454                 '/nested/') == s3.ls(test_bucket_name + '/nested')455def test_s3_big_ls(s3):456    for x in range(1200):457        s3.touch(test_bucket_name + '/thousand/%i.part' % x)458    assert len(s3.find(test_bucket_name)) > 1200459    s3.rm(test_bucket_name + '/thousand/', recursive=True)460    assert len(s3.find(test_bucket_name + '/thousand/')) == 0461def test_s3_ls_detail(s3):462    L = s3.ls(test_bucket_name + '/nested', detail=True)463    assert all(isinstance(item, dict) for item in L)464def test_s3_glob(s3):465    fn = test_bucket_name + '/nested/file1'466    assert fn not in s3.glob(test_bucket_name + '/')467    assert fn not in s3.glob(test_bucket_name + '/*')468    assert fn not in s3.glob(test_bucket_name + '/nested')469    assert fn in s3.glob(test_bucket_name + '/nested/*')470    assert fn in s3.glob(test_bucket_name + '/nested/file*')471    assert fn in s3.glob(test_bucket_name + '/*/*')472    assert all(any(p.startswith(f + '/') or p == f473                   for p in s3.find(test_bucket_name))474               for f in s3.glob(test_bucket_name + '/nested/*'))475    assert [test_bucket_name +476            '/nested/nested2'] == s3.glob(test_bucket_name + '/nested/nested2')477    out = s3.glob(test_bucket_name + '/nested/nested2/*')478    assert {'test/nested/nested2/file1',479            'test/nested/nested2/file2'} == set(out)480    with pytest.raises(ValueError):481        s3.glob('*')482    # Make sure glob() deals with the dot character (.) correctly.483    assert test_bucket_name + '/file.dat' in s3.glob(test_bucket_name + '/file.*')484    assert test_bucket_name + \485           '/filexdat' not in s3.glob(test_bucket_name + '/file.*')486def test_get_list_of_summary_objects(s3):487    L = s3.ls(test_bucket_name + '/test')488    assert len(L) == 2489    assert [l.lstrip(test_bucket_name).lstrip('/')490            for l in sorted(L)] == sorted(list(files))491    L2 = s3.ls('s3://' + test_bucket_name + '/test')492    assert L == L2493def test_read_keys_from_bucket(s3):494    for k, data in files.items():495        file_contents = s3.cat('/'.join([test_bucket_name, k]))496        assert file_contents == data497        assert (s3.cat('/'.join([test_bucket_name, k])) ==498                s3.cat('s3://' + '/'.join([test_bucket_name, k])))499@pytest.mark.xfail(reason="misbehaves in modern versions of moto?")500def test_url(s3):501    fn = test_bucket_name + '/nested/file1'502    url = s3.url(fn, expires=100)503    assert 'http' in url504    import urllib.parse505    components = urllib.parse.urlparse(url)506    query = urllib.parse.parse_qs(components.query)507    exp = int(query['Expires'][0])508    delta = abs(exp - time.time() - 100)509    assert delta < 5510    with s3.open(fn) as f:511        assert 'http' in f.url()512def test_seek(s3):513    with s3.open(a, 'wb') as f:514        f.write(b'123')515    with s3.open(a) as f:516        f.seek(1000)517        with pytest.raises(ValueError):518            f.seek(-1)519        with pytest.raises(ValueError):520            f.seek(-5, 2)521        with pytest.raises(ValueError):522            f.seek(0, 10)523        f.seek(0)524        assert f.read(1) == b'1'525        f.seek(0)526        assert f.read(1) == b'1'527        f.seek(3)528        assert f.read(1) == b''529        f.seek(-1, 2)530        assert f.read(1) == b'3'531        f.seek(-1, 1)532        f.seek(-1, 1)533        assert f.read(1) == b'2'534        for i in range(4):535            assert f.seek(i) == i536def test_bad_open(s3):537    with pytest.raises(ValueError):538        s3.open('')539def test_copy(s3):540    fn = test_bucket_name + '/test/accounts.1.json'541    s3.copy(fn, fn + '2')542    assert s3.cat(fn) == s3.cat(fn + '2')543def test_copy_managed(s3):544    data = b'abc' * 12*2**20545    fn = test_bucket_name + '/test/biggerfile'546    with s3.open(fn, 'wb') as f:547        f.write(data)548    s3.copy_managed(fn, fn + '2', block=5 * 2 ** 20)549    assert s3.cat(fn) == s3.cat(fn + '2')550    with pytest.raises(ValueError):551        s3.copy_managed(fn, fn + '3', block=4 * 2 ** 20)552    with pytest.raises(ValueError):553        s3.copy_managed(fn, fn + '3', block=6 * 2 ** 30)554def test_move(s3):555    fn = test_bucket_name + '/test/accounts.1.json'556    data = s3.cat(fn)557    s3.mv(fn, fn + '2')558    assert s3.cat(fn + '2') == data559    assert not s3.exists(fn)560def test_get_put(s3, tmpdir):561    test_file = str(tmpdir.join('test.json'))562    s3.get(test_bucket_name + '/test/accounts.1.json', test_file)563    data = files['test/accounts.1.json']564    assert open(test_file, 'rb').read() == data565    s3.put(test_file, test_bucket_name + '/temp')566    assert s3.du(test_bucket_name +567                 '/temp', total=False)[test_bucket_name + '/temp'] == len(data)568    assert s3.cat(test_bucket_name + '/temp') == data569def test_errors(s3):570    with pytest.raises(FileNotFoundError):571        s3.open(test_bucket_name + '/tmp/test/shfoshf', 'rb')572    # This is fine, no need for interleaving directories on S3573    # with pytest.raises((IOError, OSError)):574    #    s3.touch('tmp/test/shfoshf/x')575    with pytest.raises(FileNotFoundError):576        s3.rm(test_bucket_name + '/tmp/test/shfoshf/x')577    with pytest.raises(FileNotFoundError):578        s3.mv(test_bucket_name + '/tmp/test/shfoshf/x', 'tmp/test/shfoshf/y')579    with pytest.raises(ValueError):580        s3.open('x', 'rb')581    with pytest.raises(FileNotFoundError):582        s3.rm('unknown')583    with pytest.raises(ValueError):584        with s3.open(test_bucket_name + '/temp', 'wb') as f:585            f.read()586    with pytest.raises(ValueError):587        f = s3.open(test_bucket_name + '/temp', 'rb')588        f.close()589        f.read()590    with pytest.raises(ValueError):591        s3.mkdir('/')592    with pytest.raises(ValueError):593        s3.find('')594    with pytest.raises(ValueError):595        s3.find('s3://')596def test_read_small(s3):597    fn = test_bucket_name + '/2014-01-01.csv'598    with s3.open(fn, 'rb', block_size=10) as f:599        out = []600        while True:601            data = f.read(3)602            if data == b'':603                break604            out.append(data)605        assert s3.cat(fn) == b''.join(out)606        # cache drop607        assert len(f.cache) < len(out)608def test_read_s3_block(s3):609    data = files['test/accounts.1.json']610    lines = io.BytesIO(data).readlines()611    path = test_bucket_name + '/test/accounts.1.json'612    assert s3.read_block(path, 1, 35, b'\n') == lines[1]613    assert s3.read_block(path, 0, 30, b'\n') == lines[0]614    assert s3.read_block(path, 0, 35, b'\n') == lines[0] + lines[1]615    assert s3.read_block(path, 0, 5000, b'\n') == data616    assert len(s3.read_block(path, 0, 5)) == 5617    assert len(s3.read_block(path, 4, 5000)) == len(data) - 4618    assert s3.read_block(path, 5000, 5010) == b''619    assert s3.read_block(path, 5, None) == s3.read_block(path, 5, 1000)620def test_new_bucket(s3):621    assert not s3.exists('new')622    s3.mkdir('new')623    assert s3.exists('new')624    with s3.open('new/temp', 'wb') as f:625        f.write(b'hello')626    with expect_errno(errno.ENOTEMPTY):627        s3.rmdir('new')628    s3.rm('new/temp')629    s3.rmdir('new')630    assert 'new' not in s3.ls('')631    assert not s3.exists('new')632    with pytest.raises(FileNotFoundError):633        s3.ls('new')634def test_dynamic_add_rm(s3):635    s3.mkdir('one')636    s3.mkdir('one/two')637    assert s3.exists('one')638    s3.ls('one')639    s3.touch("one/two/file_a")640    assert s3.exists('one/two/file_a')641    s3.rm('one', recursive=True)642    assert not s3.exists('one')643def test_write_small(s3):644    with s3.open(test_bucket_name + '/test', 'wb') as f:645        f.write(b'hello')646    assert s3.cat(test_bucket_name + '/test') == b'hello'647    s3.open(test_bucket_name + '/test', 'wb').close()648    assert s3.info(test_bucket_name + '/test')['Size'] == 0649def test_write_large(s3):650    "flush() chunks buffer when processing large singular payload"651    mb = 2 ** 20652    payload_size = int(2.5 * 5 * mb)653    payload = b'0' * payload_size654    with s3.open(test_bucket_name + '/test', 'wb') as fd, \655         mock.patch.object(s3, '_call_s3', side_effect=s3._call_s3) as s3_mock:656        fd.write(payload)657    upload_parts = s3_mock.mock_calls[1:]658    upload_sizes = [len(upload_part[2]['Body']) for upload_part in upload_parts]659    assert upload_sizes == [5 * mb, int(7.5 * mb)]660    assert s3.cat(test_bucket_name + '/test') == payload661    assert s3.info(test_bucket_name + '/test')['Size'] == payload_size662def test_write_limit(s3):663    "flush() respects part_max when processing large singular payload"664    mb = 2 ** 20665    block_size = 15 * mb666    part_max = 28 * mb667    payload_size = 44 * mb668    payload = b'0' * payload_size669    with s3.open(test_bucket_name + '/test', 'wb') as fd, \670         mock.patch('s3fs.core.S3File.part_max', new=part_max), \671         mock.patch.object(s3, '_call_s3', side_effect=s3._call_s3) as s3_mock:672        fd.blocksize = block_size673        fd.write(payload)674    upload_parts = s3_mock.mock_calls[1:]675    upload_sizes = [len(upload_part[2]['Body']) for upload_part in upload_parts]676    assert upload_sizes == [block_size, int(14.5 * mb), int(14.5 * mb)]677    assert s3.cat(test_bucket_name + '/test') == payload678    assert s3.info(test_bucket_name + '/test')['Size'] == payload_size679def test_write_small_secure(s3):680    # Unfortunately moto does not yet support enforcing SSE policies.  It also681    # does not return the correct objects that can be used to test the results682    # effectively.683    # This test is left as a placeholder in case moto eventually supports this.684    sse_params = SSEParams(server_side_encryption='aws:kms')685    with s3.open(secure_bucket_name + '/test', 'wb', writer_kwargs=sse_params) as f:686        f.write(b'hello')687    assert s3.cat(secure_bucket_name + '/test') == b'hello'688    head = s3.s3.head_object(Bucket=secure_bucket_name, Key='test')689def test_write_large_secure(s3):690    s3_mock = moto.mock_s3()691    s3_mock.start()692    # build our own s3fs with the relevant additional kwarg693    s3 = S3FileSystem(s3_additional_kwargs={'ServerSideEncryption': 'AES256'})694    s3.mkdir('mybucket')695    with s3.open('mybucket/myfile', 'wb') as f:696        f.write(b'hello hello' * 10 ** 6)697    assert s3.cat('mybucket/myfile') == b'hello hello' * 10 ** 6698def test_write_fails(s3):699    with pytest.raises(ValueError):700        s3.touch(test_bucket_name + '/temp')701        s3.open(test_bucket_name + '/temp', 'rb').write(b'hello')702    with pytest.raises(ValueError):703        s3.open(test_bucket_name + '/temp', 'wb', block_size=10)704    f = s3.open(test_bucket_name + '/temp', 'wb')705    f.close()706    with pytest.raises(ValueError):707        f.write(b'hello')708    with pytest.raises(FileNotFoundError):709        s3.open('nonexistentbucket/temp', 'wb').close()710def test_write_blocks(s3):711    with s3.open(test_bucket_name + '/temp', 'wb') as f:712        f.write(b'a' * 2 * 2 ** 20)713        assert f.buffer.tell() == 2 * 2 ** 20714        assert not (f.parts)715        f.flush()716        assert f.buffer.tell() == 2 * 2 ** 20717        assert not (f.parts)718        f.write(b'a' * 2 * 2 ** 20)719        f.write(b'a' * 2 * 2 ** 20)720        assert f.mpu721        assert f.parts722    assert s3.info(test_bucket_name + '/temp')['Size'] == 6 * 2 ** 20723    with s3.open(test_bucket_name + '/temp', 'wb', block_size=10 * 2 ** 20) as f:724        f.write(b'a' * 15 * 2 ** 20)725        assert f.buffer.tell() == 0726    assert s3.info(test_bucket_name + '/temp')['Size'] == 15 * 2 ** 20727def test_readline(s3):728    all_items = chain.from_iterable([729        files.items(), csv_files.items(), text_files.items()730    ])731    for k, data in all_items:732        with s3.open('/'.join([test_bucket_name, k]), 'rb') as f:733            result = f.readline()734            expected = data.split(b'\n')[0] + (b'\n' if data.count(b'\n')735                                               else b'')736            assert result == expected737def test_readline_empty(s3):738    data = b''739    with s3.open(a, 'wb') as f:740        f.write(data)741    with s3.open(a, 'rb') as f:742        result = f.readline()743        assert result == data744def test_readline_blocksize(s3):745    data = b'ab\n' + b'a' * (10 * 2 ** 20) + b'\nab'746    with s3.open(a, 'wb') as f:747        f.write(data)748    with s3.open(a, 'rb') as f:749        result = f.readline()750        expected = b'ab\n'751        assert result == expected752        result = f.readline()753        expected = b'a' * (10 * 2 ** 20) + b'\n'754        assert result == expected755        result = f.readline()756        expected = b'ab'757        assert result == expected758def test_next(s3):759    expected = csv_files['2014-01-01.csv'].split(b'\n')[0] + b'\n'760    with s3.open(test_bucket_name + '/2014-01-01.csv') as f:761        result = next(f)762        assert result == expected763def test_iterable(s3):764    data = b'abc\n123'765    with s3.open(a, 'wb') as f:766        f.write(data)767    with s3.open(a) as f, io.BytesIO(data) as g:768        for froms3, fromio in zip(f, g):769            assert froms3 == fromio770        f.seek(0)771        assert f.readline() == b'abc\n'772        assert f.readline() == b'123'773        f.seek(1)774        assert f.readline() == b'bc\n'775    with s3.open(a) as f:776        out = list(f)777    with s3.open(a) as f:778        out2 = f.readlines()779    assert out == out2780    assert b"".join(out) == data781def test_readable(s3):782    with s3.open(a, 'wb') as f:783        assert not f.readable()784    with s3.open(a, 'rb') as f:785        assert f.readable()786def test_seekable(s3):787    with s3.open(a, 'wb') as f:788        assert not f.seekable()789    with s3.open(a, 'rb') as f:790        assert f.seekable()791def test_writable(s3):792    with s3.open(a, 'wb') as f:793        assert f.writable()794    with s3.open(a, 'rb') as f:795        assert not f.writable()796def test_merge(s3):797    with s3.open(a, 'wb') as f:798        f.write(b'a' * 10 * 2 ** 20)799    with s3.open(b, 'wb') as f:800        f.write(b'a' * 10 * 2 ** 20)801    s3.merge(test_bucket_name + '/joined', [a, b])802    assert s3.info(test_bucket_name + '/joined')['Size'] == 2 * 10 * 2 ** 20803def test_append(s3):804    data = text_files['nested/file1']805    with s3.open(test_bucket_name + '/nested/file1', 'ab') as f:806        assert f.tell() == len(data)  # append, no write, small file807    assert s3.cat(test_bucket_name + '/nested/file1') == data808    with s3.open(test_bucket_name + '/nested/file1', 'ab') as f:809        f.write(b'extra')  # append, write, small file810    assert s3.cat(test_bucket_name + '/nested/file1') == data + b'extra'811    with s3.open(a, 'wb') as f:812        f.write(b'a' * 10 * 2 ** 20)813    with s3.open(a, 'ab') as f:814        pass  # append, no write, big file815    assert s3.cat(a) == b'a' * 10 * 2 ** 20816    with s3.open(a, 'ab') as f:817        assert f.parts is None818        f._initiate_upload()819        assert f.parts820        assert f.tell() == 10 * 2 ** 20821        f.write(b'extra')  # append, small write, big file822    assert s3.cat(a) == b'a' * 10 * 2 ** 20 + b'extra'823    with s3.open(a, 'ab') as f:824        assert f.tell() == 10 * 2 ** 20 + 5825        f.write(b'b' * 10 * 2 ** 20)  # append, big write, big file826        assert f.tell() == 20 * 2 ** 20 + 5827    assert s3.cat(a) == b'a' * 10 * 2 ** 20 + b'extra' + b'b' * 10 * 2 ** 20828def test_bigger_than_block_read(s3):829    with s3.open(test_bucket_name + '/2014-01-01.csv', 'rb', block_size=3) as f:830        out = []831        while True:832            data = f.read(20)833            out.append(data)834            if len(data) == 0:835                break836    assert b''.join(out) == csv_files['2014-01-01.csv']837def test_current(s3):838    s3._cache.clear()839    s3 = S3FileSystem()840    assert s3.current() is s3841    assert S3FileSystem.current() is s3842def test_array(s3):843    from array import array844    data = array('B', [65] * 1000)845    with s3.open(a, 'wb') as f:846        f.write(data)847    with s3.open(a, 'rb') as f:848        out = f.read()849        assert out == b'A' * 1000850def _get_s3_id(s3):851    return id(s3.s3)852def test_no_connection_sharing_among_processes(s3):853    executor = ProcessPoolExecutor()854    conn_id = executor.submit(_get_s3_id, s3).result()855    assert id(s3.connect()) != conn_id, \856        "Processes should not share S3 connections."857@pytest.mark.xfail()858def test_public_file(s3):859    # works on real s3, not on moto860    try:861        test_bucket_name = 's3fs_public_test'862        other_bucket_name = 's3fs_private_test'863        s3.touch(test_bucket_name)864        s3.touch(test_bucket_name + '/afile')865        s3.touch(other_bucket_name, acl='public-read')866        s3.touch(other_bucket_name + '/afile', acl='public-read')867        s = S3FileSystem(anon=True)868        with pytest.raises(PermissionError):869            s.ls(test_bucket_name)870        s.ls(other_bucket_name)871        s3.chmod(test_bucket_name, acl='public-read')872        s3.chmod(other_bucket_name, acl='private')873        with pytest.raises(PermissionError):874            s.ls(other_bucket_name, refresh=True)875        assert s.ls(test_bucket_name, refresh=True)876        # public file in private bucket877        with s3.open(other_bucket_name + '/see_me', 'wb', acl='public-read') as f:878            f.write(b'hello')879        assert s.cat(other_bucket_name + '/see_me') == b'hello'880    finally:881        s3.rm(test_bucket_name, recursive=True)882        s3.rm(other_bucket_name, recursive=True)883def test_upload_with_s3fs_prefix(s3):884    path = 's3://test/prefix/key'885    with s3.open(path, 'wb') as f:886        f.write(b'a' * (10 * 2 ** 20))887    with s3.open(path, 'ab') as f:888        f.write(b'b' * (10 * 2 ** 20))889def test_multipart_upload_blocksize(s3):890    blocksize = 5 * (2 ** 20)891    expected_parts = 3892    s3f = s3.open(a, 'wb', block_size=blocksize)893    for _ in range(3):894        data = b'b' * blocksize895        s3f.write(data)896    # Ensure that the multipart upload consists of only 3 parts897    assert len(s3f.parts) == expected_parts898    s3f.close()899def test_default_pars(s3):900    s3 = S3FileSystem(default_block_size=20, default_fill_cache=False)901    fn = test_bucket_name + '/' + list(files)[0]902    with s3.open(fn) as f:903        assert f.blocksize == 20904        assert f.fill_cache is False905    with s3.open(fn, block_size=40, fill_cache=True) as f:906        assert f.blocksize == 40907        assert f.fill_cache is True908def test_tags(s3):909    tagset = {'tag1': 'value1', 'tag2': 'value2'}910    fname = list(files)[0]911    s3.touch(fname)912    s3.put_tags(fname, tagset)913    assert s3.get_tags(fname) == tagset914    # Ensure merge mode updates value of existing key and adds new one915    new_tagset = {'tag2': 'updatedvalue2', 'tag3': 'value3'}916    s3.put_tags(fname, new_tagset, mode='m')917    tagset.update(new_tagset)918    assert s3.get_tags(fname) == tagset919@pytest.mark.skipif(py35, reason='no versions on old moto for py36')920def test_versions(s3):921    versioned_file = versioned_bucket_name + '/versioned_file'922    s3 = S3FileSystem(anon=False, version_aware=True)923    with s3.open(versioned_file, 'wb') as fo:924        fo.write(b'1')925    with s3.open(versioned_file, 'wb') as fo:926        fo.write(b'2')927    assert s3.isfile(versioned_file)928    versions = s3.object_version_info(versioned_file)929    version_ids = [version['VersionId'] for version in versions]930    assert len(version_ids) == 2931    with s3.open(versioned_file) as fo:932        assert fo.version_id == version_ids[1]933        assert fo.read() == b'2'934    with s3.open(versioned_file, version_id=version_ids[0]) as fo:935        assert fo.version_id == version_ids[0]936        assert fo.read() == b'1'937@pytest.mark.skipif(py35, reason='no versions on old moto for py36')938def test_list_versions_many(s3):939    # moto doesn't actually behave in the same way that s3 does here so this doesn't test940    # anything really in moto 1.2941    s3 = S3FileSystem(anon=False, version_aware=True)942    versioned_file = versioned_bucket_name + '/versioned_file2'943    for i in range(1200):944        with s3.open(versioned_file, 'wb') as fo:945            fo.write(b'1')946    versions = s3.object_version_info(versioned_file)947    assert len(versions) == 1200948def test_fsspec_versions_multiple(s3):949    """Test that the standard fsspec.core.get_fs_token_paths behaves as expected for versionId urls"""950    s3 = S3FileSystem(anon=False, version_aware=True)951    versioned_file = versioned_bucket_name + '/versioned_file3'952    version_lookup = {}953    for i in range(20):954        contents = str(i).encode()955        with s3.open(versioned_file, 'wb') as fo:956            fo.write(contents)957        version_lookup[fo.version_id] = contents958    urls = ["s3://{}?versionId={}".format(versioned_file, version)959            for version in version_lookup.keys()]960    fs, token, paths = fsspec.core.get_fs_token_paths(urls)961    assert isinstance(fs, S3FileSystem)962    assert fs.version_aware963    for path in paths:964        with fs.open(path, 'rb') as fo:965            contents = fo.read()966            assert contents == version_lookup[fo.version_id]967@pytest.mark.skipif(py35, reason='no versions on old moto for py36')968def test_versioned_file_fullpath(s3):969    versioned_file = versioned_bucket_name + '/versioned_file_fullpath'970    s3 = S3FileSystem(anon=False, version_aware=True)971    with s3.open(versioned_file, 'wb') as fo:972        fo.write(b'1')973    # moto doesn't correctly return a versionId for a multipart upload. So we resort to this.974    # version_id = fo.version_id975    versions = s3.object_version_info(versioned_file)976    version_ids = [version['VersionId'] for version in versions]977    version_id = version_ids[0]978    with s3.open(versioned_file, 'wb') as fo:979        fo.write(b'2')980    file_with_version = "{}?versionId={}".format(versioned_file, version_id)981    with s3.open(file_with_version, 'rb') as fo:982        assert fo.version_id == version_id983        assert fo.read() == b'1'984def test_versions_unaware(s3):985    versioned_file = versioned_bucket_name + '/versioned_file3'986    s3 = S3FileSystem(anon=False, version_aware=False)987    with s3.open(versioned_file, 'wb') as fo:988        fo.write(b'1')989    with s3.open(versioned_file, 'wb') as fo:990        fo.write(b'2')991    with s3.open(versioned_file) as fo:992        assert fo.version_id is None993        assert fo.read() == b'2'994    with pytest.raises(ValueError):995        with s3.open(versioned_file, version_id='0'):996            fo.read()997def test_text_io__stream_wrapper_works(s3):998    """Ensure using TextIOWrapper works."""999    s3.mkdir('bucket')1000    with s3.open('bucket/file.txt', 'wb') as fd:1001        fd.write(u'\u00af\\_(\u30c4)_/\u00af'.encode('utf-16-le'))1002    with s3.open('bucket/file.txt', 'rb') as fd:1003        with io.TextIOWrapper(fd, 'utf-16-le') as stream:1004            assert stream.readline() == u'\u00af\\_(\u30c4)_/\u00af'1005def test_text_io__basic(s3):1006    """Text mode is now allowed."""1007    s3.mkdir('bucket')1008    with s3.open('bucket/file.txt', 'w') as fd:1009        fd.write(u'\u00af\\_(\u30c4)_/\u00af')1010    with s3.open('bucket/file.txt', 'r') as fd:1011        assert fd.read() == u'\u00af\\_(\u30c4)_/\u00af'1012def test_text_io__override_encoding(s3):1013    """Allow overriding the default text encoding."""1014    s3.mkdir('bucket')1015    with s3.open('bucket/file.txt', 'w', encoding='ibm500') as fd:1016        fd.write(u'Hello, World!')1017    with s3.open('bucket/file.txt', 'r', encoding='ibm500') as fd:1018        assert fd.read() == u'Hello, World!'1019def test_readinto(s3):1020    s3.mkdir('bucket')1021    with s3.open('bucket/file.txt', 'wb') as fd:1022        fd.write(b'Hello, World!')1023    contents = bytearray(15)1024    with s3.open('bucket/file.txt', 'rb') as fd:1025        assert fd.readinto(contents) == 131026    assert contents.startswith(b'Hello, World!')1027def test_change_defaults_only_subsequent():1028    """Test for Issue #1351029    Ensure that changing the default block size doesn't affect existing file1030    systems that were created using that default. It should only affect file1031    systems created after the change.1032    """1033    try:1034        S3FileSystem.cachable = False  # don't reuse instances with same pars1035        fs_default = S3FileSystem()1036        assert fs_default.default_block_size == 5 * (1024 ** 2)1037        fs_overridden = S3FileSystem(default_block_size=64 * (1024 ** 2))1038        assert fs_overridden.default_block_size == 64 * (1024 ** 2)1039        # Suppose I want all subsequent file systems to have a block size of 1 GiB1040        # instead of 5 MiB:1041        S3FileSystem.default_block_size = 1024 ** 31042        fs_big = S3FileSystem()1043        assert fs_big.default_block_size == 1024 ** 31044        # Test the other file systems created to see if their block sizes changed1045        assert fs_overridden.default_block_size == 64 * (1024 ** 2)1046        assert fs_default.default_block_size == 5 * (1024 ** 2)1047    finally:1048        S3FileSystem.default_block_size = 5 * (1024 ** 2)1049        S3FileSystem.cachable = True1050def test_passed_in_session_set_correctly(s3):1051    session = botocore.session.Session()1052    s3 = S3FileSystem(session=session)1053    assert s3.passed_in_session is session1054    client = s3.connect()1055    assert s3.session is session1056def test_without_passed_in_session_set_unique(s3):1057    session = botocore.session.Session()1058    s3 = S3FileSystem()1059    assert s3.passed_in_session is None1060    client = s3.connect()1061    assert s3.session is not session1062def test_pickle_without_passed_in_session(s3):1063    import pickle1064    s3 = S3FileSystem()1065    pickle.dumps(s3)1066def test_pickle_with_passed_in_session(s3):1067    import pickle1068    session = botocore.session.Session()1069    s3 = S3FileSystem(session=session)1070    with pytest.raises((AttributeError, NotImplementedError, TypeError, pickle.PicklingError)):1071        pickle.dumps(s3)1072def test_cache_after_copy(s3):1073    # https://github.com/dask/dask/issues/51341074    s3.touch('test/afile')1075    assert 'test/afile' in s3.ls('s3://test', False)1076    s3.cp('test/afile', 'test/bfile')1077    assert 'test/bfile' in s3.ls('s3://test', False)1078def test_autocommit(s3):1079    auto_file = test_bucket_name + '/auto_file'1080    committed_file = test_bucket_name + '/commit_file'1081    aborted_file = test_bucket_name + '/aborted_file'1082    s3 = S3FileSystem(anon=False, version_aware=True)1083    def write_and_flush(path, autocommit):1084        with s3.open(path, 'wb', autocommit=autocommit) as fo:1085            fo.write(b'1')1086        return fo1087    # regular behavior1088    fo = write_and_flush(auto_file, autocommit=True)1089    assert fo.autocommit1090    assert s3.exists(auto_file)1091    fo = write_and_flush(committed_file, autocommit=False)1092    assert not fo.autocommit1093    assert not s3.exists(committed_file)1094    fo.commit()1095    assert s3.exists(committed_file)1096    fo = write_and_flush(aborted_file,autocommit=False)1097    assert not s3.exists(aborted_file)1098    fo.discard()1099    assert not s3.exists(aborted_file)1100    # Cannot commit a file that was discarded1101    with pytest.raises(Exception):1102        fo.commit()1103def test_autocommit_mpu(s3):1104    """When not autocommitting we always want to use multipart uploads"""1105    path = test_bucket_name + '/auto_commit_with_mpu'1106    with s3.open(path, 'wb', autocommit=False) as fo:1107        fo.write(b'1')1108    assert fo.mpu is not None1109    assert len(fo.parts) == 11110def test_touch(s3):1111    # create1112    fn = test_bucket_name + "/touched"1113    assert not s3.exists(fn)1114    s3.touch(fn)1115    assert s3.exists(fn)1116    assert s3.size(fn) == 01117    # truncates1118    with s3.open(fn, 'wb') as f:1119        f.write(b'data')1120    assert s3.size(fn) == 41121    s3.touch(fn, truncate=True)1122    assert s3.size(fn) == 01123    # exists error1124    with s3.open(fn, 'wb') as f:1125        f.write(b'data')1126    assert s3.size(fn) == 41127    with pytest.raises(ValueError):1128        s3.touch(fn, truncate=False)1129    assert s3.size(fn) == 41130def test_seek_reads(s3):1131    fn = test_bucket_name + "/myfile"1132    with s3.open(fn, 'wb') as f:1133        f.write(b'a' * 175627146)1134    with s3.open(fn, 'rb', blocksize=100) as f:1135        f.seek(175561610)1136        d1 = f.read(65536)1137        f.seek(4)1138        size = 175621981139        d2 = f.read(size)1140        assert len(d2) == size1141        f.seek(17562288)1142        size = 175621871143        d3 = f.read(size)1144        assert len(d3) == size1145def test_connect_many():1146    from multiprocessing.pool import ThreadPool1147    def task(i):1148        S3FileSystem(anon=False).ls("")1149        return True1150    pool = ThreadPool(processes=20)1151    out = pool.map(task, range(40))1152    assert all(out)1153    pool.close()1154    pool.join()1155def test_requester_pays():1156    fn = test_bucket_name + "/myfile"1157    with moto.mock_s3():1158        s3 = S3FileSystem(requester_pays=True)1159        assert s3.req_kw["RequestPayer"] == "requester"1160        s3.mkdir(test_bucket_name)1161        s3.touch(fn)1162        with s3.open(fn, "rb") as f:1163            assert f.req_kw["RequestPayer"] == "requester"1164def test_credentials():1165    s3 = S3FileSystem(key='foo', secret='foo')1166    assert s3.s3._request_signer._credentials.access_key == 'foo'1167    assert s3.s3._request_signer._credentials.secret_key == 'foo'1168    s3 = S3FileSystem(client_kwargs={'aws_access_key_id': 'bar',1169                                     'aws_secret_access_key': 'bar'})1170    assert s3.s3._request_signer._credentials.access_key == 'bar'1171    assert s3.s3._request_signer._credentials.secret_key == 'bar'1172    s3 = S3FileSystem(key='foo',1173                      client_kwargs={'aws_secret_access_key': 'bar'})1174    assert s3.s3._request_signer._credentials.access_key == 'foo'1175    assert s3.s3._request_signer._credentials.secret_key == 'bar'1176    s3 = S3FileSystem(key='foobar',1177                      secret='foobar',1178                      client_kwargs={'aws_access_key_id': 'foobar',1179                                     'aws_secret_access_key': 'foobar'})1180    assert s3.s3._request_signer._credentials.access_key == 'foobar'1181    assert s3.s3._request_signer._credentials.secret_key == 'foobar'1182    with pytest.raises(TypeError) as excinfo:1183        s3 = S3FileSystem(key='foo',1184                          secret='foo',1185                          client_kwargs={'aws_access_key_id': 'bar',1186                                         'aws_secret_access_key': 'bar'})1187        assert 'multiple values for keyword argument' in str(excinfo.value)1188def test_modified(s3):1189    dir_path = test_bucket_name+'/modified'1190    file_path = dir_path + '/file'1191    # Test file1192    s3.touch(file_path)1193    modified = s3.modified(path=file_path)1194    assert isinstance(modified, datetime.datetime)1195    # Test directory1196    with pytest.raises(IsADirectoryError):1197        modified = s3.modified(path=dir_path)1198    # Test bucket1199    with pytest.raises(IsADirectoryError):...delete_object.py
Source:delete_object.py  
1import logging2import boto33from botocore.exceptions import ClientError4import pathlib5from numpy import random6import os7import sys8def menu():9    print('******************************************************')10    print('Welcome to the Python SDEV400 Homework 1 Application.')11    print('******************************************************\n')12    print('What would you like to do today?')13    print('a. Create a S3 bucket with the name consisting of your firstname, lastname and a random 6-digit suffix.')14    print('b. Put an object (current error.log) in a previously created bucket.')15    print('c. Delete an object in a bucket. ')16    print('d. Delete a bucket.')17    print('e. Copy and object from one bucket to another.')18    print('f. Downloads an existing object from a bucket.')19    print('q. Exit the program.')20menu()21user_input = input('Enter selection: ')22if user_input == 'c':23    def bucket_choice():24        s3 = boto3.client('s3')25        response = s3.list_buckets()26        buckets = [bucket['Name'] for bucket in response['Buckets']]27        for i, item in enumerate(buckets, start=1):28            print(i, item)29        bucket_choice = int(input('Please select a bucket to list objects '))30        global x31        if bucket_choice == 1:32            x = buckets[0]33        elif bucket_choice == 2:34            x = buckets[1]35        elif bucket_choice == 3:36            x = buckets[2]37        elif bucket_choice == 4:38            x = buckets[3]39        elif bucket_choice == 5:40            x = buckets[3]41    bucket_choice()42    def list_bucket_objects(bucket_name):43        # Retrieve the list of bucket objects44        s3 = boto3.client('s3')45        try:46            response = s3.list_objects_v2(Bucket=bucket_name)47        except ClientError as e:48            # AllAccessDisabled error == bucket not found49            logging.error(e)50            return None51        return response['Contents']52    def list_bucket_main():53        """Exercise list_bucket_objects()"""54        # Assign this value before running the program55        test_bucket_name = x56        global z57        z = test_bucket_name58        # Set up logging59        logging.basicConfig(level=logging.DEBUG,60                            format='%(levelname)s: %(asctime)s: %(message)s')61        # Retrieve the bucket's objects62        objects = list_bucket_objects(test_bucket_name)63        if objects is not None:64            # List the object names65            logging.info(f'Objects in {test_bucket_name}')66            for i, item in enumerate(objects,start=1):67            #for obj, item in enumerate(objects, start=1):68                logging.info(f'  {item["Key"]}')69                print(i)70            object_choice = int(input('Please select an object to delete '))71            global object_key72            if object_choice == 1:73                object_key = objects[0]74            elif object_choice == 2:75                object_key = objects[1]76            elif object_choice == 3:77                object_key = objects[2]78            elif object_choice == 4:79                object_key = objects[3]80            elif object_choice == 5:81                object_key = objects[4]82            print(object_key['Key'])83    list_bucket_main()84    def delete_object(bucket_name, object_name):85        """Delete an object from an S3 bucket86        :param bucket_name: string87        :param object_name: string88        :return: True if the referenced object was deleted, otherwise False89        """90        # Delete the object91        s3 = boto3.client('s3')92        try:93            s3.delete_object(Bucket=bucket_name, Key=object_name)94        except ClientError as e:95            logging.error(e)96            return False97        return True98    def delete_object_main():99        """Exercise delete_object()"""100        # Assign these values before running the program101        test_bucket_name = z102        test_object_name = object_key103        #Set up logging104        logging.basicConfig(level=logging.DEBUG,105                         format='%(levelname)s: %(asctime)s: %(message)s')106        #Delete the object107        if delete_object(test_bucket_name, test_object_name):108            logging.info(f'{test_object_name} was deleted from {test_bucket_name}')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
