How to use the install_lambda_java_testlibs function in LocalStack

Best Python code snippet using localstack_python

install.py

Source: install.py on GitHub

copy

Full Screen

# ... (excerpt starts mid-function; the enclosing `def` lies above this view,
# so the fragment is kept verbatim as a comment rather than guessed at)
#     # install LocalStack "fat" JAR file (contains all dependencies)
#     if not os.path.exists(INSTALL_PATH_LOCALSTACK_FAT_JAR):
#         log_install_msg("LocalStack Java libraries", verbatim=True)
#         download(URL_LOCALSTACK_FAT_JAR, INSTALL_PATH_LOCALSTACK_FAT_JAR)


def install_lambda_java_testlibs():
    """Download the test JAR to ``TEST_LAMBDA_JAVA`` unless that path already exists."""
    # Download the LocalStack Utils Test jar file from the maven repo
    if not os.path.exists(TEST_LAMBDA_JAVA):
        mkdir(os.path.dirname(TEST_LAMBDA_JAVA))
        download(TEST_LAMBDA_JAR_URL, TEST_LAMBDA_JAVA)


def install_go_lambda_runtime():
    """Download and unpack the Go Lambda runtime, then mark its binaries executable.

    Does nothing when ``GO_LAMBDA_RUNTIME`` is already a file.

    Raises:
        ValueError: if the platform is not linux, or the architecture reported
            by ``get_arch()`` is neither ``amd64`` nor ``arm64``.
    """
    if os.path.isfile(GO_LAMBDA_RUNTIME):
        return
    # verbatim=True: the default wraps the name as "local <name> server", which
    # is wrong for a runtime binary.
    log_install_msg("golang runtime", verbatim=True)

    system = platform.system().lower()
    arch = get_arch()

    if system not in ["linux"]:
        raise ValueError("unsupported os %s for awslambda-go-runtime" % system)
    if arch not in ["amd64", "arm64"]:
        raise ValueError("unsupported arch %s for awslambda-go-runtime" % arch)

    url = GO_RUNTIME_DOWNLOAD_URL_TEMPLATE.format(
        version=GO_RUNTIME_VERSION,
        os=system,
        arch=arch,
    )
    download_and_extract(url, GO_INSTALL_FOLDER)

    # add the execute bit for user, group and others on both extracted binaries
    st = os.stat(GO_LAMBDA_RUNTIME)
    os.chmod(GO_LAMBDA_RUNTIME, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
    st = os.stat(GO_LAMBDA_MOCKSERVER)
    os.chmod(GO_LAMBDA_MOCKSERVER, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)


def install_lambda_runtime():
    """Download the Lambda runtime init binary and mark it executable.

    Does nothing when ``LAMBDA_RUNTIME_INIT_PATH`` is already a file.
    """
    if os.path.isfile(LAMBDA_RUNTIME_INIT_PATH):
        return
    # verbatim=True: see install_go_lambda_runtime
    log_install_msg("lambda runtime", verbatim=True)

    arch = get_arch()
    # the download URL uses "x86_64" where get_arch() reports "amd64"
    arch = "x86_64" if arch == "amd64" else arch
    download_url = LAMBDA_RUNTIME_INIT_URL.format(arch=arch)
    download(download_url, LAMBDA_RUNTIME_INIT_PATH)

    st = os.stat(LAMBDA_RUNTIME_INIT_PATH)
    os.chmod(LAMBDA_RUNTIME_INIT_PATH, mode=st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)


def install_cloudformation_libs():
    """Trigger the download of the CloudFormation response module file."""
    from localstack.services.cloudformation import deployment_utils

    # trigger download of CF module file
    deployment_utils.get_cfn_response_mod_file()


def install_terraform() -> str:
    """Install terraform unless ``TERRAFORM_BIN`` is already a file.

    Returns:
        ``TERRAFORM_BIN``, the path of the terraform binary.
    """
    if os.path.isfile(TERRAFORM_BIN):
        return TERRAFORM_BIN

    # verbatim=True: see install_go_lambda_runtime
    log_install_msg(f"terraform {TERRAFORM_VERSION}", verbatim=True)

    system = platform.system().lower()
    arch = get_arch()

    url = TERRAFORM_URL_TEMPLATE.format(version=TERRAFORM_VERSION, os=system, arch=arch)

    download_and_extract(url, os.path.dirname(TERRAFORM_BIN))
    chmod_r(TERRAFORM_BIN, 0o777)

    return TERRAFORM_BIN


def get_terraform_binary() -> str:
    """Return ``TERRAFORM_BIN``, installing terraform first if the file is missing."""
    if not os.path.isfile(TERRAFORM_BIN):
        install_terraform()

    return TERRAFORM_BIN


def install_component(name):
    """Run the installer registered for ``name`` in ``installers``.

    Names without a registered installer are silently ignored.
    """
    installer = installers.get(name)
    if installer:
        installer()


def install_components(names):
    """Install every component in ``names`` via ``parallelize``, then the Lambda Java libs."""
    parallelize(install_component, names)
    install_lambda_java_libs()


def install_all_components():
    """Install the components of all default services, then run the install hooks."""
    # install dependencies - make sure that install_components(..) is called before hooks.install below!
    install_components(DEFAULT_SERVICE_PORTS.keys())
    hooks.install.run()


def install_debugpy_and_dependencies():
    """Install ``DEBUGPY_MODULE`` through pip if ``debugpy`` cannot be imported."""
    try:
        import debugpy

        assert debugpy  # only references the name so the import is not flagged as unused
        logging.debug("Debugpy module already Installed")
    except ModuleNotFoundError:
        logging.debug("Installing Debugpy module")
        import pip

        if hasattr(pip, "main"):
            pip.main(["install", DEBUGPY_MODULE])
        else:
            # `import pip` alone does not load the `_internal` submodule, so
            # import it explicitly before reaching into it.
            import pip._internal

            pip._internal.main(["install", DEBUGPY_MODULE])


# -----------------
# HELPER FUNCTIONS
# -----------------


def log_install_msg(component, verbatim=False):
    """Log an INFO message announcing the download and installation of ``component``.

    Args:
        component: name to show in the message.
        verbatim: when false, the name is wrapped as ``"local <component> server"``.
    """
    component = component if verbatim else "local %s server" % component
    LOG.info("Downloading and installing %s. This may take some time.", component)


def download_and_extract(archive_url, target_dir, retries=0, sleep=3, tmp_archive=None):
    """Download an archive (unless already present locally) and extract it.

    Args:
        archive_url: URL to download the archive from.
        target_dir: directory to extract into; created via ``mkdir``.
        retries: number of additional download attempts after the first failure.
        sleep: seconds to wait between two download attempts.
        tmp_archive: local path for the archive; when it already exists with a
            non-zero size the download is skipped. Defaults to a new temp file.

    Raises:
        Exception: when the file extension is not one of ``.zip``, ``.bz2``,
            ``.gz``, ``.tgz``; the last download error is re-raised when every
            attempt fails.
    """
    mkdir(target_dir)

    # the extension is taken from the local path if given, else from the URL
    _, ext = os.path.splitext(tmp_archive or archive_url)

    tmp_archive = tmp_archive or new_tmp_file()
    if not os.path.exists(tmp_archive) or os.path.getsize(tmp_archive) <= 0:
        # create temporary placeholder file, to avoid duplicate parallel downloads
        save_file(tmp_archive, "")
        for attempt in range(retries + 1):
            try:
                download(archive_url, tmp_archive)
                break
            except Exception as e:
                if attempt >= retries:
                    # Out of attempts: surface the download error instead of
                    # falling through and failing on the empty placeholder
                    # with an unrelated archive error.
                    raise
                LOG.debug("Download of %s failed, retrying in %s seconds: %s", archive_url, sleep, e)
                time.sleep(sleep)

    if ext == ".zip":
        unzip(tmp_archive, target_dir)
    elif ext in [".bz2", ".gz", ".tgz"]:
        untar(tmp_archive, target_dir)
    else:
        raise Exception(f"Unsupported archive format: {ext}")


def download_and_extract_with_retry(archive_url, tmp_archive, target_dir):
    """Call ``download_and_extract``; on any failure delete ``tmp_archive`` and try once more."""
    try:
        download_and_extract(archive_url, target_dir, tmp_archive=tmp_archive)
    except Exception as e:
        # try deleting and re-downloading the zip file
        LOG.info("Unable to extract file, re-downloading ZIP archive %s: %s", tmp_archive, e)
        rm_rf(tmp_archive)
        download_and_extract(archive_url, target_dir, tmp_archive=tmp_archive)


# kept here for backwards compatibility (installed on "make init" - TODO should be removed)
installers = {
    "cloudformation": install_cloudformation_libs,
    "dynamodb": install_dynamodb_local,
    "kinesis": install_kinesis,
    "kms": install_local_kms,
    "lambda": install_lambda_runtime,
    "sqs": install_sqs_provider,
    "stepfunctions": install_stepfunctions_local,
}

# (package name, installer callable) pair
Installer = Tuple[str, Callable]


class InstallerRepository(Plugin):
    """Plugin base class for sources of installers; subclasses implement ``get_installer``."""

    namespace = "localstack.installer"

    def get_installer(self) -> List[Installer]:
        """Return the ``(name, callable)`` installer pairs this repository provides."""
        raise NotImplementedError


class CommunityInstallerRepository(InstallerRepository):
    """Installer repository registered under the name ``community``."""

    name = "community"

    def get_installer(self) -> List[Installer]:
        """Return the installers defined in this module, keyed by package name."""
        return [
            # NOTE(review): "awslamba" looks like a typo of "awslambda"; left
            # unchanged because installers are looked up by this key.
            ("awslamba-go-runtime", install_go_lambda_runtime),
            ("awslambda-runtime", install_lambda_runtime),
            ("cloudformation-libs", install_cloudformation_libs),
            ("dynamodb-local", install_dynamodb_local),
            ("elasticmq", install_elasticmq),
            ("elasticsearch", install_elasticsearch),
            ("opensearch", install_opensearch),
            ("kinesalite", install_kinesalite),
            ("kinesis-client-libs", install_amazon_kinesis_client_libs),
            ("kinesis-mock", install_kinesis_mock),
            ("lambda-java-libs", install_lambda_java_libs),
            ("local-kms", install_local_kms),
            ("stepfunctions-local", install_stepfunctions_local),
            ("terraform", install_terraform),
        ]


class InstallerManager:
    """Collects the installers of all loaded ``InstallerRepository`` plugins."""

    def __init__(self):
        self.repositories: PluginManager[InstallerRepository] = PluginManager(
            InstallerRepository.namespace
        )

    # NOTE(review): lru_cache on a method keys on `self` and keeps the instance
    # referenced for the lifetime of the cache; kept as-is so the cache
    # attributes on this method (e.g. cache_clear) stay available to callers.
    @functools.lru_cache()
    def get_installers(self) -> Dict[str, Callable]:
        """Return a name -> installer mapping merged from all repositories."""
        installer: List[Installer] = []

        for repo in self.repositories.load_all():
            installer.extend(repo.get_installer())

        return dict(installer)

    def install(self, package: str, *args, **kwargs):
        """Run the installer for ``package``, forwarding any extra arguments.

        Raises:
            ValueError: if no installer is registered for ``package``.
        """
        installer = self.get_installers().get(package)

        if not installer:
            raise ValueError("no installer for package %s" % package)

        return installer(*args, **kwargs)


def main():
    """Command-line entry point: ``libs`` installs everything, ``testlibs`` only the test libraries."""
    if len(sys.argv) > 1:
        # set test API key so pro install hooks are called
        os.environ["LOCALSTACK_API_KEY"] = os.environ.get("LOCALSTACK_API_KEY") or "test"
        if sys.argv[1] == "libs":
            print("Initializing installation.")
            logging.basicConfig(level=logging.INFO)
            logging.getLogger("requests").setLevel(logging.WARNING)
            install_all_components()
        if sys.argv[1] in ("libs", "testlibs"):
            # Install additional libraries for testing
            install_amazon_kinesis_client_libs()
            install_lambda_java_testlibs()
        print("Done.")


if __name__ == "__main__":
    ...  # body elided in this excerpt

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful