How to use the is_bucket_available method in localstack

Best Python code snippet using localstack_python

test_requirements_handler.py

Source:test_requirements_handler.py Github

copy

Full Screen

# ... (excerpt begins inside a test method whose header is not shown)
# NOTE(review): this listing was flattened by extraction; indentation and the
# extent of each `with` block below are reconstructed -- confirm against the
# upstream file before relying on them.
        shutil.copy(os.path.join(self.apt_status_dir, "testpackage_installed_dpkg_status"),
                    os.path.join(self.dpkg_dir, "status"))
        self.handler.cache.open()
        self.assertFalse(self.handler.is_bucket_uptodate(["testpackage"]))

    def test_is_bucket_available(self):
        """An available bucket on that platform is reported"""
        self.assertTrue(self.handler.is_bucket_available(['testpackage', 'testpackage1']))

    def test_is_bucket_available_multi_arch_current_arch(self):
        """We return a package is available on the current platform"""
        self.assertTrue(self.handler.is_bucket_available(['testpackage:{}'.format(tools.get_current_arch())]))

    def test_unavailable_bucket(self):
        """An unavailable bucket on that platform is reported"""
        self.assertFalse(self.handler.is_bucket_available(['testpackage42', 'testpackage404']))

    def test_is_bucket_available_foreign_archs(self):
        """After adding a foreign arch, test that the package is available on it"""
        subprocess.call([self.dpkg, "--add-architecture", "foo"])
        self.handler.cache.open()  # reopen the cache with the new added architecture
        self.assertTrue(self.handler.is_bucket_available(['testpackagefoo:foo', 'testpackage1']))

    def test_is_bucket_unavailable_with_foreign_archs(self):
        """After adding a foreign arch, test that the package is unavailable and report so"""
        subprocess.call([self.dpkg, "--add-architecture", "foo"])
        self.handler.cache.open()  # reopen the cache with the new added architecture
        self.assertFalse(self.handler.is_bucket_available(['testpackagebar:foo', 'testpackage1']))

    def test_bucket_unavailable_but_foreign_archs_no_added(self):
        """Bucket is set as available when foreign arch not added"""
        self.assertTrue(self.handler.is_bucket_available(['testpackagefoo:foo', 'testpackage1']))

    def test_bucket_unavailable_foreign_archs_no_added_another_package_not_available(self):
        """Bucket is set as unavailable when foreign arch not added, but another package on current arch is
        unavailable"""
        self.assertFalse(self.handler.is_bucket_available(['testpackagefoo:foo', 'testpackage123']))

    def test_apt_cache_not_ready(self):
        """When the first apt.Cache() access tells it's not ready, we wait and recover"""
        origin_open = self.handler.cache.open
        raise_returned = False

        # Side effect for the patched open(): raise SystemError on the first
        # call, then delegate to the real open() on every later call.
        def cache_call(*args, **kwargs):
            nonlocal raise_returned
            if raise_returned:
                return origin_open()
            else:
                raise_returned = True
                raise SystemError
        with patch.object(self.handler.cache, 'open', side_effect=cache_call) as openaptcache_mock:
            self.handler.install_bucket(["testpackage"], lambda x: "", self.done_callback)
            self.wait_for_callback(self.done_callback)
            # One failing call plus one successful retry.
            self.assertEqual(openaptcache_mock.call_count, 2)

    def test_upgrade(self):
        """Upgrade one package already installed"""
        shutil.copy(os.path.join(self.apt_status_dir, "testpackage_installed_dpkg_status"),
                    os.path.join(self.dpkg_dir, "status"))
        self.handler.cache.open()
        self.assertTrue(self.handler.is_bucket_installed(["testpackage"]))
        self.assertEqual(self.handler.cache["testpackage"].installed.version, "0.0.0")
        self.handler.install_bucket(["testpackage"], lambda x: "", self.done_callback)
        self.wait_for_callback(self.done_callback)
        self.assertEqual(self.done_callback.call_args[0][0].bucket, ['testpackage'])
        self.assertIsNone(self.done_callback.call_args[0][0].error)
        self.assertTrue(self.handler.is_bucket_installed(["testpackage"]))
        self.assertEqual(self.handler.cache["testpackage"].installed.version, "0.0.1")

    def test_one_install_one_upgrade(self):
        """Install and Upgrade one package in the same bucket"""
        shutil.copy(os.path.join(self.apt_status_dir, "testpackage_installed_dpkg_status"),
                    os.path.join(self.dpkg_dir, "status"))
        self.handler.cache.open()
        self.assertTrue(self.handler.is_bucket_installed(["testpackage"]))
        self.assertEqual(self.handler.cache["testpackage"].installed.version, "0.0.0")
        self.assertFalse(self.handler.is_bucket_installed(["testpackage0"]))
        self.handler.install_bucket(["testpackage", "testpackage0"], lambda x: "", self.done_callback)
        self.wait_for_callback(self.done_callback)
        self.assertEqual(self.done_callback.call_args[0][0].bucket, ['testpackage', 'testpackage0'])
        self.assertIsNone(self.done_callback.call_args[0][0].error)
        self.assertTrue(self.handler.is_bucket_installed(["testpackage", "testpackage0"]))
        self.assertEqual(self.handler.cache["testpackage"].installed.version, "0.0.1")

    def test_install_with_foreign_foreign_arch_added(self):
        """Install packages with a foreign arch added"""
        subprocess.call([self.dpkg, "--add-architecture", "foo"])
        self.handler.cache.open()  # reopen the cache with the new added architecture
        bucket = ["testpackagefoo:foo", "testpackage1"]
        with patch("umake.tools.subprocess") as subprocess_mock:
            # Keep check_output real so only subprocess.call is observed through the mock.
            subprocess_mock.check_output.side_effect = subprocess.check_output
            self.handler.install_bucket(bucket, lambda x: "", self.done_callback)
            self.wait_for_callback(self.done_callback)
            self.assertFalse(subprocess_mock.call.called)
        self.assertEqual(self.done_callback.call_args[0][0].bucket, bucket)
        self.assertIsNone(self.done_callback.call_args[0][0].error)
        self.assertTrue(self.handler.is_bucket_installed(bucket))

    def test_install_with_foreign_foreign_arch_not_added(self):
        """Install packages with a foreign arch, while the foreign arch wasn't added"""
        # A missing CI environment variable raises KeyError, which is suppressed,
        # so the body below only runs when CI is set to a non-empty value.
        with suppress(KeyError):
            if os.environ["CI"]:
                bucket = ["testpackagefoo:foo", "testpackage1"]
                self.handler.install_bucket(bucket, lambda x: "", self.done_callback)
                self.wait_for_callback(self.done_callback)
                self.assertEqual(self.done_callback.call_args[0][0].bucket, bucket)
                self.assertIsNone(self.done_callback.call_args[0][0].error)
                self.assertTrue(self.handler.is_bucket_installed(bucket))

    def test_install_with_foreign_foreign_arch_add_fails(self):
        """Install packages with a foreign arch, where adding a foreign arch fails"""
        bucket = ["testpackagefoo:foo", "testpackage1"]
        with patch("umake.tools.subprocess") as subprocess_mock:
            # A non-zero return value from subprocess.call simulates the failure.
            subprocess_mock.call.return_value = 1
            self.handler.install_bucket(bucket, lambda x: "", self.done_callback)
            self.wait_for_callback(self.done_callback)
            self.assertTrue(subprocess_mock.call.called)
        self.assertFalse(self.handler.is_bucket_installed(bucket))
        self.expect_warn_error = True

    def test_cant_change_seteuid(self):
        """Not being able to change the euid to root returns an error"""
        # presumably os.seteuid is already replaced by a mock in the fixture -- TODO confirm
        os.seteuid.side_effect = PermissionError()
        self.handler.install_bucket(["testpackage"], lambda x: "", self.done_callback)
        self.wait_for_callback(self.done_callback)
        self.assertEqual(self.done_callback.call_args[0][0].bucket, ['testpackage'])
        self.assertIsNotNone(self.done_callback.call_args[0][0].error)
        self.assertFalse(self.handler.is_bucket_installed(["testpackage"]))
        self.expect_warn_error = True

    def test_or_option_none_installed(self):
        """An "a | b" entry is reported as not installed when the status file was not replaced"""
        self.assertFalse(self.handler.is_bucket_installed(["testpackage | testpackage0"]))

    def test_or_option_first_installed(self):
        """With the first alternative installed, the "|" entry is replaced in place by that package"""
        test_bucket = ["testpackage | testpackage1 | testpackage0"]
        shutil.copy(os.path.join(self.apt_status_dir, "testpackage_installed_dpkg_status"),
                    os.path.join(self.dpkg_dir, "status"))
        self.handler.cache.open()
        self.assertTrue(self.handler.is_bucket_installed(test_bucket))
        self.assertEqual(test_bucket, ['testpackage'])

    def test_or_option_second_installed(self):
        """With the second alternative installed, the bucket ends up as the plain entry then that package"""
        test_bucket = ["testpackage0 | testpackage | testpackage1", "testpackage2"]
        shutil.copy(os.path.join(self.apt_status_dir, "testpackage_installed_dpkg_status"),
                    os.path.join(self.dpkg_dir, "status"))
        self.handler.cache.open()
        self.assertTrue(self.handler.is_bucket_installed(test_bucket))
        self.assertEqual(test_bucket, ['testpackage2', 'testpackage'])

    def test_or_option_third_installed(self):
        """With the third alternative installed, the bucket ends up as the plain entry then that package"""
        test_bucket = ["testpackage0 | testpackage1 | testpackage", "testpackage2"]
        shutil.copy(os.path.join(self.apt_status_dir, "testpackage_installed_dpkg_status"),
                    os.path.join(self.dpkg_dir, "status"))
        self.handler.cache.open()
        self.assertTrue(self.handler.is_bucket_installed(test_bucket))
        self.assertEqual(test_bucket, ['testpackage2', 'testpackage'])

    def test_or_option_is_first_available(self):
        """The "|" entry is replaced in place by its first alternative when checking availability"""
        test_bucket = ["testpackage | testpackage42", "testpackage1"]
        self.handler.cache.open()
        self.assertTrue(self.handler.is_bucket_available(test_bucket))
        self.assertEqual(test_bucket, ['testpackage1', 'testpackage'])

    def test_or_option_is_second_available(self):
        """The "|" entry is replaced in place by its second alternative when checking availability"""
        test_bucket = ["testpackage42 | testpackage"]
        self.handler.cache.open()
        self.assertTrue(self.handler.is_bucket_available(test_bucket))
        self.assertEqual(test_bucket, ['testpackage'])

    def test_or_option_is_none_available(self):
        """A "|" entry is reported as unavailable for these two alternatives"""
        self.assertFalse(self.handler.is_bucket_available(['testpackage42 | testpackage404']))

    def test_or_option_both_available(self):
        test_bucket = ['testpackage | testpackage0', 'testpackage1']
        self.handler.cache.open()
        self.assertTrue(self.handler.is_bucket_available(test_bucket))
        # ... (excerpt ends here; any remaining statements of this test are not shown)

Full Screen

Full Screen

S3Bucket.py

Source:S3Bucket.py Github

copy

Full Screen

...10 self.aws_secret_access_key = aws_secret_access_key11 self.s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id,12 aws_secret_access_key=aws_secret_access_key, region_name=region,13 endpoint_url=endpoint_url)14 if not self.is_bucket_available():15 logging.info("bucket {} does not exist, creating..".format(self.bucket_name))16 self.create_bucket()17 def is_bucket_available(self):18 all_buckets = self.s3_client.list_buckets()19 for bucket in all_buckets['Buckets']:20 if self.bucket_name not in bucket['Name']:21 return False22 else:23 return True24 def create_bucket(self):25 """Create an S3 bucket in a specified region26 If a region is not specified, the bucket is created in the S3 default27 region (us-east-1).28 :param bucket_name: Bucket to create29 :param region: String region to create bucket in, e.g., 'us-west-2'30 :return: True if bucket created, else False31 """...

Full Screen

Full Screen

regulate_ec2.py

Source:regulate_ec2.py Github

copy

Full Screen

# ... (excerpt of regulate_ec2.py; the preceding function, cut off above, ends with `return ec2`)


def get_s3_connection():
    """Create and return a boto3 S3 client."""
    s3 = boto3.client("s3")
    return s3


def is_bucket_available(bucket_name, s3=None):
    """Return True if a bucket named exactly ``bucket_name`` is listed.

    :param bucket_name: Name of the bucket to look for.
    :param s3: Optional S3 client to reuse; when omitted a new one is
        obtained from ``get_s3_connection()``.
    :return: True when the bucket is listed, else False.
    """
    if s3 is None:
        s3 = get_s3_connection()
    list_of_buckets = s3.list_buckets()
    return any(bucket["Name"] == bucket_name for bucket in list_of_buckets["Buckets"])


def upload_instance_file_to_s3(file_name, s3=None):
    """Upload ``file_name`` to the instances bucket, creating the bucket if needed.

    The object key is the same string as ``file_name``.

    :param file_name: Path of the local file to upload.
    :param s3: Optional S3 client to reuse; when omitted a new one is
        obtained from ``get_s3_connection()``.
    """
    if s3 is None:
        s3 = get_s3_connection()
    bucket_name = "abhishek-coda-bucket-ewrsd"
    # Reuse the same client for the existence check instead of opening another.
    if not is_bucket_available(bucket_name, s3):
        s3.create_bucket(Bucket=bucket_name)
    s3.upload_file(file_name, bucket_name, file_name)


def write_instances_to_csv(instance_id, name, file_name="instances_list.csv", s3=None):
    """Write one ``instance_id, name`` row to a CSV file and upload that file.

    The file is opened by the ``file_name`` variable. The earlier version
    opened the literal path "file_name", so the row went to the wrong file
    and the upload targeted a file that had not been written.

    :param instance_id: Value for the first CSV column.
    :param name: Value for the second CSV column.
    :param file_name: Path of the CSV file to (over)write and upload.
    :param s3: Optional S3 client passed through to the upload step.
    """
    # newline="" lets the csv module control line endings itself.
    with open(file_name, "w", newline="") as instance_file:
        file_writer = csv.writer(instance_file)
        file_writer.writerow([instance_id, name])
    # Upload only after the file is closed so the row is flushed to disk.
    upload_instance_file_to_s3(file_name, s3)


def get_instances():
    ec2 = get_ec2_session()
    ec2_ = boto3.resource("ec2")
    inst_list = ec2_.instances.all()
    # ... (excerpt ends here; the rest of this function is not shown)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful