How to use describe_endpoint method in localstack

Best Python code snippet using localstack_python

test_discovery.py

Source:test_discovery.py Github

copy

Full Screen

...179 def test_gather_identifiers_none(self):180 operation = self.service_model.operation_model('TestDiscovery')181 ids = self.manager.gather_identifiers(operation, {})182 self.assertEqual(ids, {})183 def test_describe_endpoint(self):184 kwargs = {185 'Operation': 'FooBar',186 'Identifiers': {'Foo': 'value1', 'Bar': 'value2'},187 }188 self.manager.describe_endpoint(**kwargs)189 self.client.describe_endpoints.assert_called_with(**kwargs)190 def test_describe_endpoint_no_input(self):191 describe = self.service_description['operations']['DescribeEndpoints']192 del describe['input']193 self.construct_manager()194 self.manager.describe_endpoint(Operation='FooBar', Identifiers={})195 self.client.describe_endpoints.assert_called_with()196 def test_describe_endpoint_empty_input(self):197 describe = self.service_description['operations']['DescribeEndpoints']198 describe['input'] = {'shape': 'EmptyStruct'}199 self.construct_manager()200 self.manager.describe_endpoint(Operation='FooBar', Identifiers={})201 self.client.describe_endpoints.assert_called_with()202 def test_describe_endpoint_ids_and_operation(self):203 cache = {}204 self.construct_manager(cache=cache)205 ids = {'Foo': 'value1', 'Bar': 'value2'}206 kwargs = {207 'Operation': 'TestDiscoveryRequired',208 'Identifiers': ids,209 }210 self.manager.describe_endpoint(**kwargs)211 self.client.describe_endpoints.assert_called_with(**kwargs)212 key = ((('Bar', 'value2'), ('Foo', 'value1')), 'TestDiscoveryRequired')213 self.assertIn(key, cache)214 self.assertEqual(cache[key][0]['Address'], 'new.com')215 self.manager.describe_endpoint(**kwargs)216 call_count = self.client.describe_endpoints.call_count217 self.assertEqual(call_count, 1)218 def test_describe_endpoint_no_ids_or_operation(self):219 cache = {}220 describe = self.service_description['operations']['DescribeEndpoints']221 describe['input'] = {'shape': 'EmptyStruct'}222 self.construct_manager(cache=cache)223 self.manager.describe_endpoint(224 Operation='TestDiscoveryRequired', Identifiers={}225 )226 self.client.describe_endpoints.assert_called_with()227 key = ()228 self.assertIn(key, cache)229 self.assertEqual(cache[key][0]['Address'], 'new.com')230 self.manager.describe_endpoint(231 Operation='TestDiscoveryRequired', Identifiers={}232 )233 call_count = self.client.describe_endpoints.call_count234 self.assertEqual(call_count, 1)235 def test_describe_endpoint_expired_entry(self):236 current_time = time.time()237 key = ()238 cache = {239 key: [{'Address': 'old.com', 'Expiration': current_time - 10}]240 }241 self.construct_manager(cache=cache)242 kwargs = {243 'Identifiers': {},244 'Operation': 'TestDiscoveryRequired',245 }246 self.manager.describe_endpoint(**kwargs)247 self.client.describe_endpoints.assert_called_with()248 self.assertIn(key, cache)249 self.assertEqual(cache[key][0]['Address'], 'new.com')250 self.manager.describe_endpoint(**kwargs)251 call_count = self.client.describe_endpoints.call_count252 self.assertEqual(call_count, 1)253 def test_describe_endpoint_cache_expiration(self):254 def _time():255 return float(0)256 cache = {}257 self.construct_manager(cache=cache, time=_time)258 self.manager.describe_endpoint(259 Operation='TestDiscoveryRequired', Identifiers={}260 )261 key = ()262 self.assertIn(key, cache)263 self.assertEqual(cache[key][0]['Expiration'], float(120))264 def test_delete_endpoints_present(self):265 key = ()266 cache = {267 key: [{'Address': 'old.com', 'Expiration': 0}]268 }269 self.construct_manager(cache=cache)270 kwargs = {271 'Identifiers': {},272 'Operation': 'TestDiscoveryRequired',273 }274 self.manager.delete_endpoints(**kwargs)275 self.assertEqual(cache, {})276 def test_delete_endpoints_absent(self):277 cache = {}278 self.construct_manager(cache=cache)279 kwargs = {280 'Identifiers': {},281 'Operation': 'TestDiscoveryRequired',282 }283 self.manager.delete_endpoints(**kwargs)284 self.assertEqual(cache, {})285 def test_describe_endpoint_optional_fails_no_cache(self):286 side_effect = [ConnectionError(error=None)]287 self.construct_manager(side_effect=side_effect)288 kwargs = {'Operation': 'TestDiscoveryOptional'}289 endpoint = self.manager.describe_endpoint(**kwargs)290 self.assertIsNone(endpoint)291 # This second call should be blocked as we just failed292 endpoint = self.manager.describe_endpoint(**kwargs)293 self.assertIsNone(endpoint)294 self.client.describe_endpoints.call_args_list == [call()]295 def test_describe_endpoint_optional_fails_stale_cache(self):296 key = ()297 cache = {298 key: [{'Address': 'old.com', 'Expiration': 0}]299 }300 side_effect = [ConnectionError(error=None)] * 2301 self.construct_manager(cache=cache, side_effect=side_effect)302 kwargs = {'Operation': 'TestDiscoveryOptional'}303 endpoint = self.manager.describe_endpoint(**kwargs)304 self.assertEqual(endpoint, 'old.com')305 # This second call shouldn't go through as we just failed306 endpoint = self.manager.describe_endpoint(**kwargs)307 self.assertEqual(endpoint, 'old.com')308 self.client.describe_endpoints.call_args_list == [call()]309 def test_describe_endpoint_required_fails_no_cache(self):310 side_effect = [ConnectionError(error=None)] * 2311 self.construct_manager(side_effect=side_effect)312 kwargs = {'Operation': 'TestDiscoveryRequired'}313 with self.assertRaises(EndpointDiscoveryRefreshFailed):314 self.manager.describe_endpoint(**kwargs)315 # This second call should go through, as we have no cache316 with self.assertRaises(EndpointDiscoveryRefreshFailed):317 self.manager.describe_endpoint(**kwargs)318 describe_count = self.client.describe_endpoints.call_count319 self.assertEqual(describe_count, 2)320 def test_describe_endpoint_required_fails_stale_cache(self):321 key = ()322 cache = {323 key: [{'Address': 'old.com', 'Expiration': 0}]324 }325 side_effect = [ConnectionError(error=None)] * 2326 self.construct_manager(cache=cache, side_effect=side_effect)327 kwargs = {'Operation': 'TestDiscoveryRequired'}328 endpoint = self.manager.describe_endpoint(**kwargs)329 self.assertEqual(endpoint, 'old.com')330 # We have a stale endpoint, so this shouldn't fail or force a refresh331 endpoint = self.manager.describe_endpoint(**kwargs)332 self.assertEqual(endpoint, 'old.com')333 self.client.describe_endpoints.call_args_list == [call()]334 def test_describe_endpoint_required_force_refresh_success(self):335 side_effect = [336 ConnectionError(error=None),337 {'Endpoints': [{338 'Address': 'new.com',339 'CachePeriodInMinutes': 2,340 }]},341 ]342 self.construct_manager(side_effect=side_effect)343 kwargs = {'Operation': 'TestDiscoveryRequired'}344 # First call will fail345 with self.assertRaises(EndpointDiscoveryRefreshFailed):346 self.manager.describe_endpoint(**kwargs)347 self.client.describe_endpoints.call_args_list == [call()]348 # Force a refresh if the cache is empty but discovery is required349 endpoint = self.manager.describe_endpoint(**kwargs)350 self.assertEqual(endpoint, 'new.com')351 def test_describe_endpoint_retries_after_failing(self):352 fake_time = Mock()353 fake_time.side_effect = [0, 100, 200]354 side_effect = [355 ConnectionError(error=None),356 {'Endpoints': [{357 'Address': 'new.com',358 'CachePeriodInMinutes': 2,359 }]},360 ]361 self.construct_manager(side_effect=side_effect, time=fake_time)362 kwargs = {'Operation': 'TestDiscoveryOptional'}363 endpoint = self.manager.describe_endpoint(**kwargs)364 self.assertIsNone(endpoint)365 self.client.describe_endpoints.call_args_list == [call()]366 # Second time should try again as enough time has elapsed367 endpoint = self.manager.describe_endpoint(**kwargs)368 self.assertEqual(endpoint, 'new.com')369class TestEndpointDiscoveryHandler(BaseEndpointDiscoveryTest):370 def setUp(self):371 super(TestEndpointDiscoveryHandler, self).setUp()372 self.manager = Mock(spec=EndpointDiscoveryManager)373 self.handler = EndpointDiscoveryHandler(self.manager)374 self.service_model = ServiceModel(self.service_description)375 def test_register_handler(self):376 events = Mock(spec=HierarchicalEmitter)377 self.handler.register(events, 'foo-bar')378 events.register.assert_any_call(379 'before-parameter-build.foo-bar', self.handler.gather_identifiers380 )381 events.register.assert_any_call(...

Full Screen

Full Screen

deploy_env.py

Source:deploy_env.py Github

copy

Full Screen

...15 return self.data["environments"][self.current_env()][name]16 def isDeployed(self):17 """18 Checks if the model is deployed.19 IMPORTANT: always returns `False` for local endpoints as LocalSagemakerClient.describe_endpoint()20 seems to always throw:21 botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the describe_endpoint operation: Could not find local endpoint22 """23 _isDeployed = False24 try:25 self.client().describe_endpoint(EndpointName = self.setting("model_name"))26 _isDeployed = True27 except (botocore.exceptions.ClientError) as e:28 pass29 return _isDeployed30 def runtime_client(self):31 if self._runtime_client:32 return self._runtime_client33 if self.isLocal():34 self._runtime_client = sagemaker.local.LocalSagemakerRuntimeClient()35 else:36 self._runtime_client = boto3.client('sagemaker-runtime')37 return self._runtime_client38 def client(self):39 if self._client:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful