Best Python code snippet using localstack_python
install.py
Source:install.py  
...459    # install LocalStack "fat" JAR file (contains all dependencies)460    if not os.path.exists(INSTALL_PATH_LOCALSTACK_FAT_JAR):461        log_install_msg("LocalStack Java libraries", verbatim=True)462        download(URL_LOCALSTACK_FAT_JAR, INSTALL_PATH_LOCALSTACK_FAT_JAR)463def install_lambda_java_testlibs():464    # Download the LocalStack Utils Test jar file from the maven repo465    if not os.path.exists(TEST_LAMBDA_JAVA):466        mkdir(os.path.dirname(TEST_LAMBDA_JAVA))467        download(TEST_LAMBDA_JAR_URL, TEST_LAMBDA_JAVA)468def install_go_lambda_runtime():469    if os.path.isfile(GO_LAMBDA_RUNTIME):470        return471    log_install_msg("Installing golang runtime")472    system = platform.system().lower()473    arch = get_arch()474    if system not in ["linux"]:475        raise ValueError("unsupported os %s for awslambda-go-runtime" % system)476    if arch not in ["amd64", "arm64"]:477        raise ValueError("unsupported arch %s for awslambda-go-runtime" % arch)478    url = GO_RUNTIME_DOWNLOAD_URL_TEMPLATE.format(479        version=GO_RUNTIME_VERSION,480        os=system,481        arch=arch,482    )483    download_and_extract(url, GO_INSTALL_FOLDER)484    st = os.stat(GO_LAMBDA_RUNTIME)485    os.chmod(GO_LAMBDA_RUNTIME, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)486    st = os.stat(GO_LAMBDA_MOCKSERVER)487    os.chmod(GO_LAMBDA_MOCKSERVER, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)488def install_lambda_runtime():489    if os.path.isfile(LAMBDA_RUNTIME_INIT_PATH):490        return491    log_install_msg("Installing lambda runtime")492    arch = get_arch()493    arch = "x86_64" if arch == "amd64" else arch494    download_url = LAMBDA_RUNTIME_INIT_URL.format(arch=arch)495    download(download_url, LAMBDA_RUNTIME_INIT_PATH)496    st = os.stat(LAMBDA_RUNTIME_INIT_PATH)497    os.chmod(LAMBDA_RUNTIME_INIT_PATH, mode=st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)498def install_cloudformation_libs():499    from localstack.services.cloudformation import deployment_utils500    # trigger download of CF module file501    deployment_utils.get_cfn_response_mod_file()502def install_terraform() -> str:503    if os.path.isfile(TERRAFORM_BIN):504        return TERRAFORM_BIN505    log_install_msg(f"Installing terraform {TERRAFORM_VERSION}")506    system = platform.system().lower()507    arch = get_arch()508    url = TERRAFORM_URL_TEMPLATE.format(version=TERRAFORM_VERSION, os=system, arch=arch)509    download_and_extract(url, os.path.dirname(TERRAFORM_BIN))510    chmod_r(TERRAFORM_BIN, 0o777)511    return TERRAFORM_BIN512def get_terraform_binary() -> str:513    if not os.path.isfile(TERRAFORM_BIN):514        install_terraform()515    return TERRAFORM_BIN516def install_component(name):517    installer = installers.get(name)518    if installer:519        installer()520def install_components(names):521    parallelize(install_component, names)522    install_lambda_java_libs()523def install_all_components():524    # install dependencies - make sure that install_components(..) is called before hooks.install below!525    install_components(DEFAULT_SERVICE_PORTS.keys())526    hooks.install.run()527def install_debugpy_and_dependencies():528    try:529        import debugpy530        assert debugpy531        logging.debug("Debugpy module already Installed")532    except ModuleNotFoundError:533        logging.debug("Installing Debugpy module")534        import pip535        if hasattr(pip, "main"):536            pip.main(["install", DEBUGPY_MODULE])537        else:538            pip._internal.main(["install", DEBUGPY_MODULE])539# -----------------540# HELPER FUNCTIONS541# -----------------542def log_install_msg(component, verbatim=False):543    component = component if verbatim else "local %s server" % component544    LOG.info("Downloading and installing %s. This may take some time.", component)545def download_and_extract(archive_url, target_dir, retries=0, sleep=3, tmp_archive=None):546    mkdir(target_dir)547    _, ext = os.path.splitext(tmp_archive or archive_url)548    tmp_archive = tmp_archive or new_tmp_file()549    if not os.path.exists(tmp_archive) or os.path.getsize(tmp_archive) <= 0:550        # create temporary placeholder file, to avoid duplicate parallel downloads551        save_file(tmp_archive, "")552        for i in range(retries + 1):553            try:554                download(archive_url, tmp_archive)555                break556            except Exception:557                time.sleep(sleep)558    if ext == ".zip":559        unzip(tmp_archive, target_dir)560    elif ext in [".bz2", ".gz", ".tgz"]:561        untar(tmp_archive, target_dir)562    else:563        raise Exception(f"Unsupported archive format: {ext}")564def download_and_extract_with_retry(archive_url, tmp_archive, target_dir):565    try:566        download_and_extract(archive_url, target_dir, tmp_archive=tmp_archive)567    except Exception as e:568        # try deleting and re-downloading the zip file569        LOG.info("Unable to extract file, re-downloading ZIP archive %s: %s", tmp_archive, e)570        rm_rf(tmp_archive)571        download_and_extract(archive_url, target_dir, tmp_archive=tmp_archive)572# kept here for backwards compatibility (installed on "make init" - TODO should be removed)573installers = {574    "cloudformation": install_cloudformation_libs,575    "dynamodb": install_dynamodb_local,576    "kinesis": install_kinesis,577    "kms": install_local_kms,578    "lambda": install_lambda_runtime,579    "sqs": install_sqs_provider,580    "stepfunctions": install_stepfunctions_local,581}582Installer = Tuple[str, Callable]583class InstallerRepository(Plugin):584    namespace = "localstack.installer"585    def get_installer(self) -> List[Installer]:586        raise NotImplementedError587class CommunityInstallerRepository(InstallerRepository):588    name = "community"589    def get_installer(self) -> List[Installer]:590        return [591            ("awslamba-go-runtime", install_go_lambda_runtime),592            ("awslambda-runtime", install_lambda_runtime),593            ("cloudformation-libs", install_cloudformation_libs),594            ("dynamodb-local", install_dynamodb_local),595            ("elasticmq", install_elasticmq),596            ("elasticsearch", install_elasticsearch),597            ("opensearch", install_opensearch),598            ("kinesalite", install_kinesalite),599            ("kinesis-client-libs", install_amazon_kinesis_client_libs),600            ("kinesis-mock", install_kinesis_mock),601            ("lambda-java-libs", install_lambda_java_libs),602            ("local-kms", install_local_kms),603            ("stepfunctions-local", install_stepfunctions_local),604            ("terraform", install_terraform),605        ]606class InstallerManager:607    def __init__(self):608        self.repositories: PluginManager[InstallerRepository] = PluginManager(609            InstallerRepository.namespace610        )611    @functools.lru_cache()612    def get_installers(self) -> Dict[str, Callable]:613        installer: List[Installer] = []614        for repo in self.repositories.load_all():615            installer.extend(repo.get_installer())616        return dict(installer)617    def install(self, package: str, *args, **kwargs):618        installer = self.get_installers().get(package)619        if not installer:620            raise ValueError("no installer for package %s" % package)621        return installer(*args, **kwargs)622def main():623    if len(sys.argv) > 1:624        # set test API key so pro install hooks are called625        os.environ["LOCALSTACK_API_KEY"] = os.environ.get("LOCALSTACK_API_KEY") or "test"626        if sys.argv[1] == "libs":627            print("Initializing installation.")628            logging.basicConfig(level=logging.INFO)629            logging.getLogger("requests").setLevel(logging.WARNING)630            install_all_components()631        if sys.argv[1] in ("libs", "testlibs"):632            # Install additional libraries for testing633            install_amazon_kinesis_client_libs()634            install_lambda_java_testlibs()635        print("Done.")636if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
