How to use test_bucket_name method in localstack

Best Python code snippet using localstack_python

test_s3fs.py

Source:test_s3fs.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2import datetime3from contextlib import contextmanager4import errno5import json6from concurrent.futures import ProcessPoolExecutor7import io8import time9import sys10import pytest11from itertools import chain12import fsspec.core13from s3fs.core import S3FileSystem14from s3fs.utils import ignoring, SSEParams15import moto16import botocore17from unittest import mock18from botocore.exceptions import NoCredentialsError19test_bucket_name = 'test'20secure_bucket_name = 'test-secure'21versioned_bucket_name = 'test-versioned'22files = {'test/accounts.1.json': (b'{"amount": 100, "name": "Alice"}\n'23 b'{"amount": 200, "name": "Bob"}\n'24 b'{"amount": 300, "name": "Charlie"}\n'25 b'{"amount": 400, "name": "Dennis"}\n'),26 'test/accounts.2.json': (b'{"amount": 500, "name": "Alice"}\n'27 b'{"amount": 600, "name": "Bob"}\n'28 b'{"amount": 700, "name": "Charlie"}\n'29 b'{"amount": 800, "name": "Dennis"}\n')}30csv_files = {'2014-01-01.csv': (b'name,amount,id\n'31 b'Alice,100,1\n'32 b'Bob,200,2\n'33 b'Charlie,300,3\n'),34 '2014-01-02.csv': (b'name,amount,id\n'),35 '2014-01-03.csv': (b'name,amount,id\n'36 b'Dennis,400,4\n'37 b'Edith,500,5\n'38 b'Frank,600,6\n')}39text_files = {'nested/file1': b'hello\n',40 'nested/file2': b'world',41 'nested/nested2/file1': b'hello\n',42 'nested/nested2/file2': b'world'}43glob_files = {'file.dat': b'',44 'filexdat': b''}45a = test_bucket_name + '/tmp/test/a'46b = test_bucket_name + '/tmp/test/b'47c = test_bucket_name + '/tmp/test/c'48d = test_bucket_name + '/tmp/test/d'49py35 = sys.version_info[:2] == (3, 5)50@pytest.yield_fixture51def s3():52 # writable local S3 system53 with moto.mock_s3():54 from botocore.session import Session55 session = Session()56 client = session.create_client('s3')57 client.create_bucket(Bucket=test_bucket_name, ACL='public-read')58 client.create_bucket(59 Bucket=versioned_bucket_name, ACL='public-read')60 client.put_bucket_versioning(61 Bucket=versioned_bucket_name,62 VersioningConfiguration={63 'Status': 'Enabled'64 }65 )66 # initialize secure bucket67 client.create_bucket(68 Bucket=secure_bucket_name, ACL='public-read')69 policy = json.dumps({70 "Version": "2012-10-17",71 "Id": "PutObjPolicy",72 "Statement": [73 {74 "Sid": "DenyUnEncryptedObjectUploads",75 "Effect": "Deny",76 "Principal": "*",77 "Action": "s3:PutObject",78 "Resource": "arn:aws:s3:::{bucket_name}/*".format(79 bucket_name=secure_bucket_name),80 "Condition": {81 "StringNotEquals": {82 "s3:x-amz-server-side-encryption": "aws:kms"83 }84 }85 }86 ]87 })88 client.put_bucket_policy(Bucket=secure_bucket_name, Policy=policy)89 for k in [a, b, c, d]:90 try:91 client.delete_object(Bucket=test_bucket_name, Key=k)92 except:93 pass94 for flist in [files, csv_files, text_files, glob_files]:95 for f, data in flist.items():96 client.put_object(Bucket=test_bucket_name, Key=f, Body=data)97 S3FileSystem.clear_instance_cache()98 s3 = S3FileSystem(anon=False)99 s3.invalidate_cache()100 yield s3101 for flist in [files, csv_files, text_files, glob_files]:102 for f, data in flist.items():103 try:104 client.delete_object(105 Bucket=test_bucket_name, Key=f, Body=data)106 client.delete_object(107 Bucket=secure_bucket_name, Key=f, Body=data)108 except:109 pass110 for k in [a, b, c, d]:111 try:112 client.delete_object(Bucket=test_bucket_name, Key=k)113 client.delete_object(Bucket=secure_bucket_name, Key=k)114 except:115 pass116@contextmanager117def expect_errno(expected_errno):118 """Expect an OSError and validate its errno code."""119 with pytest.raises(OSError) as error:120 yield121 assert error.value.errno == expected_errno, 'OSError has wrong error code.'122def test_simple(s3):123 data = b'a' * (10 * 2 ** 20)124 with s3.open(a, 'wb') as f:125 f.write(data)126 with s3.open(a, 'rb') as f:127 out = f.read(len(data))128 assert len(data) == len(out)129 assert out == data130@pytest.mark.parametrize('default_cache_type', ['none', 'bytes', 'mmap'])131def test_default_cache_type(s3, default_cache_type):132 data = b'a' * (10 * 2 ** 20)133 s3 = S3FileSystem(anon=False, default_cache_type=default_cache_type)134 with s3.open(a, 'wb') as f:135 f.write(data)136 with s3.open(a, 'rb') as f:137 assert isinstance(f.cache, fsspec.core.caches[default_cache_type])138 out = f.read(len(data))139 assert len(data) == len(out)140 assert out == data141def test_ssl_off():142 s3 = S3FileSystem(use_ssl=False)143 assert s3.s3.meta.endpoint_url.startswith('http://')144def test_client_kwargs():145 s3 = S3FileSystem(client_kwargs={'endpoint_url': 'http://foo'})146 assert s3.s3.meta.endpoint_url.startswith('http://foo')147def test_config_kwargs():148 s3 = S3FileSystem(config_kwargs={'signature_version': 's3v4'})149 assert s3.connect(refresh=True).meta.config.signature_version == 's3v4'150def test_config_kwargs_class_attributes_default():151 s3 = S3FileSystem()152 assert s3.connect(refresh=True).meta.config.connect_timeout == 5153 assert s3.connect(refresh=True).meta.config.read_timeout == 15154def test_config_kwargs_class_attributes_override():155 s3 = S3FileSystem(156 config_kwargs={157 "connect_timeout": 60,158 "read_timeout": 120,159 }160 )161 assert s3.connect(refresh=True).meta.config.connect_timeout == 60162 assert s3.connect(refresh=True).meta.config.read_timeout == 120163def test_idempotent_connect(s3):164 con1 = s3.connect()165 con2 = s3.connect(refresh=False)166 con3 = s3.connect(refresh=True)167 assert con1 is con2168 assert con1 is not con3169def test_multiple_objects(s3):170 s3.connect()171 assert s3.ls('test')172 s32 = S3FileSystem(anon=False)173 assert s32.session174 assert s3.ls('test') == s32.ls('test')175def test_info(s3):176 s3.touch(a)177 s3.touch(b)178 info = s3.info(a)179 linfo = s3.ls(a, detail=True)[0]180 assert abs(info.pop('LastModified') - linfo.pop('LastModified')).seconds < 1181 info.pop('VersionId')182 assert info == linfo183 parent = a.rsplit('/', 1)[0]184 s3.invalidate_cache() # remove full path from the cache185 s3.ls(parent) # fill the cache with parent dir186 assert s3.info(a) == s3.dircache[parent][0] # correct value187 assert id(s3.info(a)) == id(s3.dircache[parent][0]) # is object from cache188 new_parent = test_bucket_name + '/foo'189 s3.mkdir(new_parent)190 with pytest.raises(FileNotFoundError):191 s3.info(new_parent)192 s3.ls(new_parent)193 with pytest.raises(FileNotFoundError):194 s3.info(new_parent)195def test_info_cached(s3):196 path = test_bucket_name + '/tmp/'197 fqpath = 's3://' + path198 s3.touch(path + '/test')199 info = s3.info(fqpath)200 assert info == s3.info(fqpath)201 assert info == s3.info(path)202def test_checksum(s3):203 bucket = test_bucket_name204 d = "checksum"205 prefix = d+"/e"206 o1 = prefix + "1"207 o2 = prefix + "2"208 path1 = bucket + "/" + o1209 path2 = bucket + "/" + o2210 client=s3.s3211 # init client and files212 client.put_object(Bucket=bucket, Key=o1, Body="")213 client.put_object(Bucket=bucket, Key=o2, Body="")214 # change one file, using cache215 client.put_object(Bucket=bucket, Key=o1, Body="foo")216 checksum = s3.checksum(path1)217 s3.ls(path1) # force caching218 client.put_object(Bucket=bucket, Key=o1, Body="bar")219 # refresh == False => checksum doesn't change220 assert checksum == s3.checksum(path1)221 # change one file, without cache222 client.put_object(Bucket=bucket, Key=o1, Body="foo")223 checksum = s3.checksum(path1, refresh=True)224 s3.ls(path1) # force caching225 client.put_object(Bucket=bucket, Key=o1, Body="bar")226 # refresh == True => checksum changes227 assert checksum != s3.checksum(path1, refresh=True)228 # Test for nonexistent file229 client.put_object(Bucket=bucket, Key=o1, Body="bar")230 s3.ls(path1) # force caching231 client.delete_object(Bucket=bucket, Key=o1)232 with pytest.raises(FileNotFoundError):233 checksum = s3.checksum(o1, refresh=True)234 235test_xattr_sample_metadata = {'test_xattr': '1'}236def test_xattr(s3):237 bucket, key = (test_bucket_name, 'tmp/test/xattr')238 filename = bucket + '/' + key239 body = b'aaaa'240 public_read_acl = {'Permission': 'READ', 'Grantee': {241 'URI': 'http://acs.amazonaws.com/groups/global/AllUsers', 'Type': 'Group'}}242 s3.s3.put_object(Bucket=bucket, Key=key,243 ACL='public-read',244 Metadata=test_xattr_sample_metadata,245 Body=body)246 # save etag for later247 etag = s3.info(filename)['ETag']248 assert public_read_acl in s3.s3.get_object_acl(249 Bucket=bucket, Key=key)['Grants']250 assert s3.getxattr(251 filename, 'test_xattr') == test_xattr_sample_metadata['test_xattr']252 assert s3.metadata(filename) == test_xattr_sample_metadata253 s3file = s3.open(filename)254 assert s3file.getxattr(255 'test_xattr') == test_xattr_sample_metadata['test_xattr']256 assert s3file.metadata() == test_xattr_sample_metadata257 s3file.setxattr(test_xattr='2')258 assert s3file.getxattr('test_xattr') == '2'259 s3file.setxattr(**{'test_xattr': None})260 assert s3file.metadata() == {}261 assert s3.cat(filename) == body262 # check that ACL and ETag are preserved after updating metadata263 assert public_read_acl in s3.s3.get_object_acl(264 Bucket=bucket, Key=key)['Grants']265 assert s3.info(filename)['ETag'] == etag266def test_xattr_setxattr_in_write_mode(s3):267 s3file = s3.open(a, 'wb')268 with pytest.raises(NotImplementedError):269 s3file.setxattr(test_xattr='1')270@pytest.mark.xfail()271def test_delegate(s3):272 out = s3.get_delegated_s3pars()273 assert out274 assert out['token']275 s32 = S3FileSystem(**out)276 assert not s32.anon277 assert out == s32.get_delegated_s3pars()278def test_not_delegate():279 s3 = S3FileSystem(anon=True)280 out = s3.get_delegated_s3pars()281 assert out == {'anon': True}282 s3 = S3FileSystem(anon=False) # auto credentials283 out = s3.get_delegated_s3pars()284 assert out == {'anon': False}285def test_ls(s3):286 assert set(s3.ls('')) == {test_bucket_name,287 secure_bucket_name, versioned_bucket_name}288 with pytest.raises(FileNotFoundError):289 s3.ls('nonexistent')290 fn = test_bucket_name + '/test/accounts.1.json'291 assert fn in s3.ls(test_bucket_name + '/test')292def test_pickle(s3):293 import pickle294 s32 = pickle.loads(pickle.dumps(s3))295 assert s3.ls('test') == s32.ls('test')296 s33 = pickle.loads(pickle.dumps(s32))297 assert s3.ls('test') == s33.ls('test')298def test_ls_touch(s3):299 assert not s3.exists(test_bucket_name + '/tmp/test')300 s3.touch(a)301 s3.touch(b)302 L = s3.ls(test_bucket_name + '/tmp/test', True)303 assert {d['Key'] for d in L} == {a, b}304 L = s3.ls(test_bucket_name + '/tmp/test', False)305 assert set(L) == {a, b}306@pytest.mark.parametrize('version_aware', [True, False])307def test_exists_versioned(s3, version_aware):308 """Test to ensure that a prefix exists when using a versioned bucket"""309 import uuid310 n = 3311 s3 = S3FileSystem(anon=False, version_aware=version_aware)312 segments = [versioned_bucket_name] + [str(uuid.uuid4()) for _ in range(n)]313 path = '/'.join(segments)314 for i in range(2, n+1):315 assert not s3.exists('/'.join(segments[:i]))316 s3.touch(path)317 for i in range(2, n+1):318 assert s3.exists('/'.join(segments[:i]))319def test_isfile(s3):320 assert not s3.isfile('')321 assert not s3.isfile('/')322 assert not s3.isfile(test_bucket_name)323 assert not s3.isfile(test_bucket_name + '/test')324 assert not s3.isfile(test_bucket_name + '/test/foo')325 assert s3.isfile(test_bucket_name + '/test/accounts.1.json')326 assert s3.isfile(test_bucket_name + '/test/accounts.2.json')327 assert not s3.isfile(a)328 s3.touch(a)329 assert s3.isfile(a)330 assert not s3.isfile(b)331 assert not s3.isfile(b + '/')332 s3.mkdir(b)333 assert not s3.isfile(b)334 assert not s3.isfile(b + '/')335 assert not s3.isfile(c)336 assert not s3.isfile(c + '/')337 s3.mkdir(c + '/')338 assert not s3.isfile(c)339 assert not s3.isfile(c + '/')340def test_isdir(s3):341 assert s3.isdir('')342 assert s3.isdir('/')343 assert s3.isdir(test_bucket_name)344 assert s3.isdir(test_bucket_name + '/test')345 assert not s3.isdir(test_bucket_name + '/test/foo')346 assert not s3.isdir(test_bucket_name + '/test/accounts.1.json')347 assert not s3.isdir(test_bucket_name + '/test/accounts.2.json')348 assert not s3.isdir(a)349 s3.touch(a)350 assert not s3.isdir(a)351 assert not s3.isdir(b)352 assert not s3.isdir(b + '/')353 assert not s3.isdir(c)354 assert not s3.isdir(c + '/')355 # test cache356 s3.invalidate_cache()357 assert not s3.dircache358 s3.ls(test_bucket_name + '/nested')359 assert test_bucket_name + '/nested' in s3.dircache360 assert not s3.isdir(test_bucket_name + '/nested/file1')361 assert not s3.isdir(test_bucket_name + '/nested/file2')362 assert s3.isdir(test_bucket_name + '/nested/nested2')363 assert s3.isdir(test_bucket_name + '/nested/nested2/')364def test_rm(s3):365 assert not s3.exists(a)366 s3.touch(a)367 assert s3.exists(a)368 s3.rm(a)369 assert not s3.exists(a)370 with pytest.raises(FileNotFoundError):371 s3.rm(test_bucket_name + '/nonexistent')372 with pytest.raises(FileNotFoundError):373 s3.rm('nonexistent')374 s3.rm(test_bucket_name + '/nested', recursive=True)375 assert not s3.exists(test_bucket_name + '/nested/nested2/file1')376 # whole bucket377 s3.rm(test_bucket_name, recursive=True)378 assert not s3.exists(test_bucket_name + '/2014-01-01.csv')379 assert not s3.exists(test_bucket_name)380def test_rmdir(s3):381 bucket = 'test1_bucket'382 s3.mkdir(bucket)383 s3.rmdir(bucket)384 assert bucket not in s3.ls('/')385def test_mkdir(s3):386 bucket = 'test1_bucket'387 s3.mkdir(bucket)388 assert bucket in s3.ls('/')389def test_mkdir_region_name(s3):390 bucket = 'test1_bucket'391 s3.mkdir(bucket, region_name="eu-central-1")392 assert bucket in s3.ls('/')393def test_mkdir_client_region_name():394 bucket = 'test1_bucket'395 try:396 m = moto.mock_s3()397 m.start()398 s3 = S3FileSystem(anon=False, client_kwargs={"region_name":399 "eu-central-1"})400 s3.mkdir(bucket)401 assert bucket in s3.ls('/')402 finally:403 m.stop()404def test_bulk_delete(s3):405 with pytest.raises(FileNotFoundError):406 s3.bulk_delete(['nonexistent/file'])407 with pytest.raises(ValueError):408 s3.bulk_delete(['bucket1/file', 'bucket2/file'])409 filelist = s3.find(test_bucket_name+'/nested')410 s3.bulk_delete(filelist)411 assert not s3.exists(test_bucket_name + '/nested/nested2/file1')412def test_anonymous_access():413 with ignoring(NoCredentialsError):414 s3 = S3FileSystem(anon=True)415 assert s3.ls('') == []416 # TODO: public bucket doesn't work through moto417 with pytest.raises(PermissionError):418 s3.mkdir('newbucket')419def test_s3_file_access(s3):420 fn = test_bucket_name + '/nested/file1'421 data = b'hello\n'422 assert s3.cat(fn) == data423 assert s3.head(fn, 3) == data[:3]424 assert s3.tail(fn, 3) == data[-3:]425 assert s3.tail(fn, 10000) == data426def test_s3_file_info(s3):427 fn = test_bucket_name + '/nested/file1'428 data = b'hello\n'429 assert fn in s3.find(test_bucket_name)430 assert s3.exists(fn)431 assert not s3.exists(fn + 'another')432 assert s3.info(fn)['Size'] == len(data)433 with pytest.raises(FileNotFoundError):434 s3.info(fn + 'another')435def test_bucket_exists(s3):436 assert s3.exists(test_bucket_name)437 assert not s3.exists(test_bucket_name + 'x')438 s3 = S3FileSystem(anon=True)439 assert s3.exists(test_bucket_name)440 assert not s3.exists(test_bucket_name + 'x')441def test_du(s3):442 d = s3.du(test_bucket_name, total=False)443 assert all(isinstance(v, int) and v >= 0 for v in d.values())444 assert test_bucket_name + '/nested/file1' in d445 assert s3.du(test_bucket_name + '/test/', total=True) == \446 sum(map(len, files.values()))447 assert s3.du(test_bucket_name) == s3.du('s3://' + test_bucket_name)448def test_s3_ls(s3):449 fn = test_bucket_name + '/nested/file1'450 assert fn not in s3.ls(test_bucket_name + '/')451 assert fn in s3.ls(test_bucket_name + '/nested/')452 assert fn in s3.ls(test_bucket_name + '/nested')453 assert s3.ls('s3://' + test_bucket_name +454 '/nested/') == s3.ls(test_bucket_name + '/nested')455def test_s3_big_ls(s3):456 for x in range(1200):457 s3.touch(test_bucket_name + '/thousand/%i.part' % x)458 assert len(s3.find(test_bucket_name)) > 1200459 s3.rm(test_bucket_name + '/thousand/', recursive=True)460 assert len(s3.find(test_bucket_name + '/thousand/')) == 0461def test_s3_ls_detail(s3):462 L = s3.ls(test_bucket_name + '/nested', detail=True)463 assert all(isinstance(item, dict) for item in L)464def test_s3_glob(s3):465 fn = test_bucket_name + '/nested/file1'466 assert fn not in s3.glob(test_bucket_name + '/')467 assert fn not in s3.glob(test_bucket_name + '/*')468 assert fn not in s3.glob(test_bucket_name + '/nested')469 assert fn in s3.glob(test_bucket_name + '/nested/*')470 assert fn in s3.glob(test_bucket_name + '/nested/file*')471 assert fn in s3.glob(test_bucket_name + '/*/*')472 assert all(any(p.startswith(f + '/') or p == f473 for p in s3.find(test_bucket_name))474 for f in s3.glob(test_bucket_name + '/nested/*'))475 assert [test_bucket_name +476 '/nested/nested2'] == s3.glob(test_bucket_name + '/nested/nested2')477 out = s3.glob(test_bucket_name + '/nested/nested2/*')478 assert {'test/nested/nested2/file1',479 'test/nested/nested2/file2'} == set(out)480 with pytest.raises(ValueError):481 s3.glob('*')482 # Make sure glob() deals with the dot character (.) correctly.483 assert test_bucket_name + '/file.dat' in s3.glob(test_bucket_name + '/file.*')484 assert test_bucket_name + \485 '/filexdat' not in s3.glob(test_bucket_name + '/file.*')486def test_get_list_of_summary_objects(s3):487 L = s3.ls(test_bucket_name + '/test')488 assert len(L) == 2489 assert [l.lstrip(test_bucket_name).lstrip('/')490 for l in sorted(L)] == sorted(list(files))491 L2 = s3.ls('s3://' + test_bucket_name + '/test')492 assert L == L2493def test_read_keys_from_bucket(s3):494 for k, data in files.items():495 file_contents = s3.cat('/'.join([test_bucket_name, k]))496 assert file_contents == data497 assert (s3.cat('/'.join([test_bucket_name, k])) ==498 s3.cat('s3://' + '/'.join([test_bucket_name, k])))499@pytest.mark.xfail(reason="misbehaves in modern versions of moto?")500def test_url(s3):501 fn = test_bucket_name + '/nested/file1'502 url = s3.url(fn, expires=100)503 assert 'http' in url504 import urllib.parse505 components = urllib.parse.urlparse(url)506 query = urllib.parse.parse_qs(components.query)507 exp = int(query['Expires'][0])508 delta = abs(exp - time.time() - 100)509 assert delta < 5510 with s3.open(fn) as f:511 assert 'http' in f.url()512def test_seek(s3):513 with s3.open(a, 'wb') as f:514 f.write(b'123')515 with s3.open(a) as f:516 f.seek(1000)517 with pytest.raises(ValueError):518 f.seek(-1)519 with pytest.raises(ValueError):520 f.seek(-5, 2)521 with pytest.raises(ValueError):522 f.seek(0, 10)523 f.seek(0)524 assert f.read(1) == b'1'525 f.seek(0)526 assert f.read(1) == b'1'527 f.seek(3)528 assert f.read(1) == b''529 f.seek(-1, 2)530 assert f.read(1) == b'3'531 f.seek(-1, 1)532 f.seek(-1, 1)533 assert f.read(1) == b'2'534 for i in range(4):535 assert f.seek(i) == i536def test_bad_open(s3):537 with pytest.raises(ValueError):538 s3.open('')539def test_copy(s3):540 fn = test_bucket_name + '/test/accounts.1.json'541 s3.copy(fn, fn + '2')542 assert s3.cat(fn) == s3.cat(fn + '2')543def test_copy_managed(s3):544 data = b'abc' * 12*2**20545 fn = test_bucket_name + '/test/biggerfile'546 with s3.open(fn, 'wb') as f:547 f.write(data)548 s3.copy_managed(fn, fn + '2', block=5 * 2 ** 20)549 assert s3.cat(fn) == s3.cat(fn + '2')550 with pytest.raises(ValueError):551 s3.copy_managed(fn, fn + '3', block=4 * 2 ** 20)552 with pytest.raises(ValueError):553 s3.copy_managed(fn, fn + '3', block=6 * 2 ** 30)554def test_move(s3):555 fn = test_bucket_name + '/test/accounts.1.json'556 data = s3.cat(fn)557 s3.mv(fn, fn + '2')558 assert s3.cat(fn + '2') == data559 assert not s3.exists(fn)560def test_get_put(s3, tmpdir):561 test_file = str(tmpdir.join('test.json'))562 s3.get(test_bucket_name + '/test/accounts.1.json', test_file)563 data = files['test/accounts.1.json']564 assert open(test_file, 'rb').read() == data565 s3.put(test_file, test_bucket_name + '/temp')566 assert s3.du(test_bucket_name +567 '/temp', total=False)[test_bucket_name + '/temp'] == len(data)568 assert s3.cat(test_bucket_name + '/temp') == data569def test_errors(s3):570 with pytest.raises(FileNotFoundError):571 s3.open(test_bucket_name + '/tmp/test/shfoshf', 'rb')572 # This is fine, no need for interleaving directories on S3573 # with pytest.raises((IOError, OSError)):574 # s3.touch('tmp/test/shfoshf/x')575 with pytest.raises(FileNotFoundError):576 s3.rm(test_bucket_name + '/tmp/test/shfoshf/x')577 with pytest.raises(FileNotFoundError):578 s3.mv(test_bucket_name + '/tmp/test/shfoshf/x', 'tmp/test/shfoshf/y')579 with pytest.raises(ValueError):580 s3.open('x', 'rb')581 with pytest.raises(FileNotFoundError):582 s3.rm('unknown')583 with pytest.raises(ValueError):584 with s3.open(test_bucket_name + '/temp', 'wb') as f:585 f.read()586 with pytest.raises(ValueError):587 f = s3.open(test_bucket_name + '/temp', 'rb')588 f.close()589 f.read()590 with pytest.raises(ValueError):591 s3.mkdir('/')592 with pytest.raises(ValueError):593 s3.find('')594 with pytest.raises(ValueError):595 s3.find('s3://')596def test_read_small(s3):597 fn = test_bucket_name + '/2014-01-01.csv'598 with s3.open(fn, 'rb', block_size=10) as f:599 out = []600 while True:601 data = f.read(3)602 if data == b'':603 break604 out.append(data)605 assert s3.cat(fn) == b''.join(out)606 # cache drop607 assert len(f.cache) < len(out)608def test_read_s3_block(s3):609 data = files['test/accounts.1.json']610 lines = io.BytesIO(data).readlines()611 path = test_bucket_name + '/test/accounts.1.json'612 assert s3.read_block(path, 1, 35, b'\n') == lines[1]613 assert s3.read_block(path, 0, 30, b'\n') == lines[0]614 assert s3.read_block(path, 0, 35, b'\n') == lines[0] + lines[1]615 assert s3.read_block(path, 0, 5000, b'\n') == data616 assert len(s3.read_block(path, 0, 5)) == 5617 assert len(s3.read_block(path, 4, 5000)) == len(data) - 4618 assert s3.read_block(path, 5000, 5010) == b''619 assert s3.read_block(path, 5, None) == s3.read_block(path, 5, 1000)620def test_new_bucket(s3):621 assert not s3.exists('new')622 s3.mkdir('new')623 assert s3.exists('new')624 with s3.open('new/temp', 'wb') as f:625 f.write(b'hello')626 with expect_errno(errno.ENOTEMPTY):627 s3.rmdir('new')628 s3.rm('new/temp')629 s3.rmdir('new')630 assert 'new' not in s3.ls('')631 assert not s3.exists('new')632 with pytest.raises(FileNotFoundError):633 s3.ls('new')634def test_dynamic_add_rm(s3):635 s3.mkdir('one')636 s3.mkdir('one/two')637 assert s3.exists('one')638 s3.ls('one')639 s3.touch("one/two/file_a")640 assert s3.exists('one/two/file_a')641 s3.rm('one', recursive=True)642 assert not s3.exists('one')643def test_write_small(s3):644 with s3.open(test_bucket_name + '/test', 'wb') as f:645 f.write(b'hello')646 assert s3.cat(test_bucket_name + '/test') == b'hello'647 s3.open(test_bucket_name + '/test', 'wb').close()648 assert s3.info(test_bucket_name + '/test')['Size'] == 0649def test_write_large(s3):650 "flush() chunks buffer when processing large singular payload"651 mb = 2 ** 20652 payload_size = int(2.5 * 5 * mb)653 payload = b'0' * payload_size654 with s3.open(test_bucket_name + '/test', 'wb') as fd, \655 mock.patch.object(s3, '_call_s3', side_effect=s3._call_s3) as s3_mock:656 fd.write(payload)657 upload_parts = s3_mock.mock_calls[1:]658 upload_sizes = [len(upload_part[2]['Body']) for upload_part in upload_parts]659 assert upload_sizes == [5 * mb, int(7.5 * mb)]660 assert s3.cat(test_bucket_name + '/test') == payload661 assert s3.info(test_bucket_name + '/test')['Size'] == payload_size662def test_write_limit(s3):663 "flush() respects part_max when processing large singular payload"664 mb = 2 ** 20665 block_size = 15 * mb666 part_max = 28 * mb667 payload_size = 44 * mb668 payload = b'0' * payload_size669 with s3.open(test_bucket_name + '/test', 'wb') as fd, \670 mock.patch('s3fs.core.S3File.part_max', new=part_max), \671 mock.patch.object(s3, '_call_s3', side_effect=s3._call_s3) as s3_mock:672 fd.blocksize = block_size673 fd.write(payload)674 upload_parts = s3_mock.mock_calls[1:]675 upload_sizes = [len(upload_part[2]['Body']) for upload_part in upload_parts]676 assert upload_sizes == [block_size, int(14.5 * mb), int(14.5 * mb)]677 assert s3.cat(test_bucket_name + '/test') == payload678 assert s3.info(test_bucket_name + '/test')['Size'] == payload_size679def test_write_small_secure(s3):680 # Unfortunately moto does not yet support enforcing SSE policies. It also681 # does not return the correct objects that can be used to test the results682 # effectively.683 # This test is left as a placeholder in case moto eventually supports this.684 sse_params = SSEParams(server_side_encryption='aws:kms')685 with s3.open(secure_bucket_name + '/test', 'wb', writer_kwargs=sse_params) as f:686 f.write(b'hello')687 assert s3.cat(secure_bucket_name + '/test') == b'hello'688 head = s3.s3.head_object(Bucket=secure_bucket_name, Key='test')689def test_write_large_secure(s3):690 s3_mock = moto.mock_s3()691 s3_mock.start()692 # build our own s3fs with the relevant additional kwarg693 s3 = S3FileSystem(s3_additional_kwargs={'ServerSideEncryption': 'AES256'})694 s3.mkdir('mybucket')695 with s3.open('mybucket/myfile', 'wb') as f:696 f.write(b'hello hello' * 10 ** 6)697 assert s3.cat('mybucket/myfile') == b'hello hello' * 10 ** 6698def test_write_fails(s3):699 with pytest.raises(ValueError):700 s3.touch(test_bucket_name + '/temp')701 s3.open(test_bucket_name + '/temp', 'rb').write(b'hello')702 with pytest.raises(ValueError):703 s3.open(test_bucket_name + '/temp', 'wb', block_size=10)704 f = s3.open(test_bucket_name + '/temp', 'wb')705 f.close()706 with pytest.raises(ValueError):707 f.write(b'hello')708 with pytest.raises(FileNotFoundError):709 s3.open('nonexistentbucket/temp', 'wb').close()710def test_write_blocks(s3):711 with s3.open(test_bucket_name + '/temp', 'wb') as f:712 f.write(b'a' * 2 * 2 ** 20)713 assert f.buffer.tell() == 2 * 2 ** 20714 assert not (f.parts)715 f.flush()716 assert f.buffer.tell() == 2 * 2 ** 20717 assert not (f.parts)718 f.write(b'a' * 2 * 2 ** 20)719 f.write(b'a' * 2 * 2 ** 20)720 assert f.mpu721 assert f.parts722 assert s3.info(test_bucket_name + '/temp')['Size'] == 6 * 2 ** 20723 with s3.open(test_bucket_name + '/temp', 'wb', block_size=10 * 2 ** 20) as f:724 f.write(b'a' * 15 * 2 ** 20)725 assert f.buffer.tell() == 0726 assert s3.info(test_bucket_name + '/temp')['Size'] == 15 * 2 ** 20727def test_readline(s3):728 all_items = chain.from_iterable([729 files.items(), csv_files.items(), text_files.items()730 ])731 for k, data in all_items:732 with s3.open('/'.join([test_bucket_name, k]), 'rb') as f:733 result = f.readline()734 expected = data.split(b'\n')[0] + (b'\n' if data.count(b'\n')735 else b'')736 assert result == expected737def test_readline_empty(s3):738 data = b''739 with s3.open(a, 'wb') as f:740 f.write(data)741 with s3.open(a, 'rb') as f:742 result = f.readline()743 assert result == data744def test_readline_blocksize(s3):745 data = b'ab\n' + b'a' * (10 * 2 ** 20) + b'\nab'746 with s3.open(a, 'wb') as f:747 f.write(data)748 with s3.open(a, 'rb') as f:749 result = f.readline()750 expected = b'ab\n'751 assert result == expected752 result = f.readline()753 expected = b'a' * (10 * 2 ** 20) + b'\n'754 assert result == expected755 result = f.readline()756 expected = b'ab'757 assert result == expected758def test_next(s3):759 expected = csv_files['2014-01-01.csv'].split(b'\n')[0] + b'\n'760 with s3.open(test_bucket_name + '/2014-01-01.csv') as f:761 result = next(f)762 assert result == expected763def test_iterable(s3):764 data = b'abc\n123'765 with s3.open(a, 'wb') as f:766 f.write(data)767 with s3.open(a) as f, io.BytesIO(data) as g:768 for froms3, fromio in zip(f, g):769 assert froms3 == fromio770 f.seek(0)771 assert f.readline() == b'abc\n'772 assert f.readline() == b'123'773 f.seek(1)774 assert f.readline() == b'bc\n'775 with s3.open(a) as f:776 out = list(f)777 with s3.open(a) as f:778 out2 = f.readlines()779 assert out == out2780 assert b"".join(out) == data781def test_readable(s3):782 with s3.open(a, 'wb') as f:783 assert not f.readable()784 with s3.open(a, 'rb') as f:785 assert f.readable()786def test_seekable(s3):787 with s3.open(a, 'wb') as f:788 assert not f.seekable()789 with s3.open(a, 'rb') as f:790 assert f.seekable()791def test_writable(s3):792 with s3.open(a, 'wb') as f:793 assert f.writable()794 with s3.open(a, 'rb') as f:795 assert not f.writable()796def test_merge(s3):797 with s3.open(a, 'wb') as f:798 f.write(b'a' * 10 * 2 ** 20)799 with s3.open(b, 'wb') as f:800 f.write(b'a' * 10 * 2 ** 20)801 s3.merge(test_bucket_name + '/joined', [a, b])802 assert s3.info(test_bucket_name + '/joined')['Size'] == 2 * 10 * 2 ** 20803def test_append(s3):804 data = text_files['nested/file1']805 with s3.open(test_bucket_name + '/nested/file1', 'ab') as f:806 assert f.tell() == len(data) # append, no write, small file807 assert s3.cat(test_bucket_name + '/nested/file1') == data808 with s3.open(test_bucket_name + '/nested/file1', 'ab') as f:809 f.write(b'extra') # append, write, small file810 assert s3.cat(test_bucket_name + '/nested/file1') == data + b'extra'811 with s3.open(a, 'wb') as f:812 f.write(b'a' * 10 * 2 ** 20)813 with s3.open(a, 'ab') as f:814 pass # append, no write, big file815 assert s3.cat(a) == b'a' * 10 * 2 ** 20816 with s3.open(a, 'ab') as f:817 assert f.parts is None818 f._initiate_upload()819 assert f.parts820 assert f.tell() == 10 * 2 ** 20821 f.write(b'extra') # append, small write, big file822 assert s3.cat(a) == b'a' * 10 * 2 ** 20 + b'extra'823 with s3.open(a, 'ab') as f:824 assert f.tell() == 10 * 2 ** 20 + 5825 f.write(b'b' * 10 * 2 ** 20) # append, big write, big file826 assert f.tell() == 20 * 2 ** 20 + 5827 assert s3.cat(a) == b'a' * 10 * 2 ** 20 + b'extra' + b'b' * 10 * 2 ** 20828def test_bigger_than_block_read(s3):829 with s3.open(test_bucket_name + '/2014-01-01.csv', 'rb', block_size=3) as f:830 out = []831 while True:832 data = f.read(20)833 out.append(data)834 if len(data) == 0:835 break836 assert b''.join(out) == csv_files['2014-01-01.csv']837def test_current(s3):838 s3._cache.clear()839 s3 = S3FileSystem()840 assert s3.current() is s3841 assert S3FileSystem.current() is s3842def test_array(s3):843 from array import array844 data = array('B', [65] * 1000)845 with s3.open(a, 'wb') as f:846 f.write(data)847 with s3.open(a, 'rb') as f:848 out = f.read()849 assert out == b'A' * 1000850def _get_s3_id(s3):851 return id(s3.s3)852def test_no_connection_sharing_among_processes(s3):853 executor = ProcessPoolExecutor()854 conn_id = executor.submit(_get_s3_id, s3).result()855 assert id(s3.connect()) != conn_id, \856 "Processes should not share S3 connections."857@pytest.mark.xfail()858def test_public_file(s3):859 # works on real s3, not on moto860 try:861 test_bucket_name = 's3fs_public_test'862 other_bucket_name = 's3fs_private_test'863 s3.touch(test_bucket_name)864 s3.touch(test_bucket_name + '/afile')865 s3.touch(other_bucket_name, acl='public-read')866 s3.touch(other_bucket_name + '/afile', acl='public-read')867 s = S3FileSystem(anon=True)868 with pytest.raises(PermissionError):869 s.ls(test_bucket_name)870 s.ls(other_bucket_name)871 s3.chmod(test_bucket_name, acl='public-read')872 s3.chmod(other_bucket_name, acl='private')873 with pytest.raises(PermissionError):874 s.ls(other_bucket_name, refresh=True)875 assert s.ls(test_bucket_name, refresh=True)876 # public file in private bucket877 with s3.open(other_bucket_name + '/see_me', 'wb', acl='public-read') as f:878 f.write(b'hello')879 assert s.cat(other_bucket_name + '/see_me') == b'hello'880 finally:881 s3.rm(test_bucket_name, recursive=True)882 s3.rm(other_bucket_name, recursive=True)883def test_upload_with_s3fs_prefix(s3):884 path = 's3://test/prefix/key'885 with s3.open(path, 'wb') as f:886 f.write(b'a' * (10 * 2 ** 20))887 with s3.open(path, 'ab') as f:888 f.write(b'b' * (10 * 2 ** 20))889def test_multipart_upload_blocksize(s3):890 blocksize = 5 * (2 ** 20)891 expected_parts = 3892 s3f = s3.open(a, 'wb', block_size=blocksize)893 for _ in range(3):894 data = b'b' * blocksize895 s3f.write(data)896 # Ensure that the multipart upload consists of only 3 parts897 assert len(s3f.parts) == expected_parts898 s3f.close()899def test_default_pars(s3):900 s3 = S3FileSystem(default_block_size=20, default_fill_cache=False)901 fn = test_bucket_name + '/' + list(files)[0]902 with s3.open(fn) as f:903 assert f.blocksize == 20904 assert f.fill_cache is False905 with s3.open(fn, block_size=40, fill_cache=True) as f:906 assert f.blocksize == 40907 assert f.fill_cache is True908def test_tags(s3):909 tagset = {'tag1': 'value1', 'tag2': 'value2'}910 fname = list(files)[0]911 s3.touch(fname)912 s3.put_tags(fname, tagset)913 assert s3.get_tags(fname) == tagset914 # Ensure merge mode updates value of existing key and adds new one915 new_tagset = {'tag2': 'updatedvalue2', 'tag3': 'value3'}916 s3.put_tags(fname, new_tagset, mode='m')917 tagset.update(new_tagset)918 assert s3.get_tags(fname) == tagset919@pytest.mark.skipif(py35, reason='no versions on old moto for py36')920def test_versions(s3):921 versioned_file = versioned_bucket_name + '/versioned_file'922 s3 = S3FileSystem(anon=False, version_aware=True)923 with s3.open(versioned_file, 'wb') as fo:924 fo.write(b'1')925 with s3.open(versioned_file, 'wb') as fo:926 fo.write(b'2')927 assert s3.isfile(versioned_file)928 versions = s3.object_version_info(versioned_file)929 version_ids = [version['VersionId'] for version in versions]930 assert len(version_ids) == 2931 with s3.open(versioned_file) as fo:932 assert fo.version_id == version_ids[1]933 assert fo.read() == b'2'934 with s3.open(versioned_file, version_id=version_ids[0]) as fo:935 assert fo.version_id == version_ids[0]936 assert fo.read() == b'1'937@pytest.mark.skipif(py35, reason='no versions on old moto for py36')938def test_list_versions_many(s3):939 # moto doesn't actually behave in the same way that s3 does here so this doesn't test940 # anything really in moto 1.2941 s3 = S3FileSystem(anon=False, version_aware=True)942 versioned_file = versioned_bucket_name + '/versioned_file2'943 for i in range(1200):944 with s3.open(versioned_file, 'wb') as fo:945 fo.write(b'1')946 versions = s3.object_version_info(versioned_file)947 assert len(versions) == 1200948def test_fsspec_versions_multiple(s3):949 """Test that the standard fsspec.core.get_fs_token_paths behaves as expected for versionId urls"""950 s3 = S3FileSystem(anon=False, version_aware=True)951 versioned_file = versioned_bucket_name + '/versioned_file3'952 version_lookup = {}953 for i in range(20):954 contents = str(i).encode()955 with s3.open(versioned_file, 'wb') as fo:956 fo.write(contents)957 version_lookup[fo.version_id] = contents958 urls = ["s3://{}?versionId={}".format(versioned_file, version)959 for version in version_lookup.keys()]960 fs, token, paths = fsspec.core.get_fs_token_paths(urls)961 assert isinstance(fs, S3FileSystem)962 assert fs.version_aware963 for path in paths:964 with fs.open(path, 'rb') as fo:965 contents = fo.read()966 assert contents == version_lookup[fo.version_id]967@pytest.mark.skipif(py35, reason='no versions on old moto for py36')968def test_versioned_file_fullpath(s3):969 versioned_file = versioned_bucket_name + '/versioned_file_fullpath'970 s3 = S3FileSystem(anon=False, version_aware=True)971 with s3.open(versioned_file, 'wb') as fo:972 fo.write(b'1')973 # moto doesn't correctly return a versionId for a multipart upload. So we resort to this.974 # version_id = fo.version_id975 versions = s3.object_version_info(versioned_file)976 version_ids = [version['VersionId'] for version in versions]977 version_id = version_ids[0]978 with s3.open(versioned_file, 'wb') as fo:979 fo.write(b'2')980 file_with_version = "{}?versionId={}".format(versioned_file, version_id)981 with s3.open(file_with_version, 'rb') as fo:982 assert fo.version_id == version_id983 assert fo.read() == b'1'984def test_versions_unaware(s3):985 versioned_file = versioned_bucket_name + '/versioned_file3'986 s3 = S3FileSystem(anon=False, version_aware=False)987 with s3.open(versioned_file, 'wb') as fo:988 fo.write(b'1')989 with s3.open(versioned_file, 'wb') as fo:990 fo.write(b'2')991 with s3.open(versioned_file) as fo:992 assert fo.version_id is None993 assert fo.read() == b'2'994 with pytest.raises(ValueError):995 with s3.open(versioned_file, version_id='0'):996 fo.read()997def test_text_io__stream_wrapper_works(s3):998 """Ensure using TextIOWrapper works."""999 s3.mkdir('bucket')1000 with s3.open('bucket/file.txt', 'wb') as fd:1001 fd.write(u'\u00af\\_(\u30c4)_/\u00af'.encode('utf-16-le'))1002 with s3.open('bucket/file.txt', 'rb') as fd:1003 with io.TextIOWrapper(fd, 'utf-16-le') as stream:1004 assert stream.readline() == u'\u00af\\_(\u30c4)_/\u00af'1005def test_text_io__basic(s3):1006 """Text mode is now allowed."""1007 s3.mkdir('bucket')1008 with s3.open('bucket/file.txt', 'w') as fd:1009 fd.write(u'\u00af\\_(\u30c4)_/\u00af')1010 with s3.open('bucket/file.txt', 'r') as fd:1011 assert fd.read() == u'\u00af\\_(\u30c4)_/\u00af'1012def test_text_io__override_encoding(s3):1013 """Allow overriding the default text encoding."""1014 s3.mkdir('bucket')1015 with s3.open('bucket/file.txt', 'w', encoding='ibm500') as fd:1016 fd.write(u'Hello, World!')1017 with s3.open('bucket/file.txt', 'r', encoding='ibm500') as fd:1018 assert fd.read() == u'Hello, World!'1019def test_readinto(s3):1020 s3.mkdir('bucket')1021 with s3.open('bucket/file.txt', 'wb') as fd:1022 fd.write(b'Hello, World!')1023 contents = bytearray(15)1024 with s3.open('bucket/file.txt', 'rb') as fd:1025 assert fd.readinto(contents) == 131026 assert contents.startswith(b'Hello, World!')1027def test_change_defaults_only_subsequent():1028 """Test for Issue #1351029 Ensure that changing the default block size doesn't affect existing file1030 systems that were created using that default. It should only affect file1031 systems created after the change.1032 """1033 try:1034 S3FileSystem.cachable = False # don't reuse instances with same pars1035 fs_default = S3FileSystem()1036 assert fs_default.default_block_size == 5 * (1024 ** 2)1037 fs_overridden = S3FileSystem(default_block_size=64 * (1024 ** 2))1038 assert fs_overridden.default_block_size == 64 * (1024 ** 2)1039 # Suppose I want all subsequent file systems to have a block size of 1 GiB1040 # instead of 5 MiB:1041 S3FileSystem.default_block_size = 1024 ** 31042 fs_big = S3FileSystem()1043 assert fs_big.default_block_size == 1024 ** 31044 # Test the other file systems created to see if their block sizes changed1045 assert fs_overridden.default_block_size == 64 * (1024 ** 2)1046 assert fs_default.default_block_size == 5 * (1024 ** 2)1047 finally:1048 S3FileSystem.default_block_size = 5 * (1024 ** 2)1049 S3FileSystem.cachable = True1050def test_passed_in_session_set_correctly(s3):1051 session = botocore.session.Session()1052 s3 = S3FileSystem(session=session)1053 assert s3.passed_in_session is session1054 client = s3.connect()1055 assert s3.session is session1056def test_without_passed_in_session_set_unique(s3):1057 session = botocore.session.Session()1058 s3 = S3FileSystem()1059 assert s3.passed_in_session is None1060 client = s3.connect()1061 assert s3.session is not session1062def test_pickle_without_passed_in_session(s3):1063 import pickle1064 s3 = S3FileSystem()1065 pickle.dumps(s3)1066def test_pickle_with_passed_in_session(s3):1067 import pickle1068 session = botocore.session.Session()1069 s3 = S3FileSystem(session=session)1070 with pytest.raises((AttributeError, NotImplementedError, TypeError, pickle.PicklingError)):1071 pickle.dumps(s3)1072def test_cache_after_copy(s3):1073 # https://github.com/dask/dask/issues/51341074 s3.touch('test/afile')1075 assert 'test/afile' in s3.ls('s3://test', False)1076 s3.cp('test/afile', 'test/bfile')1077 assert 'test/bfile' in s3.ls('s3://test', False)1078def test_autocommit(s3):1079 auto_file = test_bucket_name + '/auto_file'1080 committed_file = test_bucket_name + '/commit_file'1081 aborted_file = test_bucket_name + '/aborted_file'1082 s3 = S3FileSystem(anon=False, version_aware=True)1083 def write_and_flush(path, autocommit):1084 with s3.open(path, 'wb', autocommit=autocommit) as fo:1085 fo.write(b'1')1086 return fo1087 # regular behavior1088 fo = write_and_flush(auto_file, autocommit=True)1089 assert fo.autocommit1090 assert s3.exists(auto_file)1091 fo = write_and_flush(committed_file, autocommit=False)1092 assert not fo.autocommit1093 assert not s3.exists(committed_file)1094 fo.commit()1095 assert s3.exists(committed_file)1096 fo = write_and_flush(aborted_file,autocommit=False)1097 assert not s3.exists(aborted_file)1098 fo.discard()1099 assert not s3.exists(aborted_file)1100 # Cannot commit a file that was discarded1101 with pytest.raises(Exception):1102 fo.commit()1103def test_autocommit_mpu(s3):1104 """When not autocommitting we always want to use multipart uploads"""1105 path = test_bucket_name + '/auto_commit_with_mpu'1106 with s3.open(path, 'wb', autocommit=False) as fo:1107 fo.write(b'1')1108 assert fo.mpu is not None1109 assert len(fo.parts) == 11110def test_touch(s3):1111 # create1112 fn = test_bucket_name + "/touched"1113 assert not s3.exists(fn)1114 s3.touch(fn)1115 assert s3.exists(fn)1116 assert s3.size(fn) == 01117 # truncates1118 with s3.open(fn, 'wb') as f:1119 f.write(b'data')1120 assert s3.size(fn) == 41121 s3.touch(fn, truncate=True)1122 assert s3.size(fn) == 01123 # exists error1124 with s3.open(fn, 'wb') as f:1125 f.write(b'data')1126 assert s3.size(fn) == 41127 with pytest.raises(ValueError):1128 s3.touch(fn, truncate=False)1129 assert s3.size(fn) == 41130def test_seek_reads(s3):1131 fn = test_bucket_name + "/myfile"1132 with s3.open(fn, 'wb') as f:1133 f.write(b'a' * 175627146)1134 with s3.open(fn, 'rb', blocksize=100) as f:1135 f.seek(175561610)1136 d1 = f.read(65536)1137 f.seek(4)1138 size = 175621981139 d2 = f.read(size)1140 assert len(d2) == size1141 f.seek(17562288)1142 size = 175621871143 d3 = f.read(size)1144 assert len(d3) == size1145def test_connect_many():1146 from multiprocessing.pool import ThreadPool1147 def task(i):1148 S3FileSystem(anon=False).ls("")1149 return True1150 pool = ThreadPool(processes=20)1151 out = pool.map(task, range(40))1152 assert all(out)1153 pool.close()1154 pool.join()1155def test_requester_pays():1156 fn = test_bucket_name + "/myfile"1157 with moto.mock_s3():1158 s3 = S3FileSystem(requester_pays=True)1159 assert s3.req_kw["RequestPayer"] == "requester"1160 s3.mkdir(test_bucket_name)1161 s3.touch(fn)1162 with s3.open(fn, "rb") as f:1163 assert f.req_kw["RequestPayer"] == "requester"1164def test_credentials():1165 s3 = S3FileSystem(key='foo', secret='foo')1166 assert s3.s3._request_signer._credentials.access_key == 'foo'1167 assert s3.s3._request_signer._credentials.secret_key == 'foo'1168 s3 = S3FileSystem(client_kwargs={'aws_access_key_id': 'bar',1169 'aws_secret_access_key': 'bar'})1170 assert s3.s3._request_signer._credentials.access_key == 'bar'1171 assert s3.s3._request_signer._credentials.secret_key == 'bar'1172 s3 = S3FileSystem(key='foo',1173 client_kwargs={'aws_secret_access_key': 'bar'})1174 assert s3.s3._request_signer._credentials.access_key == 'foo'1175 assert s3.s3._request_signer._credentials.secret_key == 'bar'1176 s3 = S3FileSystem(key='foobar',1177 secret='foobar',1178 client_kwargs={'aws_access_key_id': 'foobar',1179 'aws_secret_access_key': 'foobar'})1180 assert s3.s3._request_signer._credentials.access_key == 'foobar'1181 assert s3.s3._request_signer._credentials.secret_key == 'foobar'1182 with pytest.raises(TypeError) as excinfo:1183 s3 = S3FileSystem(key='foo',1184 secret='foo',1185 client_kwargs={'aws_access_key_id': 'bar',1186 'aws_secret_access_key': 'bar'})1187 assert 'multiple values for keyword argument' in str(excinfo.value)1188def test_modified(s3):1189 dir_path = test_bucket_name+'/modified'1190 file_path = dir_path + '/file'1191 # Test file1192 s3.touch(file_path)1193 modified = s3.modified(path=file_path)1194 assert isinstance(modified, datetime.datetime)1195 # Test directory1196 with pytest.raises(IsADirectoryError):1197 modified = s3.modified(path=dir_path)1198 # Test bucket1199 with pytest.raises(IsADirectoryError):...

Full Screen

Full Screen

delete_object.py

Source:delete_object.py Github

copy

Full Screen

1import logging2import boto33from botocore.exceptions import ClientError4import pathlib5from numpy import random6import os7import sys8def menu():9 print('******************************************************')10 print('Welcome to the Python SDEV400 Homework 1 Application.')11 print('******************************************************\n')12 print('What would you like to do today?')13 print('a. Create a S3 bucket with the name consisting of your firstname, lastname and a random 6-digit suffix.')14 print('b. Put an object (current error.log) in a previously created bucket.')15 print('c. Delete an object in a bucket. ')16 print('d. Delete a bucket.')17 print('e. Copy and object from one bucket to another.')18 print('f. Downloads an existing object from a bucket.')19 print('q. Exit the program.')20menu()21user_input = input('Enter selection: ')22if user_input == 'c':23 def bucket_choice():24 s3 = boto3.client('s3')25 response = s3.list_buckets()26 buckets = [bucket['Name'] for bucket in response['Buckets']]27 for i, item in enumerate(buckets, start=1):28 print(i, item)29 bucket_choice = int(input('Please select a bucket to list objects '))30 global x31 if bucket_choice == 1:32 x = buckets[0]33 elif bucket_choice == 2:34 x = buckets[1]35 elif bucket_choice == 3:36 x = buckets[2]37 elif bucket_choice == 4:38 x = buckets[3]39 elif bucket_choice == 5:40 x = buckets[3]41 bucket_choice()42 def list_bucket_objects(bucket_name):43 # Retrieve the list of bucket objects44 s3 = boto3.client('s3')45 try:46 response = s3.list_objects_v2(Bucket=bucket_name)47 except ClientError as e:48 # AllAccessDisabled error == bucket not found49 logging.error(e)50 return None51 return response['Contents']52 def list_bucket_main():53 """Exercise list_bucket_objects()"""54 # Assign this value before running the program55 test_bucket_name = x56 global z57 z = test_bucket_name58 # Set up logging59 logging.basicConfig(level=logging.DEBUG,60 format='%(levelname)s: %(asctime)s: %(message)s')61 # Retrieve the bucket's objects62 objects = list_bucket_objects(test_bucket_name)63 if objects is not None:64 # List the object names65 logging.info(f'Objects in {test_bucket_name}')66 for i, item in enumerate(objects,start=1):67 #for obj, item in enumerate(objects, start=1):68 logging.info(f' {item["Key"]}')69 print(i)70 object_choice = int(input('Please select an object to delete '))71 global object_key72 if object_choice == 1:73 object_key = objects[0]74 elif object_choice == 2:75 object_key = objects[1]76 elif object_choice == 3:77 object_key = objects[2]78 elif object_choice == 4:79 object_key = objects[3]80 elif object_choice == 5:81 object_key = objects[4]82 print(object_key['Key'])83 list_bucket_main()84 def delete_object(bucket_name, object_name):85 """Delete an object from an S3 bucket86 :param bucket_name: string87 :param object_name: string88 :return: True if the referenced object was deleted, otherwise False89 """90 # Delete the object91 s3 = boto3.client('s3')92 try:93 s3.delete_object(Bucket=bucket_name, Key=object_name)94 except ClientError as e:95 logging.error(e)96 return False97 return True98 def delete_object_main():99 """Exercise delete_object()"""100 # Assign these values before running the program101 test_bucket_name = z102 test_object_name = object_key103 #Set up logging104 logging.basicConfig(level=logging.DEBUG,105 format='%(levelname)s: %(asctime)s: %(message)s')106 #Delete the object107 if delete_object(test_bucket_name, test_object_name):108 logging.info(f'{test_object_name} was deleted from {test_bucket_name}')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful