Best Python code snippet using localstack_python
cli.py
Source:cli.py  
...30#31# - ENCODED_ES_SERVER32#   Get this from the endpoint property of the AWS ElasticSearch instance definition33#   named es-{aws_credentials_name} where aws_credentials_name is e.g. cgap-supertest.34#   Ref: AwsFunctions.get_opensearch_endpoint()35#   TODO: Since this value is generated by the datastore stack provisioning, which also36#   sets up the GAC, why not use the same code to set this at datastore provisioning time?37#   TODO: Actually it looks like this value IS set during datastore stack provisioning time,38#   to e.g. vpc-es-cgap-supertest-asggiedgb6ilmzjuq4hlwgw2w4.us-east-1.es.amazonaws.com,39#   but without the ":443" port suffix, in application_configuration_template() in40#   datastore.py via call to C4DatastoreExports.get_es_url() in datastore.py.41#   TODO: I have a note in my docs that this cannot actualy be set until ES comes online. Hmm.42#43# - RDS_HOSTNAME, RDS_PASSWORD44#   Get these from the "host" and "password" secret key values in the secret name45#   ending in "RDSSecret" in the AWS Secrets Manager.46#   TODO: The "RDSSecret" secret name is from rds_secret_logical_id in C4Datastore in datastore.py;47#   FACTORED OUT rds_secret_logical_id from C4Datastore into names.py for this (DONE).48#   TODO: The "password" string is from rds_secret() in C4Datastore in datastore.py;49#   not sure where "host" is from.50#   Note that the value of the password seems to be generated by AWS via GenerateSecretString.51#52# - ENCODED_S3_ENCRYPT_KEY_ID53#   Get this from the (one single) customer manager key the AWS Key Management Service (KMS).54#   Only set this if "s3.bucket.encryption" is True in custom/config.json.55#   Ref: AwsFunctions.get_customer_managed_kms_keys()56#57# - S3_AWS_ACCESS_KEY_ID, S3_AWS_SECRET_ACCESS_KEY58#   Get these by creating AWS security access key pair for the "federated" IAM user.59#   TODO: Get federated IAM user name (e.g. c4-iam-main-stack-C4IAMMainApplicationS3Federator-ZFK91VU2DM1H)60#   from AWS IAM user whose name contains "ApplicationS3Federator" which (that string)61#   is referenced/hardcoded in ecs_s3_iam_user() in iam.py.62#   Ref: AwsFunctions.create_user_access_key()63#64# - S3_ENCRYPT_KEY65#   This gets set automatically it seems.66#   TODO: Though originally this did not seem to be the case.67#   Get (if not already set) from custom/aws_creds/s3_encrypt_key.txt68import argparse69import boto370import contextlib71import io72import json73import os74import re75# NOTE: This imports dcicutils.cloudformation_utils which (ultimately) instantiates a boto376# client globally which causes a boto3.DEFAULT_SESSION to be cached, with incorrect credentials,77# which messes up our AwsContext; workaround is to set boto3.DEFAULT_SESSION to None in AwsContext.78from dcicutils.misc_utils import PRINT79from ...names import Names80from ..init_custom_dir.defs import (InfraDirectories, InfraFiles)81from .aws_functions import AwsFunctions82from .utils import (obfuscate, should_obfuscate)83def get_custom_dir(custom_dir: str = None):84    return InfraDirectories.get_custom_dir(custom_dir)85def get_custom_aws_creds_dir(custom_dir: str = None):86    return InfraDirectories.get_custom_aws_creds_dir(custom_dir)87def get_custom_config_file(custom_dir: str = None):88    return InfraFiles.get_config_file(custom_dir)89def get_custom_config_file_value(custom_dir: str, name: str):90    custom_config_file = get_custom_config_file(custom_dir)91    with io.open(custom_config_file, "r") as custom_config_fp:92        custom_config_json = json.load(custom_config_fp)93        return custom_config_json.get(name)94    return None95def get_aws_credentials_name(custom_dir: str = None) -> str:96    return get_custom_config_file_value(custom_dir, "ENCODED_ENV_NAME")97def get_account_number_from_config_file(custom_dir: str = None) -> str:98    return get_custom_config_file_value(custom_dir, "account_number")99def get_s3_bucket_encryption_from_config_file(custom_dir: str = None) -> bool:100    return get_custom_config_file_value(custom_dir, "s3.bucket.encryption")101def get_global_application_secret_name(aws_credentials_name: str) -> str:102    """103    Obtains/returns the 'identity', i.e. the global application configuration secret name using104    the same code that 4dn-cloud-infra code does (see C4Datastore.application_configuration_secret).105    Had to do some refactoring to get this working (see names.py).106    :param aws_credentials_name: AWS credentials name (e.g. cgap-supertest).107    :return: Identity (global application configuration name) as gotten from the main 4dn-cloud-infra code.108    """109    try:110        global_application_secret_name = Names.application_configuration_secret(aws_credentials_name)111    except Exception:112        global_application_secret_name = None113    return global_application_secret_name114def get_rds_secret_name(aws_credentials_name: str) -> str:115    """116    Obtains/returns the RDS secret name using the same code that 4dn-cloud-infra code117    does (see C4Datastore.rds_secret_logical_id).118    Had to do some refactoring to get this working (see names.py).119    :param aws_credentials_name: AWS credentials name (e.g. cgap-supertest).120    :return: RDS secret name as gotten from the main 4dn-cloud-infra code.121    """122    try:123        rds_secret_name = Names.rds_secret_logical_id(aws_credentials_name)124    except Exception:125        rds_secret_name = None126    return rds_secret_name127def main():128    args_parser = argparse.ArgumentParser()129    args_parser.add_argument("--custom-dir", type=str, required=False, default=InfraDirectories.CUSTOM_DIR)130    args_parser.add_argument("--access-key", type=str, required=False)131    args_parser.add_argument("--secret-key", type=str, required=False)132    args_parser.add_argument("--region", type=str, required=False)133    args_parser.add_argument("--credentials-dir", type=str, required=False)134    args_parser.add_argument("--identity", type=str, required=False)135    args_parser.add_argument("--federated-user", type=str, required=False)136    args_parser.add_argument("--show", action="store_true", required=False)137    args = args_parser.parse_args()138    # Intialize the dictionary secrets to set, which we will collect here.139    secrets_to_update = {}140    # Gather the basic info.141    custom_dir = get_custom_dir(args.custom_dir)142    custom_aws_creds_dir = args.credentials_dir if args.credentials_dir else get_custom_aws_creds_dir(custom_dir)143    custom_config_file = get_custom_config_file(custom_dir)144    aws_credentials_name = get_aws_credentials_name(custom_dir)145    # Get AWS credentials context object.146    aws = AwsFunctions(custom_aws_creds_dir, args.access_key, args.secret_key, args.region)147    # Get the relevant AWS secret names.148    global_application_secret_name = args.identity if args.identity else get_global_application_secret_name(aws_credentials_name)149    rds_secret_name = get_rds_secret_name(aws_credentials_name)150    # Get the "identity" name, i.e. the global application confguration secret name.151    secrets_to_update["ENCODED_IDENTITY"] = global_application_secret_name152    PRINT(f"Setting up 4dn-cloud-infra remaining AWS secrets for: {global_application_secret_name}")153    PRINT(f"Your custom directory: {custom_dir}")154    PRINT(f"Your custom config file: {custom_config_file}")155    PRINT(f"Your AWS credentials name: {aws_credentials_name}")156    # TODO: If access key and secrets key specified via command-line should also require region?157    # i.e. so we're not split with some values from command-line and some from AWS credentials/config files?158    if not args.access_key or not args.secret_key:159        custom_aws_creds_dir_symlink_target = os.readlink(custom_aws_creds_dir) if os.path.islink(custom_aws_creds_dir) else None160        if custom_aws_creds_dir_symlink_target:161            PRINT(f"Your AWS credentials directory (link): {custom_aws_creds_dir}@ ->")162            PRINT(f"Your AWS credentials directory (real): {custom_aws_creds_dir_symlink_target}")163        else:164            PRINT(f"Your AWS credentials directory: {custom_aws_creds_dir}")165    # Get the AWS ACCOUNT_NUMBER value from the custom/config.json file.166    account_number = get_account_number_from_config_file(custom_dir)167    PRINT(f"Your AWS account number: {account_number}")168    # Verify the AWS credentials context and get the associated ACCOUNT_NUMBER value.169    # If ACCOUNT_NUMBER does not agree with what's in the config file (above) then warning (error?).170    with aws.establish_credentials() as credentials:171        PRINT(f"Your AWS access key: {credentials.access_key_id}")172        PRINT(f"Your AWS access secret: {credentials.secret_access_key if args.show else obfuscate(credentials.secret_access_key)}")173        PRINT(f"Your AWS default region: {credentials.default_region}")174        PRINT(f"Your AWS account number: {credentials.account_number}")175        PRINT(f"Your AWS account user ARN: {credentials.user_arn}")176        if account_number != credentials.account_number:177            # TODO: Should this be a hard error?178            PRINT(f"WARNING: Account number from your config file ({account_number}) does not match AWS ({credentials.account_number}).")179        secrets_to_update["ACCOUNT_NUMBER"] = credentials.account_number180    PRINT(f"AWS global application configuration secret name: {global_application_secret_name}")181    PRINT(f"AWS RDS application configuration secret name: {rds_secret_name}")182    # Get the IAM "federated" user name.183    if args.federated_user:184        federated_user_name = args.federated_user185    else:186        # TODO: get string from code.187        federated_user_name_pattern = "ApplicationS3Federator"188        federated_user_name = aws.find_iam_user_name(federated_user_name_pattern)189    if not federated_user_name:190        # TODO: Should this be a hard error?191        PRINT(f"ERROR: AWS federated user cannot be determined!")192    else:193        PRINT(f"AWS application federated IAM user: {federated_user_name}")194    # Get the ElasticSearch host/port.195    es_server = aws.get_opensearch_endpoint(aws_credentials_name)196    PRINT(f"AWS application ElasticSearch server: {es_server}")197    secrets_to_update["ENCODED_ES_SERVER"] = es_server198    # Get the RDS hostname and password.199    if not rds_secret_name:200        # TODO: Should this be a hard error?201        PRINT(f"ERROR: Cannot determine RDS secret name!")202    else:203        rds_hostname = aws.get_secret_value(rds_secret_name, "host")204        PRINT(f"AWS application RDS host name: {rds_hostname}")205        rds_password = aws.get_secret_value(rds_secret_name, "password")206        PRINT(f"AWS application RDS host password: {rds_password if args.show else obfuscate(rds_password)}")207        secrets_to_update["RDS_HOST"] = rds_hostname208        secrets_to_update["RDS_PASSWORD"] = rds_password209    # Get the ENCODED_S3_ENCRYPT_KEY_ID from KMS....aws_functions.py
Source:aws_functions.py  
...123                key_manager = key_metadata["KeyManager"]124                if key_manager == "CUSTOMER":125                    kms_keys.append(key_id)126        return kms_keys127    def get_opensearch_endpoint(self, aws_credentials_name: str):128        """129        Returns the endpoint (host:port) for the ElasticSearch instance associated130        with the given AWS credentials name (e.g. cgap-supertest).131        :param aws_credentials_name: AWS credentials name (e.g. cgap-supertest).132        :return: Endpoint (host:port) for ElasticSearch or None if not found.133        """134        with super().establish_credentials():135            # TODO: Get this name from somewhere in 4dn-cloud-infra.136            opensearch_instance_name = f"es-{aws_credentials_name}"137            opensearch = boto3.client('opensearch')138            domain_names = opensearch.list_domain_names()["DomainNames"]139            domain_name = [domain_name for domain_name in domain_names if domain_name["DomainName"] == opensearch_instance_name]140            if domain_name is None or len(domain_name) != 1:141                return None...aws_opensearch.py
Source:aws_opensearch.py  
...4# --------------------------------------------------------------------------------------------------5import argparse6import boto37from aws_utils import (validate_aws)8def get_opensearch_endpoint(aws_credentials_name: str,9                            access_key: str = None, secret_key: str = None, region: str = None):10    opensearch_instance_name = f"es-{aws_credentials_name}"11    opensearch = boto3.client('opensearch')12    domain_names = opensearch.list_domain_names()["DomainNames"]13    domain_name = [domain_name for domain_name in domain_names if domain_name["DomainName"] == opensearch_instance_name]14    if domain_name is None or len(domain_name) != 1:15        return None16    domain_name = domain_name[0]["DomainName"]17    domain_description = opensearch.describe_domain(DomainName=domain_name)18    domain_status = domain_description["DomainStatus"]19    domain_endpoints = domain_status["Endpoints"]20    domain_endpoint_options = domain_status["DomainEndpointOptions"]21    domain_endpoint_vpc = domain_endpoints["vpc"]22    domain_endpoint_https = domain_endpoint_options["EnforceHTTPS"]23    if domain_endpoint_https:24        domain_endpoint = f"{domain_endpoint_vpc}:443"25    else:26        domain_endpoint = f"{domain_endpoint_vpc}:80"27    return domain_endpoint28def main():29    args_parser = argparse.ArgumentParser()30    args_parser.add_argument("--name", type=str, required=True)31    args_parser.add_argument("--access-key", type=str, required=False)32    args_parser.add_argument("--secret-key", type=str, required=False)33    args_parser.add_argument("--region", type=str, required=False)34    args_parser.add_argument("--keys", action="store_true", required=False)35    args_parser.add_argument("--verbose", action="store_true", required=False)36    args = args_parser.parse_args()37    print(f"AWS OpenSearch Utility | {args.name}")38    access_key, secret_key, region = validate_aws(args.access_key, args.secret_key, args.region)39    endpoint = get_opensearch_endpoint(args.name, access_key, secret_key, region)40    print(endpoint)41if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
