How to use the assert_bucket_exists method in localstack

Best Python code snippet using localstack_python
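The snippets below come from three open-source projects, each implementing an assert_bucket_exists check in a different way: against mocked GCS requests, against a filesystem-backed S3 handler, and against a live S3-compatible endpoint. For orientation, here is a minimal sketch of the same idea written directly against LocalStack's S3 API, assuming LocalStack is listening on its default edge port (http://localhost:4566); the helper name and the ValueError it raises are illustrative, not part of the LocalStack or boto3 API:

import boto3
import botocore.exceptions

def assert_bucket_exists(bucket_name):
    """Raise ValueError if the bucket is missing from the LocalStack S3 endpoint."""
    s3 = boto3.client(
        's3',
        endpoint_url='http://localhost:4566',  # assumption: LocalStack's default edge port
        aws_access_key_id='test',              # LocalStack accepts arbitrary credentials
        aws_secret_access_key='test',
        region_name='us-east-1')
    try:
        # HeadBucket succeeds only if the bucket exists and is accessible
        s3.head_bucket(Bucket=bucket_name)
    except botocore.exceptions.ClientError as err:
        raise ValueError('Bucket %r does not exist' % bucket_name) from err

assert_bucket_exists('my-test-bucket')  # raises ValueError until the bucket is created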

utils_test.py

Source: utils_test.py (GitHub)


...
      'apache_beam.io.gcp.internal.clients.storage.StorageBucketsGetRequest',
      return_value='test-bucket-not-found')
  def test_assert_bucket_exists_not_found(self, mock_response, mock_client):
    with self.assertRaises(ValueError):
      utils.assert_bucket_exists('')

  @patch(
      'apache_beam.io.gcp.internal.clients.storage.StorageBucketsGetRequest',
      return_value='test-bucket-not-verified')
  def test_assert_bucket_exists_not_verified(self, mock_response, mock_client):
    from apache_beam.runners.interactive.utils import _LOGGER
    with self.assertLogs(_LOGGER, level='WARNING'):
      utils.assert_bucket_exists('')

  @patch(
      'apache_beam.io.gcp.internal.clients.storage.StorageBucketsGetRequest',
      return_value='test-bucket-found')
  def test_assert_bucket_exists_found(self, mock_response, mock_client):
    utils.assert_bucket_exists('')


class PipelineUtilTest(unittest.TestCase):
  def test_detect_pipeline_underlying_runner(self):
    p = beam.Pipeline(InteractiveRunner(underlying_runner=FlinkRunner()))
    pipeline_runner = utils.detect_pipeline_runner(p)
    self.assertTrue(isinstance(pipeline_runner, FlinkRunner))

  def test_detect_pipeline_no_underlying_runner(self):
    p = beam.Pipeline(InteractiveRunner())
    pipeline_runner = utils.detect_pipeline_runner(p)
    from apache_beam.runners.direct.direct_runner import DirectRunner
    self.assertTrue(isinstance(pipeline_runner, DirectRunner))

  def test_detect_pipeline_no_runner(self):
    pipeline_runner = utils.detect_pipeline_runner(None)
    self.assertEqual(pipeline_runner, None)


if __name__ == '__main__':
...
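This Apache Beam test patches StorageBucketsGetRequest so that utils.assert_bucket_exists hits its not-found, not-verified, and found code paths without ever contacting GCS; the not-found path is asserted through assertRaises(ValueError) and the not-verified path through assertLogs at WARNING level. The same patch-then-assert pattern carries over to an S3/LocalStack helper. A minimal sketch, assuming the boto3-based assert_bucket_exists from the first example above lives in a hypothetical module named s3_utils:

import unittest
from unittest.mock import patch

import botocore.exceptions

import s3_utils  # hypothetical module holding the boto3-based helper


class AssertBucketExistsTest(unittest.TestCase):
    @patch('s3_utils.boto3.client')
    def test_missing_bucket_raises(self, mock_client):
        # Make head_bucket fail the way boto3 reports a 404
        mock_client.return_value.head_bucket.side_effect = (
            botocore.exceptions.ClientError(
                {'Error': {'Code': '404', 'Message': 'Not Found'}}, 'HeadBucket'))
        with self.assertRaises(ValueError):
            s3_utils.assert_bucket_exists('no-such-bucket')


if __name__ == '__main__':
    unittest.main()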


fshandler.py

Source: fshandler.py (GitHub)


...
        fullpath = os.path.join(bucket, key)
        return fullpath

    def bucket_is_local(self, bucket):
        try:
            FilesystemHandler.assert_bucket_exists(self, bucket)
        except S3_Error:
            return False
        return True

    def get_bucket_host(self, bucket):
        self.assert_bucket_exists(bucket)
        return "%s:%d" % (socket.gethostname(), s3path.s3path().port)

    def assert_bucket_exists(self, bucket):
        path = self.bucket_to_path(bucket)
        if path is None or not os.path.exists(path):
            raise S3_Error("NoSuchBucket", bucket)
        return True

    def assert_bucket_key_exists(self, bucket, key):
        if key is None or key == '':
            raise S3_Error("NoSuchKey", bucket + '/' + key)
        path = self.bucket_key_to_path(bucket, key)
        if path is None or not os.path.exists(path):
            raise S3_Error("NoSuchKey", bucket + '/' + key)
        return True

    def pp(self, o):
        """quick and easy debug print"""
        import pprint
        pp = pprint.PrettyPrinter(indent=4)
        pp.pprint(o)

    def is_internal_file(self, f):
        """returns true if a file is an internal bookkeeping filename"""
        for ext in self.internal_file_extensions:
            if f.endswith(ext):
                return True
        return False

    def get_entries(self, dir, files_only=False):
        """directory lister for buckets and keys"""
        root = self.directory
        if dir:
            root = dir
        entries = []
        try:
            all_entries = os.listdir(root)
        except OSError:
            raise S3_ListEntriesError
        for e in all_entries:
            if self.is_internal_file(e):
                continue
            full_path = os.path.join(root, e)
            if not os.access(full_path, os.R_OK):
                continue
            if files_only and (not os.path.isfile(full_path)):
                continue
            entries.append(self.filename_to_id(e))
        return entries

    def get_bucket_list(self, accesskey):
        """return a list of buckets at the endpoint"""
        return self.get_entries(None)

    def get_key_list(self, bucket):
        """list the keys in a bucket"""
        try:
            return self.get_entries(self.bucket_to_path(bucket), True)
        except S3_ListEntriesError:
            raise S3_Error("NoSuchBucket")

    def get_bucket_owner(self, bucket):
        """returns the bucket owner"""
        path = os.path.join(self.id_to_filename(bucket), '_owner')
        if not os.path.isfile(path):  # fixed: was os.isfile, which does not exist
            return "nobody"
        f = open(path, "r")  # fixed: argument order was reversed
        owner = f.read()
        f.close()
        owner = owner.rstrip("\n")
        return owner

    def assert_can_write_to_bucket(self, accesskey, bucket):
        """returns true, or raises an access exception if not allowed"""
        owner = self.get_bucket_owner(bucket)
        if owner == accesskey:
            return True
        raise S3_Error("AccessDenied", "Only the bucket owner can write to the bucket.")

    def set_bucket_owner(self, bucket, owner):
        """sets the bucket owner"""
        path = os.path.join(self.id_to_filename(bucket), '_owner')
        f = open(path, "w")  # fixed: argument order was reversed
        f.write(owner)
        f.close()

    def is_roundtripable_header(self, h):
        header_patterns = (
            '^content-type',
            '^content-disposition',
            '^etag',
            '^x-amz',
            '^x-',
        )
        for p in header_patterns:
            if re.search(p, h, re.IGNORECASE):
                return True
        return False

    def get_head(self, bucket, key):
        headers = {}
        if key:
            self.assert_bucket_key_exists(bucket, key)
            key_file = self.bucket_key_to_path(bucket, key)
            hf = key_file + "_meta.txt"
            if os.path.exists(hf):
                f = open(hf, "r")
                hp = email.parser.HeaderParser()
                headerobj = hp.parse(f)
                for h, v in headerobj.items():
                    if self.is_roundtripable_header(h):
                        headers[h] = v
            else:
                if not mimetypes.inited:
                    mimetypes.init()
                (type, encoding) = mimetypes.guess_type(key_file)
                if type:
                    headers['content-type'] = type
            headers['content-length'] = "%d" % self.get_size(bucket, key)
            headers['ETag'] = self.get_md5(bucket, key)
        return headers

    def get_data(self, bucket, key):
        """return the content of an object"""
        self.assert_bucket_key_exists(bucket, key)
        path = self.bucket_key_to_path(bucket, key)
        s = os.stat(path)
        size = s[6]
        fp = open(path, "r")
        f_iter = s3server.file_iter(fp, size)
        self._log('Serving content of %s' % path)
        return f_iter

    def get_mtime(self, bucket, key):
        """return the last modified date of the object"""
        self.assert_bucket_key_exists(bucket, key)
        path = self.bucket_key_to_path(bucket, key)
        s = os.stat(path)
        date = s[8]
        return date

    def get_size(self, bucket, key):
        """return the size of a key"""
        self.assert_bucket_key_exists(bucket, key)
        path = self.bucket_key_to_path(bucket, key)
        s = os.stat(path)
        size = s[6]
        return size

    def get_md5(self, bucket, key):
        """return the md5 of a key"""
        self.assert_bucket_key_exists(bucket, key)
        path = self.bucket_key_to_path(bucket, key)
        m = hashlib.md5()
        f = open(path, "r")
        while True:
            b = f.read(4096)
            if not b:
                break
            m.update(b)
        f.close()
        return m.hexdigest()

    def put_head(self, bucket, key, header):
        """utility function to save the header metadata"""
        self.assert_bucket_exists(bucket)
        key = self.bucket_key_to_path(bucket, key)
        if key:
            f = open(key + "_meta.txt", "w")
            head = self.head_data(header)
            f.write(head)
            f.close()

    def get_owner_accesskey(self, bucket, key):
        path = self.bucket_to_path(bucket)
        f = os.path.join(path, '_owner')
        if not os.path.isfile(f):
            return None
        o = open(f, 'r')
        owner = o.read()
        o.close()
        return owner.rstrip("\n")

    def head_data(self, header):
        """return a string with the headers for storage;
        we strip out cookie headers because they can be used
        to steal sessions"""
        head = ''
        for k in sorted(header.keys()):
            v = header[k]
            if re.search("^cookie", k, re.IGNORECASE):
                continue
            if re.search("^authorization", k, re.IGNORECASE):
                (access, sep, secret) = v.partition(':')
                v = access + ':REDACTED_BY_IA_S3'
            head = head + k + ': ' + v + "\n"
        if 'date' not in header and 'x-amz-date' not in header:
            head = head + 'x-upload-date: ' + time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()) + "\n"
        return head

    def put_key(self, bucket, key, filehandle, header, accesskey, queue_derive, keep_old_version):
        """Store a key on disk"""
        self.assert_bucket_exists(bucket)
        path = self.bucket_key_to_path(bucket, key)
        # put temp files in the bucket
        tempdir = self.bucket_to_path(bucket)
        try:
            (descriptor, temp) = tempfile.mkstemp('_temp', '', tempdir)
            fp = os.fdopen(descriptor, 'w+')
            buffer = ''
            chunk = 32768
            copied = 0
            size = atoi(self.headers['content-length'])
            md5_hash = hashlib.md5()
            while copied < size and buffer is not None:
                toread = min(chunk, (size - copied))
                buffer = filehandle.read(toread)
...
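Every read and write path in this handler funnels through assert_bucket_exists or assert_bucket_key_exists, so a missing bucket surfaces as a single S3_Error("NoSuchBucket") no matter which operation triggered it. That contract is easy to pin down in a test. A minimal pytest sketch, using a stub with just enough of the handler's surface to exercise the check (S3_Error and StubHandler below are stand-ins, since the full fshandler.py is truncated above):

import os

import pytest


class S3_Error(Exception):
    # stand-in for the module's real S3_Error
    def __init__(self, code, resource=None):
        super().__init__(code, resource)
        self.code = code


class StubHandler:
    # minimal subset of FilesystemHandler: bucket_to_path plus the assert
    def __init__(self, directory):
        self.directory = directory

    def bucket_to_path(self, bucket):
        return os.path.join(self.directory, bucket)

    def assert_bucket_exists(self, bucket):
        path = self.bucket_to_path(bucket)
        if path is None or not os.path.exists(path):
            raise S3_Error('NoSuchBucket', bucket)
        return True


def test_missing_bucket_raises(tmp_path):
    handler = StubHandler(str(tmp_path))
    with pytest.raises(S3_Error) as excinfo:
        handler.assert_bucket_exists('missing-bucket')
    assert excinfo.value.code == 'NoSuchBucket'


def test_existing_bucket_passes(tmp_path):
    (tmp_path / 'present-bucket').mkdir()
    assert StubHandler(str(tmp_path)).assert_bucket_exists('present-bucket') is True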


s3gw-versioning-smoke-tests.py

Source: s3gw-versioning-smoke-tests.py (GitHub)


...
        # size passed is in mb
        size = size * 1024 * 1024
        with open(path, 'wb') as fout:
            fout.write(os.urandom(size))

    def assert_bucket_exists(self, bucket_name):
        response = self.s3_client.list_buckets()
        found = False
        for bucket in response['Buckets']:
            if bucket["Name"] == bucket_name:
                found = True
        self.assertTrue(found)

    def test_create_bucket_enable_versioning(self):
        bucket_name = self.get_random_bucket_name()
        self.s3_client.create_bucket(Bucket=bucket_name)
        self.assert_bucket_exists(bucket_name)
        # ensure versioning is disabled (default)
        response = self.s3_client.get_bucket_versioning(Bucket=bucket_name)
        self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)
        self.assertFalse('Status' in response)
        response = self.s3_client.put_bucket_versioning(
            Bucket=bucket_name,
            VersioningConfiguration={
                'MFADelete': 'Disabled',
                'Status': 'Enabled'})
        response = self.s3_client.get_bucket_versioning(Bucket=bucket_name)
        self.assertTrue('Status' in response)
        self.assertEqual('Enabled', response['Status'])

    def test_put_objects_versioning_enabled(self):
        bucket_name = self.get_random_bucket_name()
        self.s3_client.create_bucket(Bucket=bucket_name)
        self.assert_bucket_exists(bucket_name)
        response = self.s3_client.put_bucket_versioning(
            Bucket=bucket_name,
            VersioningConfiguration={
                'MFADelete': 'Disabled',
                'Status': 'Enabled'})
        object_name = self.get_random_object_name()
        test_file_path_1 = os.path.join(self.test_dir.name, 'test_file_1.bin')
        self.generate_random_file(test_file_path_1)
        # upload the file
        self.s3_client.upload_file(test_file_path_1, bucket_name, object_name)
        # get the file and compare with the original
        test_file_path_1_check = os.path.join(self.test_dir.name, 'test_file_1_check.bin')
        self.s3_client.download_file(bucket_name, object_name, test_file_path_1_check)
        self.assertTrue(filecmp.cmp(test_file_path_1, test_file_path_1_check, shallow=False))
        # now upload again with different content
        test_file_path_2 = os.path.join(self.test_dir.name, 'test_file_2.bin')
        self.generate_random_file(test_file_path_2)
        self.s3_client.upload_file(test_file_path_2, bucket_name, object_name)
        test_file_path_2_check = os.path.join(self.test_dir.name, 'test_file_2_check.bin')
        self.s3_client.download_file(bucket_name, object_name, test_file_path_2_check)
        self.assertTrue(filecmp.cmp(test_file_path_2, test_file_path_2_check, shallow=False))
        # get etag of object
        response = self.s3_client.head_object(Bucket=bucket_name, Key=object_name)
        self.assertTrue('ETag' in response)
        etag = response['ETag']
        # check that we have 2 versions
        # only 1 version should be flagged as the latest
        response = self.s3_client.list_object_versions(Bucket=bucket_name, Prefix=object_name)
        self.assertTrue('Versions' in response)
        self.assertEqual(2, len(response['Versions']))
        num_latest = 0
        last_version_id = ''
        previous_version_id = ''
        for version in response['Versions']:
            self.assertEqual(os.path.getsize(test_file_path_1), version['Size'])
            self.assertEqual(object_name, version['Key'])
            self.assertEqual('STANDARD', version['StorageClass'])
            self.assertEqual({'DisplayName': 'M. Tester', 'ID': 'testid'}, version['Owner'])
            self.assertEqual(etag, version['ETag'])
            self.assertNotEqual('null', version['VersionId'])
            if version['IsLatest']:
                num_latest += 1
                last_version_id = version['VersionId']
            else:
                previous_version_id = version['VersionId']
        self.assertEqual(1, num_latest)
        self.assertNotEqual('', last_version_id)
        self.assertNotEqual('', previous_version_id)
        # download by version_id
        # download the last version
        check_version_file = os.path.join(self.test_dir.name, 'check_version.bin')
        bucket = self.s3.Bucket(bucket_name)
        bucket.download_file(
            object_name,
            check_version_file,
            ExtraArgs={"VersionId": last_version_id})
        self.assertTrue(filecmp.cmp(test_file_path_2, check_version_file, shallow=False))
        # download the previous version
        check_version_file_2 = os.path.join(self.test_dir.name, 'check_version2.bin')
        bucket.download_file(
            object_name,
            check_version_file_2,
            ExtraArgs={"VersionId": previous_version_id})
        self.assertTrue(filecmp.cmp(test_file_path_1, check_version_file_2, shallow=False))
        # delete the object
        self.s3_client.delete_object(Bucket=bucket_name, Key=object_name)
        # check that we have 3 versions
        # only 1 version should be flagged as the latest
        response = self.s3_client.list_object_versions(Bucket=bucket_name, Prefix=object_name)
        self.assertTrue('Versions' in response)
        self.assertEqual(3, len(response['Versions']))
        num_latest = 0
        deleted_version_id = ''
        for version in response['Versions']:
            self.assertEqual(os.path.getsize(test_file_path_1), version['Size'])
            self.assertEqual(object_name, version['Key'])
            self.assertEqual('STANDARD', version['StorageClass'])
            self.assertEqual({'DisplayName': 'M. Tester', 'ID': 'testid'}, version['Owner'])
            self.assertEqual(etag, version['ETag'])
            self.assertNotEqual('null', version['VersionId'])
            if version['IsLatest']:
                num_latest += 1
                deleted_version_id = version['VersionId']
        self.assertEqual(1, num_latest)
        self.assertNotEqual('', deleted_version_id)
        # try to download the file, a 404 error should be returned
        check_deleted_file = os.path.join(self.test_dir.name, 'check_deleted.bin')
        with self.assertRaises(botocore.exceptions.ClientError) as context:
            response = self.s3_client.download_file(bucket_name, object_name, check_deleted_file)
        self.assertTrue('404' in str(context.exception))
        # download the previous version, it should still be reachable
        check_version_file_2 = os.path.join(self.test_dir.name, 'check_version2.bin')
        bucket.download_file(
            object_name,
            check_version_file_2,
            ExtraArgs={"VersionId": last_version_id})
        self.assertTrue(filecmp.cmp(test_file_path_2, check_version_file_2, shallow=False))

    def test_put_objects_no_versioning(self):
        bucket_name = self.get_random_bucket_name()
        self.s3_client.create_bucket(Bucket=bucket_name)
        self.assert_bucket_exists(bucket_name)
        object_name = self.get_random_object_name()
        test_file_path_1 = os.path.join(self.test_dir.name, 'test_file_1.bin')
        self.generate_random_file(test_file_path_1)
        # upload the file
        self.s3_client.upload_file(test_file_path_1, bucket_name, object_name)
        # get the file and compare with the original
        test_file_path_1_check = os.path.join(self.test_dir.name, 'test_file_1_check.bin')
        self.s3_client.download_file(bucket_name, object_name, test_file_path_1_check)
        self.assertTrue(filecmp.cmp(test_file_path_1, test_file_path_1_check, shallow=False))
        # now upload again with different content
        test_file_path_2 = os.path.join(self.test_dir.name, 'test_file_2.bin')
        self.generate_random_file(test_file_path_2)
        self.s3_client.upload_file(test_file_path_2, bucket_name, object_name)
        test_file_path_2_check = os.path.join(self.test_dir.name, 'test_file_2_check.bin')
...
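These smoke tests assume a preconfigured self.s3_client (and a self.s3 resource for the version-pinned downloads) but never hard-code the endpoint, so the suite runs unchanged against LocalStack if the fixtures point there. A minimal setUp sketch, assuming LocalStack's default edge port and throwaway credentials:

import tempfile
import unittest

import boto3

LOCALSTACK_ENDPOINT = 'http://localhost:4566'  # assumption: default LocalStack edge port


class VersioningSmokeTest(unittest.TestCase):
    def setUp(self):
        common = dict(
            endpoint_url=LOCALSTACK_ENDPOINT,
            aws_access_key_id='test',       # LocalStack accepts arbitrary credentials
            aws_secret_access_key='test',
            region_name='us-east-1')
        # low-level client for create_bucket / put_bucket_versioning / uploads
        self.s3_client = boto3.client('s3', **common)
        # resource layer for Bucket(...).download_file with ExtraArgs={'VersionId': ...}
        self.s3 = boto3.resource('s3', **common)
        self.test_dir = tempfile.TemporaryDirectory()

    def tearDown(self):
        self.test_dir.cleanup()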


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud
