Best Python code snippet using localstack_python
provision.py
Source:provision.py  
...137            "IAM role added to cluster. Nothing to do.",138        )139    else:140        logging.info("Add IAM role to cluster required.")141        redshift_client.modify_cluster_iam_roles(142            ClusterIdentifier=cluster, AddIamRoles=[role]143        )144        waiter = redshift_client.get_waiter("cluster_available")145        waiter.wait(ClusterIdentifier=cluster)146@retry_aws(codes=["InvalidClusterState"])147def ensure_logging_enabled(cluster, configureS3Logging, bucket):148    logging_status = redshift_client.describe_logging_status(149        ClusterIdentifier=cluster,150    )151    logging.info("Logging status: %s", logging_status)152    if logging_status["LoggingEnabled"]:  # eg. user already configured s3 logging153        if "BucketName" in logging_status:  # eg. use custom s3 bucket active154            logging_bucket = logging_status["BucketName"]155        else:  # eg. user have CloudWatch as destination activated156            raise DataException(157                "Configure S3 logging failed. 
Another destination of logging active."158            )159    elif configureS3Logging:160        logging.info("Enable logging required.")161        redshift_client.enable_logging(162            ClusterIdentifier=cluster,163            BucketName=bucket,164            S3KeyPrefix=f"redshift-logs/{cluster}",165        )166        waiter = redshift_client.get_waiter("cluster_available")167        waiter.wait(ClusterIdentifier=cluster)168        logging_bucket = bucket169    else:170        raise DataException(171            "Configure logging failed."172            "Setup logging to S3 must be accepted in CloudFormation or enable logging manually."173        )174    return logging_bucket175@retry_aws(codes=["InvalidClusterParameterGroupState"])176def ensure_custom_parameter_group(cluster, configureS3Logging):177    cluster_description = redshift_client.describe_clusters(ClusterIdentifier=cluster)[178        "Clusters"179    ][0]180    parameter_group_name = cluster_description["ClusterParameterGroups"][0][181        "ParameterGroupName"182    ]183    logging.info("Current parameter group name: %s", parameter_group_name)184    if not parameter_group_name.startswith("default."):185        logging.info(186            "Custom parameter group used. 
Nothing to do.",187        )188    elif configureS3Logging:189        logging.info("Create a new parameter group required.")190        parameter_group = redshift_client.describe_cluster_parameter_groups(191            ParameterGroupName=parameter_group_name192        )["ParameterGroups"][0]193        custom_parameter_group = f"redshift-custom-{cluster}"194        redshift_client.create_cluster_parameter_group(195            ParameterGroupName=custom_parameter_group,196            ParameterGroupFamily=parameter_group["ParameterGroupFamily"],197            Description="Created by CloudFormation on provisioning Select Star",198        )199        logging.info("Custom parameter group created: %s", custom_parameter_group)200        redshift_client.modify_cluster(201            ClusterIdentifier=cluster,202            ClusterParameterGroupName=custom_parameter_group,203        )204        logging.info("Custom parameter set for cluster: %s", custom_parameter_group)205        waiter = redshift_client.get_waiter("cluster_available")206        waiter.wait(ClusterIdentifier=cluster)207    else:208        raise DataException(209            "Configure logging failed."210            "Setup logging to S3 must be accepted in CloudFormation or custom parameter group set manually."211        )212@retry_aws(codes=["InvalidClusterParameterGroupState"])213def ensure_user_activity_enabled(cluster, configureS3Logging):214    cluster_description = redshift_client.describe_clusters(ClusterIdentifier=cluster)[215        "Clusters"216    ][0]217    parameter_group = cluster_description["ClusterParameterGroups"][0][218        "ParameterGroupName"219    ]220    logging.info("Parameter group: %s", parameter_group)221    paginator = redshift_client.get_paginator("describe_cluster_parameters")222    enabled = any(223        parameter["ParameterName"] == USER_ACTIVITY224        and parameter["ParameterValue"] == "true"225        for resp in paginator.paginate(ParameterGroupName=parameter_group)226 
       for parameter in resp["Parameters"]227    )228    if enabled:229        logging.info(230            "User activity enabled. Nothing to do.",231        )232    elif configureS3Logging:233        redshift_client.modify_cluster_parameter_group(234            ParameterGroupName=parameter_group,235            Parameters=[236                {237                    "ParameterName": USER_ACTIVITY,238                    "ParameterValue": "true",239                }240            ],241        )242        logging.info("Parameter group updated to set parameter: %s", USER_ACTIVITY)243        waiter = redshift_client.get_waiter("cluster_available")244        waiter.wait(ClusterIdentifier=cluster)245    else:246        raise DataException(247            "Configure logging failed."248            f"Setup logging to S3 must be accepted in CloudFormation or parameter '{USER_ACTIVITY}' enabled manually."249        )250def ensure_cluster_restarted(cluster, configureS3LoggingRestart):251    cluster_description = redshift_client.describe_clusters(ClusterIdentifier=cluster)[252        "Clusters"253    ][0]254    pending_reboot = any(255        param["ParameterName"] == USER_ACTIVITY256        and param["ParameterApplyStatus"] == "pending-reboot"257        for group in cluster_description["ClusterParameterGroups"]258        for param in group["ClusterParameterStatusList"]259    )260    if not pending_reboot:261        logging.info(262            "No pending modifications. Nothing to do.",263        )264    elif configureS3LoggingRestart:265        logging.info(266            "Cluster requires reboot.",267        )268        redshift_client.reboot_cluster(ClusterIdentifier=cluster)269        logging.info(270            "Cluster rebooted. 
Waiting to start.",271        )272        waiter = redshift_client.get_waiter("cluster_available")273        waiter.wait(ClusterIdentifier=cluster)274        logging.info("Cluster started after reboot.")275    else:276        logging.warn(277            "Pending modifications. They will probably be applied during the next maintenance window.",278        )279def fetch_databases(cluster, db, dbUser):280    for resp in redshiftdata_client.get_paginator("list_databases").paginate(281        ClusterIdentifier=cluster,282        Database=db,283        DbUser=dbUser,284    ):285        yield from resp["Databases"]286def handler(event, context):287    logger.info(json.dumps(event))288    try:289        properties = event["ResourceProperties"]290        role = properties["RedshiftRole"]291        cluster = properties["Cluster"]292        bucket = properties.get("Bucket", None)293        db = properties["Db"]294        grant = properties["Grant"]295        dbUser = properties["DbUser"]296        configureS3Logging = properties["ConfigureS3Logging"] == "true"297        configureS3LoggingRestart = properties["ConfigureS3LoggingRestart"] == "true"298        ensure_cluster_state(cluster)299        if "*" in grant:300            grant = list(fetch_databases(cluster, db, dbUser))301            logger.info("Resolved '*' in grant to: %s", grant)302        if event["RequestType"] == "Delete":303            try:304                for dbname in grant:305                    for table in TABLES:306                        execQuery(307                            cluster,308                            dbname,309                            dbUser,310                            f"revoke all on {table} from selectstar;",311                        )312                execQuery(cluster, db, dbUser, "drop user selectstar;")313            except Exception as e:314                logging.warn("User could not be removed")315            try:316                
redshift_client.modify_cluster_iam_roles(317                    ClusterIdentifier=cluster, RemoveIamRoles=[role]318                )319                logging.info("Cluster IAM role removed: %s", role)320                waiter = redshift_client.get_waiter("cluster_available")321                waiter.wait(ClusterIdentifier=cluster)322            except Exception as e:323                logging.warn("Role could not be removed")324            cfnresponse.send(325                event, context, cfnresponse.SUCCESS, {"Data": "Delete complete"}326            )327        else:328            security_group_id, endpoint_port = ensure_valid_cluster(cluster)329            logging.info("Äluster validated successfully")330            ensure_iam_role(cluster, role)...main.py
Source:main.py  
...7		'redshift',8		aws_access_key_id=access_key,9		aws_secret_access_key=access_secret10	)11	return redshift_client.modify_cluster_iam_roles(12		ClusterIdentifier=cluster_identifier,13		AddIamRoles=[14			role_arn15		]16	)17if __name__ == '__main__':18	print('Begin attach_role_to_cluster')19	access_key, access_secret, region, cluster_identifier, role_arn = sys.argv[1:]20	res = attach_role_to_cluster(access_key, access_secret, region, cluster_identifier, role_arn)21	if res['ResponseMetadata']['HTTPStatusCode'] != 200:22		print('attach_role_to_cluster failed with response:', res)23		raise SystemExit(1)24	else:25		print('attach_role_to_cluster succeeded with response:', res)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
