How to use the get_cluster_credentials_with_iam method in localstack

Best Python code snippet using localstack_python

client.pyi

Source:client.pyi Github

copy

Full Screen

...1341 to log on to an Amazon Redshift database.1342 [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/redshift.html#Redshift.Client.get_cluster_credentials)1343 [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_redshift/client.html#get_cluster_credentials)1344 """1345 def get_cluster_credentials_with_iam(1346 self, *, ClusterIdentifier: str, DbName: str = None, DurationSeconds: int = None1347 ) -> ClusterExtendedCredentialsTypeDef:1348 """1349 Returns a database user name and temporary password with temporary authorization1350 to log in to an Amazon Redshift database.1351 [Show boto3 documentation](https://boto3.amazonaws.com/v1/documentation/api/1.24.58/reference/services/redshift.html#Redshift.Client.get_cluster_credentials_with_iam)1352 [Show boto3-stubs documentation](https://vemel.github.io/boto3_stubs_docs/mypy_boto3_redshift/client.html#get_cluster_credentials_with_iam)1353 """1354 def get_reserved_node_exchange_configuration_options(1355 self,1356 *,1357 ActionType: ReservedNodeExchangeActionTypeType,1358 ClusterIdentifier: str = None,1359 SnapshotIdentifier: str = None,...

Full Screen

Full Screen

iam_helper.py

Source:iam_helper.py Github

copy

Full Screen

...35 Defines supported Python SDK methods used for Redshift credential retrieval36 """37 SERVERLESS_V1 = "get_credentials()"38 IAM_V1 = "get_cluster_credentials()"39 IAM_V2 = "get_cluster_credentials_with_iam()"40 @staticmethod41 def can_support_v2(provider_type: "IamHelper.IAMAuthenticationType") -> bool:42 """43 Determines if user provided connection options and boto3 version support group federation.44 """45 return (46 provider_type47 in (48 IamHelper.IAMAuthenticationType.PROFILE,49 IamHelper.IAMAuthenticationType.IAM_KEYS,50 IamHelper.IAMAuthenticationType.IAM_KEYS_WITH_SESSION,51 )52 ) and Version(pkg_resources.get_distribution("boto3").version) >= Version("1.24.5")53 credentials_cache: typing.Dict[str, dict] = {}54 @staticmethod55 def get_cluster_credentials_api_type(56 info: RedshiftProperty, provider_type: "IamHelper.IAMAuthenticationType"57 ) -> GetClusterCredentialsAPIType:58 """59 Returns an enum representing the Python SDK method to use for getting temporary IAM credentials.60 """61 if not info._is_serverless:62 if not info.group_federation:63 return IamHelper.GetClusterCredentialsAPIType.IAM_V164 elif IamHelper.GetClusterCredentialsAPIType.can_support_v2(provider_type):65 return IamHelper.GetClusterCredentialsAPIType.IAM_V266 else:67 raise InterfaceError("Authentication with plugin is not supported for group federation")68 elif not info.group_federation:69 return IamHelper.GetClusterCredentialsAPIType.SERVERLESS_V170 elif IamHelper.GetClusterCredentialsAPIType.can_support_v2(provider_type):71 return IamHelper.GetClusterCredentialsAPIType.IAM_V272 else:73 raise InterfaceError("Authentication with plugin is not supported for group federation")74 @staticmethod75 def set_iam_properties(info: RedshiftProperty) -> RedshiftProperty:76 """77 Helper function to handle connection properties and ensure required parameters are specified.78 Parameters79 """80 provider_type: IamHelper.IAMAuthenticationType = IamHelper.IAMAuthenticationType.NONE81 # set properties 
present for both IAM, Native authentication82 IamHelper.set_auth_properties(info)83 if info._is_serverless and info.iam:84 if Version(pkg_resources.get_distribution("boto3").version) < Version("1.24.11"):85 raise pkg_resources.VersionConflict(86 "boto3 >= 1.24.11 required for authentication with Amazon Redshift serverless. "87 "Please upgrade the installed version of boto3 to use this functionality."88 )89 if info.is_serverless_host:90 # consider overridden connection parameters91 if not info.region:92 info.set_region_from_host()93 if not info.serverless_acct_id:94 info.set_serverless_acct_id()95 if not info.serverless_work_group:96 info.set_serverless_work_group_from_host()97 if info.iam is True:98 if info.cluster_identifier is None and not info._is_serverless:99 raise InterfaceError(100 "Invalid connection property setting. cluster_identifier must be provided when IAM is enabled"101 )102 IamHelper.set_iam_credentials(info)103 # Check for Browser based OAuth Native authentication104 NativeAuthPluginHelper.set_native_auth_plugin_properties(info)105 return info106 @staticmethod107 def set_iam_credentials(info: RedshiftProperty) -> None:108 """109 Helper function to create the appropriate credential providers.110 """111 klass: typing.Optional[IPlugin] = None112 provider: typing.Union[IPlugin, AWSCredentialsProvider]113 if info.credentials_provider is not None:114 provider = IdpAuthHelper.load_credentials_provider(info)115 else: # indicates AWS Credentials will be used116 _logger.debug("AWS Credentials provider will be used for authentication")117 provider = AWSCredentialsProvider()118 provider.add_parameter(info)119 if isinstance(provider, SamlCredentialsProvider):120 credentials: CredentialsHolder = provider.get_credentials()121 metadata: CredentialsHolder.IamMetadata = credentials.get_metadata()122 if metadata is not None:123 auto_create: bool = metadata.get_auto_create()124 db_user: typing.Optional[str] = metadata.get_db_user()125 saml_db_user: 
typing.Optional[str] = metadata.get_saml_db_user()126 profile_db_user: typing.Optional[str] = metadata.get_profile_db_user()127 db_groups: typing.List[str] = metadata.get_db_groups()128 force_lowercase: bool = metadata.get_force_lowercase()129 allow_db_user_override: bool = metadata.get_allow_db_user_override()130 if auto_create is True:131 info.put("auto_create", auto_create)132 if force_lowercase is True:133 info.put("force_lowercase", force_lowercase)134 if allow_db_user_override is True:135 if saml_db_user is not None:136 info.put("db_user", saml_db_user)137 elif db_user is not None:138 info.put("db_user", db_user)139 elif profile_db_user is not None:140 info.put("db_user", profile_db_user)141 else:142 if db_user is not None:143 info.put("db_user", db_user)144 elif profile_db_user is not None:145 info.put("db_user", profile_db_user)146 elif saml_db_user is not None:147 info.put("db_user", saml_db_user)148 if (len(info.db_groups) == 0) and (len(db_groups) > 0):149 if force_lowercase:150 info.db_groups = [group.lower() for group in db_groups]151 else:152 info.db_groups = db_groups153 if not isinstance(provider, INativePlugin):154 IamHelper.set_cluster_credentials(provider, info)155 @staticmethod156 def get_credentials_cache_key(info: RedshiftProperty, cred_provider: typing.Union[IPlugin, AWSCredentialsProvider]):157 db_groups: str = ""158 if len(info.db_groups) > 0:159 info.put("db_groups", sorted(info.db_groups))160 db_groups = ",".join(info.db_groups)161 cred_key: str = ""162 if cred_provider:163 cred_key = str(cred_provider.get_cache_key())164 return ";".join(165 filter(166 None,167 (168 cred_key,169 typing.cast(str, info.db_user if info.db_user else info.user_name),170 info.db_name,171 db_groups,172 typing.cast(str, info.serverless_acct_id if info._is_serverless else info.cluster_identifier),173 typing.cast(174 str, info.serverless_work_group if info._is_serverless and info.serverless_work_group else ""175 ),176 str(info.auto_create),177 
str(info.duration),178 # v2 api parameters179 info.preferred_role,180 info.web_identity_token,181 info.role_arn,182 info.role_session_name,183 # providers184 info.profile,185 info.access_key_id,186 info.secret_access_key,187 info.session_token,188 ),189 )190 )191 @staticmethod192 def get_authentication_type(193 provider: typing.Union[IPlugin, AWSCredentialsProvider]194 ) -> "IamHelper.IAMAuthenticationType":195 """196 Returns an enum representing the type of authentication the user is requesting based on connection parameters.197 """198 provider_type: IamHelper.IAMAuthenticationType = IamHelper.IAMAuthenticationType.NONE199 if isinstance(provider, IPlugin):200 provider_type = IamHelper.IAMAuthenticationType.PLUGIN201 elif isinstance(provider, AWSCredentialsProvider):202 if provider.profile is not None:203 provider_type = IamHelper.IAMAuthenticationType.PROFILE204 elif provider.session_token is not None:205 provider_type = IamHelper.IAMAuthenticationType.IAM_KEYS_WITH_SESSION206 else:207 provider_type = IamHelper.IAMAuthenticationType.IAM_KEYS208 return provider_type209 @staticmethod210 def set_cluster_credentials(211 cred_provider: typing.Union[IPlugin, AWSCredentialsProvider], info: RedshiftProperty212 ) -> None:213 """214 Calls the AWS SDK methods to return temporary credentials.215 The expiration date is returned as the local time set by the client machines OS.216 """217 import boto3 # type: ignore218 import botocore # type: ignore219 try:220 credentials_holder: typing.Union[221 CredentialsHolder, ABCAWSCredentialsHolder222 ] = cred_provider.get_credentials() # type: ignore223 session_credentials: typing.Dict[str, str] = credentials_holder.get_session_credentials()224 redshift_client: str = "redshift-serverless" if info._is_serverless else "redshift"225 _logger.debug("boto3.client(service_name={}) being used for IAM auth".format(redshift_client))226 for opt_key, opt_val in (("region_name", info.region), ("endpoint_url", info.endpoint_url)):227 if opt_val is not 
None:228 session_credentials[opt_key] = opt_val229 # if AWS credentials were used to create a boto3.Session object, use it230 if credentials_holder.has_associated_session:231 cached_session: boto3.Session = typing.cast(232 ABCAWSCredentialsHolder, credentials_holder233 ).get_boto_session()234 client = cached_session.client(service_name=redshift_client, region_name=info.region)235 else:236 client = boto3.client(service_name=redshift_client, **session_credentials)237 if info.host is None or info.host == "" or info.port is None or info.port == "":238 response: dict239 if info._is_serverless:240 if not info.serverless_work_group:241 raise InterfaceError("Serverless workgroup is not set.")242 response = client.get_workgroup(workgroupName=info.serverless_work_group)243 info.put("host", response["workgroup"]["endpoint"]["address"])244 info.put("port", response["workgroup"]["endpoint"]["port"])245 else:246 response = client.describe_clusters(ClusterIdentifier=info.cluster_identifier)247 info.put("host", response["Clusters"][0]["Endpoint"]["Address"])248 info.put("port", response["Clusters"][0]["Endpoint"]["Port"])249 cred: typing.Optional[typing.Dict[str, typing.Union[str, datetime.datetime]]] = None250 if info.iam_disable_cache is False:251 _logger.debug("iam_disable_cache=False")252 # temporary credentials are cached by redshift_connector and will be used if they have not expired253 cache_key: str = IamHelper.get_credentials_cache_key(info, cred_provider)254 cred = IamHelper.credentials_cache.get(cache_key, None)255 _logger.debug(256 "Searching credential cache for temporary AWS credentials. 
Found: {} Expiration: {}".format(257 bool(cache_key in IamHelper.credentials_cache),258 cred["Expiration"] if cred is not None else "N/A",259 )260 )261 if cred is None or typing.cast(datetime.datetime, cred["Expiration"]) < datetime.datetime.now(tz=tzutc()):262 # retries will occur by default ref:263 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html#legacy-retry-mode264 _logger.debug("Credentials expired or not found...requesting from boto")265 provider_type: IamHelper.IAMAuthenticationType = IamHelper.get_authentication_type(cred_provider)266 get_creds_api_version: IamHelper.GetClusterCredentialsAPIType = (267 IamHelper.get_cluster_credentials_api_type(info, provider_type)268 )269 _logger.debug("boto3 get_credentials api version: {} will be used".format(get_creds_api_version.value))270 if get_creds_api_version == IamHelper.GetClusterCredentialsAPIType.SERVERLESS_V1:271 get_cred_args: typing.Dict[str, str] = {"dbName": info.db_name}272 if info.serverless_work_group:273 get_cred_args["workgroupName"] = info.serverless_work_group274 cred = typing.cast(275 typing.Dict[str, typing.Union[str, datetime.datetime]],276 client.get_credentials(**get_cred_args),277 )278 # re-map expiration for compatibility with redshift credential response279 cred["Expiration"] = cred["expiration"]280 del cred["expiration"]281 elif get_creds_api_version == IamHelper.GetClusterCredentialsAPIType.IAM_V2:282 cred = typing.cast(283 typing.Dict[str, typing.Union[str, datetime.datetime]],284 client.get_cluster_credentials_with_iam(285 DbName=info.db_name,286 ClusterIdentifier=info.cluster_identifier,287 DurationSeconds=info.duration,288 ),289 )290 else:291 cred = typing.cast(292 typing.Dict[str, typing.Union[str, datetime.datetime]],293 client.get_cluster_credentials(294 DbUser=info.db_user,295 DbName=info.db_name,296 DbGroups=info.db_groups,297 ClusterIdentifier=info.cluster_identifier,298 AutoCreate=info.auto_create,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful