How to use get_cluster_credentials method in localstack

Best Python code snippet using localstack_python

test_iam_helper.py

Source:test_iam_helper.py Github

copy

Full Screen

...348 assert len(IamHelper.credentials_cache) == 1349 assert mock_boto_client.called is True350 mock_boto_client.assert_has_calls(351 [352 call().get_cluster_credentials(353 AutoCreate=rp.auto_create,354 ClusterIdentifier=rp.cluster_identifier,355 DbGroups=rp.db_groups,356 DbName=rp.db_name,357 DbUser=rp.db_user,358 )359 ]360 )361@mock.patch("boto3.client.get_cluster_credentials")362@mock.patch("boto3.client.describe_clusters")363@mock.patch("boto3.client")364def test_set_cluster_credentials_honors_iam_disable_cache(365 mock_boto_client, mock_describe_clusters, mock_get_cluster_credentials366):367 mock_cred_provider = MagicMock()368 mock_cred_holder = MagicMock()369 mock_cred_provider.get_credentials.return_value = mock_cred_holder370 mock_cred_holder.has_associated_session = False371 rp: RedshiftProperty = make_redshift_property()372 rp.iam_disable_cache = True373 IamHelper.credentials_cache.clear()374 IamHelper.set_cluster_credentials(mock_cred_provider, rp)375 assert len(IamHelper.credentials_cache) == 0376 assert mock_boto_client.called is True377 mock_boto_client.assert_has_calls(378 [379 call().get_cluster_credentials(380 AutoCreate=rp.auto_create,381 ClusterIdentifier=rp.cluster_identifier,382 DbGroups=rp.db_groups,383 DbName=rp.db_name,384 DbUser=rp.db_user,385 )386 ]387 )388@mock.patch("boto3.client.get_cluster_credentials")389@mock.patch("boto3.client.describe_clusters")390@mock.patch("boto3.client")391def test_set_cluster_credentials_ignores_cache_when_disabled(392 mock_boto_client, mock_describe_clusters, mock_get_cluster_credentials393):394 mock_cred_provider = MagicMock()395 mock_cred_holder = MagicMock()396 mock_cred_provider.get_credentials.return_value = mock_cred_holder397 mock_cred_holder.has_associated_session = False398 rp: RedshiftProperty = make_redshift_property()399 rp.iam_disable_cache = True400 # mock out the boto3 response temporary credentials stored from prior auth401 mock_cred_obj: typing.Dict[str, typing.Union[str, datetime.datetime]] = {402 "DbUser": "xyz",403 "DbPassword": "turtle",404 "Expiration": datetime.datetime(9999, 1, 1, tzinfo=tzutc()),405 }406 # populate the cache407 IamHelper.credentials_cache.clear()408 IamHelper.credentials_cache[IamHelper.get_credentials_cache_key(rp)] = mock_cred_obj409 IamHelper.set_cluster_credentials(mock_cred_provider, rp)410 assert len(IamHelper.credentials_cache) == 1411 assert IamHelper.credentials_cache[IamHelper.get_credentials_cache_key(rp)] is mock_cred_obj412 assert mock_boto_client.called is True413 # we should not have retrieved user/password from the cache414 assert rp.user_name != mock_cred_obj["DbUser"]415 assert rp.password != mock_cred_obj["DbPassword"]416 assert (417 call().get_cluster_credentials(418 AutoCreate=rp.auto_create,419 ClusterIdentifier=rp.cluster_identifier,420 DbGroups=rp.db_groups,421 DbName=rp.db_name,422 DbUser=rp.db_user,423 )424 in mock_boto_client.mock_calls425 )426@mock.patch("boto3.client.get_cluster_credentials")427@mock.patch("boto3.client.describe_clusters")428@mock.patch("boto3.client")429def test_set_cluster_credentials_uses_cache_if_possible(430 mock_boto_client, mock_describe_clusters, mock_get_cluster_credentials431):432 mock_cred_provider = MagicMock()433 mock_cred_holder = MagicMock()434 mock_cred_provider.get_credentials.return_value = mock_cred_holder435 mock_cred_holder.has_associated_session = False436 rp: RedshiftProperty = make_redshift_property()437 # mock out the boto3 response temporary credentials stored from prior auth438 mock_cred_obj: typing.Dict[str, typing.Union[str, datetime.datetime]] = {439 "DbUser": "xyz",440 "DbPassword": "turtle",441 "Expiration": datetime.datetime(9999, 1, 1, tzinfo=tzutc()),442 }443 # populate the cache444 IamHelper.credentials_cache.clear()445 IamHelper.credentials_cache[IamHelper.get_credentials_cache_key(rp)] = mock_cred_obj446 IamHelper.set_cluster_credentials(mock_cred_provider, rp)447 assert len(IamHelper.credentials_cache) == 1448 assert IamHelper.credentials_cache[IamHelper.get_credentials_cache_key(rp)] is mock_cred_obj449 assert mock_boto_client.called is True450 assert rp.user_name == mock_cred_obj["DbUser"]451 assert rp.password == mock_cred_obj["DbPassword"]452 assert (453 call().get_cluster_credentials(454 AutoCreate=rp.auto_create,455 ClusterIdentifier=rp.cluster_identifier,456 DbGroups=rp.db_groups,457 DbName=rp.db_name,458 DbUser=rp.db_user,459 )460 not in mock_boto_client.mock_calls461 )462@mock.patch("boto3.client.get_cluster_credentials")463@mock.patch("boto3.client.describe_clusters")464@mock.patch("boto3.client")465def test_set_cluster_credentials_refreshes_stale_credentials(466 mock_boto_client, mock_describe_clusters, mock_get_cluster_credentials467):468 mock_cred_provider = MagicMock()469 mock_cred_holder = MagicMock()470 mock_cred_provider.get_credentials.return_value = mock_cred_holder471 mock_cred_holder.has_associated_session = False472 rp: RedshiftProperty = make_redshift_property()473 # mock out the boto3 response temporary credentials stored from prior auth (now stale)474 mock_cred_obj: typing.Dict[str, typing.Union[str, datetime.datetime]] = {475 "DbUser": "xyz",476 "DbPassword": "turtle",477 "Expiration": datetime.datetime(1, 1, 1, tzinfo=tzutc()),478 }479 # populate the cache480 IamHelper.credentials_cache.clear()481 IamHelper.credentials_cache[IamHelper.get_credentials_cache_key(rp)] = mock_cred_obj482 IamHelper.set_cluster_credentials(mock_cred_provider, rp)483 assert len(IamHelper.credentials_cache) == 1484 # ensure new temporary credentials have been replaced in cache485 assert IamHelper.get_credentials_cache_key(rp) in IamHelper.credentials_cache486 assert IamHelper.credentials_cache[IamHelper.get_credentials_cache_key(rp)] is not mock_cred_obj487 assert mock_boto_client.called is True488 mock_boto_client.assert_has_calls(489 [490 call().get_cluster_credentials(491 AutoCreate=rp.auto_create,492 ClusterIdentifier=rp.cluster_identifier,493 DbGroups=rp.db_groups,494 DbName=rp.db_name,495 DbUser=rp.db_user,496 )497 ]498 )499@pytest.mark.parametrize(500 "boto3_version",501 (502 "1.17.110",503 "1.17.100",504 ),...

Full Screen

Full Screen

sqlalchemy_database.py

Source:sqlalchemy_database.py Github

copy

Full Screen

...94 return None95 @staticmethod96 def _now():97 return datetime.now(timezone.utc)98 def get_cluster_credentials(self) -> Tuple[str, str]:99 if self._rs_client is None:100 import boto3101 self._rs_client = boto3.client(102 'redshift',103 region_name=self.rs_region_name,104 aws_access_key_id=self.aws_access_key_id,105 aws_secret_access_key=self.aws_secret_access_key,106 aws_session_token=None,107 )108 new_credentials = self._rs_client.get_cluster_credentials(109 DbUser=self.rs_db_user_id,110 DbName=self.database_name,111 DurationSeconds=self.rs_duration_seconds,112 ClusterIdentifier=self.rs_cluster_id113 )114 rs_new_credentials_timedelta = timedelta(seconds=self.rs_new_credentials_seconds)115 self._rs_credential_expiry = self._now() + rs_new_credentials_timedelta116 if 'DbUser' not in new_credentials or 'DbPassword' not in new_credentials:117 raise SQLAlchemyError(f'Invalid response from get_cluster_credentials got {new_credentials}')118 if 'Expiration' in new_credentials:119 # Expire when redshift said or sooner as specified by rs_new_credentials_seconds120 server_expiry = new_credentials['Expiration']121 if server_expiry.tzinfo is None:122 # Moto seems to return naive datetime in local tz.123 # Boto3 returns tz aware datetime124 server_expiry = server_expiry.astimezone(tz=None)125 self._rs_credential_expiry = min(self._rs_credential_expiry, server_expiry)126 return new_credentials['DbUser'], new_credentials['DbPassword']127 def _get_connector(self) -> str:128 if self.driver is None:129 return self.dialect130 else:131 return f"{self.dialect}+{self.driver}"132 def get_uri(self) -> URL:133 user_id = self.user_id134 if self.use_get_cluster_credentials:135 if self.aws_access_key_id is None:136 self.aws_access_key_id = self.user_id137 if self.aws_secret_access_key is None:138 self.aws_secret_access_key = self.get_password()139 if self.rs_db_user_id is None:140 raise ValueError(141 f'rs_db_user_id required for {self.__class__.__name__} {self.host} '142 f'with use_get_cluster_credentials = True.'143 )144 user_id, password = self.get_cluster_credentials()145 else:146 # Will be set below147 password = None148 if self.dialect in {'sqlite'}:149 return URL.create(150 drivername=self._get_connector(),151 database=self.database_name,152 )153 else:154 if not self.use_get_cluster_credentials:155 password = self.get_password()156 try:157 return URL.create(158 drivername=self._get_connector(),159 host=self.host,160 port=self.port,161 username=user_id,162 password=password,163 database=self.database_name,164 )165 except AttributeError:166 return URL(167 drivername=self._get_connector(),168 host=self.host,169 port=self.port,170 username=user_id,171 password=password,172 database=self.database_name,173 )174 def get_engine(self) -> Engine:175 if self._engine is None:176 kwargs = self.create_engine_args or {}177 if self.dialect == 'oracle':178 if 'arraysize' not in kwargs:179 kwargs['arraysize'] = self.arraysize180 if self.encoding:181 kwargs['encoding'] = self.encoding182 if self.poolclass:183 if self.poolclass == 'QueuePool':184 kwargs['poolclass'] = QueuePool185 elif self.poolclass == 'NullPool':186 kwargs['poolclass'] = NullPool187 else:188 raise ValueError(f'Unexpected poolclass {self.poolclass}')189 self._engine = create_engine(self.get_uri(), **kwargs)190 # Make an event listener so we can get new RS credentials if needed191 @event.listens_for(self._engine, 'do_connect', named=True)192 def engine_do_connect(**kw):193 """194 listen for the 'do_connect' event195 """196 # from bi_etl.utility import dict_to_str197 # print(dict_to_str(kw))198 if self.use_get_cluster_credentials:199 if self._now() >= self._rs_credential_expiry:200 logging.info('Getting new Redshift cluster credentials')201 db_user, password = self.get_cluster_credentials()202 kw['cparams']['user'] = db_user203 kw['cparams']['password'] = password204 # Return None to allow control to pass to the next event handler and ultimately205 # to allow the dialect to connect normally, given the updated arguments.206 return None207 # End engine_do_connect sub function208 return self._engine209 def raw_connection(self):210 return self.get_engine().raw_connection()211 def connect(self):212 if self.dialect == 'sqlite':213 if self._sqlite_connection is None or self._sqlite_connection.closed:214 self._sqlite_connection = self.get_engine().connect()215 return self._sqlite_connection...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful