How to use _create_message_attributes method in localstack

Best Python code snippet using localstack_python

cloud_console_client.py

Source: cloud_console_client.py (GitHub)

copy

Full Screen

# NOTE: excerpt from cloud_console_client.py. The functions below take `self`
# and read `self._pubsub_client` / `self._pubsub_topic`; they are methods of a
# class whose header is not part of this excerpt.

# ... the excerpt begins inside a preceding method (presumably
# `_create_message`, which the code below calls); only its tail is shown:
#
#     message['data'] = data
#     if msg_attributes:
#         message['attributes'] = msg_attributes
#     return message


def _create_message_attributes(self, message_type_enum):
    """Creates a cloud pubsub notification message attribute map.

    Fills in the message type, the message version, the moblab mac address,
    and the moblab id information as attributes.

    @param message_type_enum The message type enum.

    @returns: A pubsub message attribute map.
    """
    msg_attributes = {}
    msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_TYPE)] = (
            _get_message_type_name(message_type_enum))
    msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_VERSION)] = (
            CURRENT_MESSAGE_VERSION)
    # NOTE(review): the MAC-address attribute is populated from
    # get_moblab_serial_number() -- confirm this is intended.
    msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_MAC_ADDRESS)] = (
            utils.get_moblab_serial_number())
    msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_ID)] = (
            utils.get_moblab_id())
    return msg_attributes


def _create_test_job_offloaded_message(self, gcs_uri):
    """Construct a test result notification.

    TODO(ntang): switch LEGACY to new message format.

    @param gcs_uri: The test result Google Cloud Storage URI.

    @returns The notification message.
    """
    data = base64.b64encode(LEGACY_TEST_OFFLOAD_MESSAGE)
    msg_attributes = {}
    msg_attributes[LEGACY_ATTR_VERSION] = CURRENT_MESSAGE_VERSION
    msg_attributes[LEGACY_ATTR_MOBLAB_MAC] = (
            utils.get_moblab_serial_number())
    msg_attributes[LEGACY_ATTR_MOBLAB_ID] = utils.get_moblab_id()
    msg_attributes[LEGACY_ATTR_GCS_URI] = gcs_uri
    return self._create_message(data, msg_attributes)


def send_test_job_offloaded_message(self, gcs_uri):
    """Notify the cloud console a test job is offloaded.

    @param gcs_uri: The test result Google Cloud Storage URI.

    @returns True if the notification is successfully sent.
        Otherwise, False.
    """
    logging.info('Notification on gcs_uri %s', gcs_uri)
    message = self._create_test_job_offloaded_message(gcs_uri)
    return self._publish_notification(message)


def _publish_notification(self, message):
    """Publishes a single message to the configured pubsub topic.

    @param message: The notification message to publish.

    @returns True if the pubsub client returned message ids for the
        publish call. Otherwise, False.
    """
    msg_ids = self._pubsub_client.publish_notifications(
            self._pubsub_topic, [message])
    if msg_ids:
        logging.debug('Successfully sent out a notification')
        return True
    # logging applies str() to %s arguments itself, so no explicit str().
    logging.warning('Failed to send notification %s', message)
    return False


def send_heartbeat(self):
    """Sends a heartbeat.

    @returns True if the heartbeat notification is successfully sent.
        Otherwise (including when building the message raises
        ValueError), False.
    """
    logging.info('Sending a heartbeat')
    event = cpcon.Heartbeat()
    # Don't send local timestamp for now.
    data = event.SerializeToString()
    try:
        attributes = self._create_message_attributes(
                cpcon.MSG_MOBLAB_HEARTBEAT)
        message = self._create_message(data, attributes)
    except ValueError:
        logging.exception('Failed to create message.')
        return False
    return self._publish_notification(message)


def send_event(self, event_type=None, event_data=None):
    """Sends an event notification to the remote console.

    @param event_type: The event type that is defined in the protobuffer
        file 'cloud_console.proto'.
    @param event_data: The event data.

    @returns True if the notification is successfully sent.
        Otherwise, False.
    """
    logging.info('Send an event.')
    if not event_type:
        logging.info('Failed to send event without a type.')
        return False
    event = cpcon.RemoteEventMessage()
    # Fall back to an empty payload when no event data is supplied.
    event.data = event_data or ''
    event.type = event_type
    data = event.SerializeToString()
    try:
        attributes = self._create_message_attributes(
                cpcon.MSG_MOBLAB_REMOTE_EVENT)
        message = self._create_message(data, attributes)
    except ValueError:
        logging.exception('Failed to create message.')
        return False
    # ... the excerpt is truncated here; the remainder of send_event is not
    # shown in the source snippet.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful