How to use message_to_endpoint method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...150 platform_app, endpoint_attributes = get_attributes_for_application_endpoint(target_arn)151 message_structure = req_data.get("MessageStructure", [None])[0]152 LOG.debug("Publishing message to Endpoint: %s | Message: %s", target_arn, message)153 start_thread(154 lambda _: message_to_endpoint(155 target_arn,156 message,157 message_structure,158 endpoint_attributes,159 platform_app,160 )161 )162 return message_id163 LOG.debug("Publishing message to TopicArn: %s | Message: %s", topic_arn, message)164 start_thread(165 lambda _: message_to_subscribers(166 message_id,167 message,168 topic_arn,169 # TODO: check170 req_data,171 headers,172 subscription_arn,173 skip_checks,174 message_attributes,175 )176 )177 return message_id178def get_attributes_for_application_endpoint(target_arn):179 sns_client = aws_stack.connect_to_service("sns")180 app_name = target_arn.split("/")[-2]181 endpoint_attributes = None182 try:183 endpoint_attributes = sns_client.get_endpoint_attributes(EndpointArn=target_arn)[184 "Attributes"185 ]186 except botocore.exceptions.ClientError:187 LOG.warning(f"Missing attributes for endpoint: {target_arn}")188 if not endpoint_attributes:189 raise CommonServiceException(190 message="No account found for the given parameters",191 code="InvalidClientTokenId",192 status_code=403,193 )194 platform_apps = sns_client.list_platform_applications()["PlatformApplications"]195 app = None196 try:197 app = [x for x in platform_apps if app_name in x["PlatformApplicationArn"]][0]198 except IndexError:199 LOG.warning(f"Missing application: {target_arn}")200 if not app:201 raise CommonServiceException(202 message="No account found for the given parameters",203 code="InvalidClientTokenId",204 status_code=403,205 )206 # Validate parameters207 if "app/GCM/" in app["PlatformApplicationArn"]:208 validate_gcm_parameters(app, endpoint_attributes)209 return app, endpoint_attributes210def message_to_endpoint(target_arn, message, structure, endpoint_attributes, platform_app):211 if structure == "json":212 message = json.loads(message)213 platform_name = target_arn.split("/")[-3]214 response = None215 if platform_name == "GCM":216 response = send_message_to_GCM(217 platform_app["Attributes"], endpoint_attributes, message["GCM"]218 )219 if response is None:220 LOG.warn("Platform not implemeted yet")221 elif response.status_code != 200:222 LOG.warn(223 f"Platform {platform_name} returned response {response.status_code} with content {response.content}"224 )...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful