Best Python code snippet using localstack_python
install.py
Source:install.py  
...28        log_install_msg('Elasticsearch')29        mkdir(INSTALL_DIR_INFRA)30        # download and extract archive31        tmp_archive = os.path.join(tempfile.gettempdir(), 'localstack.es.zip')32        download_and_extract_with_retry(ELASTICSEARCH_JAR_URL, tmp_archive, INSTALL_DIR_INFRA)33        elasticsearch_dir = glob.glob(os.path.join(INSTALL_DIR_INFRA, 'elasticsearch*'))34        if not elasticsearch_dir:35            raise Exception('Unable to find Elasticsearch folder in %s' % INSTALL_DIR_INFRA)36        shutil.move(elasticsearch_dir[0], INSTALL_DIR_ES)37        for dir_name in ('data', 'logs', 'modules', 'plugins', 'config/scripts'):38            dir_path = '%s/%s' % (INSTALL_DIR_ES, dir_name)39            mkdir(dir_path)40            chmod_r(dir_path, 0o777)41        # install default plugins42        for plugin in ELASTICSEARCH_PLUGIN_LIST:43            if is_alpine():44                # https://github.com/pires/docker-elasticsearch/issues/5645                os.environ['ES_TMPDIR'] = '/tmp'46            plugin_binary = os.path.join(INSTALL_DIR_ES, 'bin', 'elasticsearch-plugin')47            run('%s install %s' % (plugin_binary, plugin))48def install_elasticmq():49    if not os.path.exists(INSTALL_DIR_ELASTICMQ):50        log_install_msg('ElasticMQ')51        mkdir(INSTALL_DIR_ELASTICMQ)52        # download archive53        tmp_archive = os.path.join(tempfile.gettempdir(), 'elasticmq-server.jar')54        if not os.path.exists(tmp_archive):55            download(ELASTICMQ_JAR_URL, tmp_archive)56        shutil.copy(tmp_archive, INSTALL_DIR_ELASTICMQ)57def install_kinesalite():58    target_dir = '%s/kinesalite' % INSTALL_DIR_NPM59    if not os.path.exists(target_dir):60        log_install_msg('Kinesis')61        run('cd "%s" && npm install' % ROOT_PATH)62def install_stepfunctions_local():63    if not os.path.exists(INSTALL_DIR_STEPFUNCTIONS):64        log_install_msg('Step Functions')65        tmp_archive = os.path.join(tempfile.gettempdir(), 
'stepfunctions.zip')66        download_and_extract_with_retry(67            STEPFUNCTIONS_ZIP_URL, tmp_archive, INSTALL_DIR_STEPFUNCTIONS)68def install_dynamodb_local():69    if not os.path.exists(INSTALL_DIR_DDB):70        log_install_msg('DynamoDB')71        # download and extract archive72        tmp_archive = os.path.join(tempfile.gettempdir(), 'localstack.ddb.zip')73        download_and_extract_with_retry(DYNAMODB_JAR_URL, tmp_archive, INSTALL_DIR_DDB)74    # fix for Alpine, otherwise DynamoDBLocal fails with:75    # DynamoDBLocal_lib/libsqlite4java-linux-amd64.so: __memcpy_chk: symbol not found76    if is_alpine():77        ddb_libs_dir = '%s/DynamoDBLocal_lib' % INSTALL_DIR_DDB78        patched_marker = '%s/alpine_fix_applied' % ddb_libs_dir79        if not os.path.exists(patched_marker):80            patched_lib = ('https://rawgit.com/bhuisgen/docker-alpine/master/alpine-dynamodb/' +81                'rootfs/usr/local/dynamodb/DynamoDBLocal_lib/libsqlite4java-linux-amd64.so')82            patched_jar = ('https://rawgit.com/bhuisgen/docker-alpine/master/alpine-dynamodb/' +83                'rootfs/usr/local/dynamodb/DynamoDBLocal_lib/sqlite4java.jar')84            run("curl -L -o %s/libsqlite4java-linux-amd64.so '%s'" % (ddb_libs_dir, patched_lib))85            run("curl -L -o %s/sqlite4java.jar '%s'" % (ddb_libs_dir, patched_jar))86            save_file(patched_marker, '')87    # fix logging configuration for DynamoDBLocal88    log4j2_config = """<Configuration status="WARN">89      <Appenders>90        <Console name="Console" target="SYSTEM_OUT">91          <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>92        </Console>93      </Appenders>94      <Loggers>95        <Root level="WARN"><AppenderRef ref="Console"/></Root>96      </Loggers>97    </Configuration>"""98    log4j2_file = os.path.join(INSTALL_DIR_DDB, 'log4j2.xml')99    save_file(log4j2_file, log4j2_config)100    run('cd "%s" && zip -u DynamoDBLocal.jar log4j2.xml 
|| true' % INSTALL_DIR_DDB)101def install_amazon_kinesis_client_libs():102    # install KCL/STS JAR files103    if not os.path.exists(INSTALL_DIR_KCL):104        mkdir(INSTALL_DIR_KCL)105        tmp_archive = os.path.join(tempfile.gettempdir(), 'aws-java-sdk-sts.jar')106        if not os.path.exists(tmp_archive):107            download(STS_JAR_URL, tmp_archive)108        shutil.copy(tmp_archive, INSTALL_DIR_KCL)109    # Compile Java files110    from localstack.utils.kinesis import kclipy_helper111    classpath = kclipy_helper.get_kcl_classpath()112    java_files = '%s/utils/kinesis/java/com/atlassian/*.java' % ROOT_PATH113    class_files = '%s/utils/kinesis/java/com/atlassian/*.class' % ROOT_PATH114    if not glob.glob(class_files):115        run('javac -cp "%s" %s' % (classpath, java_files))116def install_lambda_java_libs():117    # install LocalStack "fat" JAR file (contains all dependencies)118    if not os.path.exists(INSTALL_PATH_LOCALSTACK_FAT_JAR):119        log_install_msg('LocalStack Java libraries', verbatim=True)120        download(URL_LOCALSTACK_FAT_JAR, INSTALL_PATH_LOCALSTACK_FAT_JAR)121def install_component(name):122    installers = {123        'kinesis': install_kinesalite,124        'dynamodb': install_dynamodb_local,125        'es': install_elasticsearch,126        'sqs': install_elasticmq,127        'stepfunctions': install_stepfunctions_local128    }129    installer = installers.get(name)130    if installer:131        installer()132def install_components(names):133    parallelize(install_component, names)134    install_lambda_java_libs()135def install_all_components():136    install_components(DEFAULT_SERVICE_PORTS.keys())137# -----------------138# HELPER FUNCTIONS139# -----------------140def log_install_msg(component, verbatim=False):141    component = component if verbatim else 'local %s server' % component142    LOGGER.info('Downloading and installing %s. This may take some time.' 
% component)143def is_alpine():144    try:145        run('cat /etc/issue | grep Alpine', print_error=False)146        return True147    except Exception:148        return False149def download_and_extract_with_retry(archive_url, tmp_archive, target_dir):150    mkdir(target_dir)151    def download_and_extract():152        if not os.path.exists(tmp_archive):153            download(archive_url, tmp_archive)154        unzip(tmp_archive, target_dir)155    try:156        download_and_extract()157    except Exception:158        # try deleting and re-downloading the zip file159        LOGGER.info('Unable to extract file, re-downloading ZIP archive: %s' % tmp_archive)160        rm_rf(tmp_archive)161        download_and_extract()162if __name__ == '__main__':163    if len(sys.argv) > 1:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
