Best Python code snippet using localstack_python
cloud_console_client.py
Source:cloud_console_client.py  
...111            message['data'] = data112        if msg_attributes:113            message['attributes'] = msg_attributes114        return message115    def _create_message_attributes(self, message_type_enum):116        """Creates a cloud pubsub notification message attribute map.117        Fills in the version, moblab mac address, and moblab id information118        as attributes.119        @param message_type_enum The message type enum.120        @returns: A pubsub messsage attribute map.121        """122        msg_attributes = {}123        msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_TYPE)] = (124                _get_message_type_name(message_type_enum))125        msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_VERSION)] = (126                CURRENT_MESSAGE_VERSION)127        msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_MAC_ADDRESS)] = (128                utils.get_moblab_serial_number())129        msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_ID)] = (130                utils.get_moblab_id())131        return msg_attributes132    def _create_test_job_offloaded_message(self, gcs_uri):133        """Construct a test result notification.134        TODO(ntang): switch LEGACY to new message format.135        @param gcs_uri: The test result Google Cloud Storage URI.136        @returns The notification message.137        """138        data = base64.b64encode(LEGACY_TEST_OFFLOAD_MESSAGE)139        msg_attributes = {}140        msg_attributes[LEGACY_ATTR_VERSION] = CURRENT_MESSAGE_VERSION141        msg_attributes[LEGACY_ATTR_MOBLAB_MAC] = (142                utils.get_moblab_serial_number())143        msg_attributes[LEGACY_ATTR_MOBLAB_ID] = utils.get_moblab_id()144        msg_attributes[LEGACY_ATTR_GCS_URI] = gcs_uri145        return self._create_message(data, msg_attributes)146    def send_test_job_offloaded_message(self, gcs_uri):147        """Notify the cloud console a test job is offloaded.148        @param gcs_uri: The test result 
Google Cloud Storage URI.149        @returns True if the notification is successfully sent.150            Otherwise, False.151        """152        logging.info('Notification on gcs_uri %s', gcs_uri)153        message = self._create_test_job_offloaded_message(gcs_uri)154        return self._publish_notification(message)155    def _publish_notification(self, message):156        msg_ids = self._pubsub_client.publish_notifications(157                self._pubsub_topic, [message])158        if msg_ids:159            logging.debug('Successfully sent out a notification')160            return True161        logging.warning('Failed to send notification %s', str(message))162        return False163    def send_heartbeat(self):164        """Sends a heartbeat.165        @returns True if the heartbeat notification is successfully sent.166            Otherwise, False.167        """168        logging.info('Sending a heartbeat')169        event = cpcon.Heartbeat()170        # Don't sent local timestamp for now.171        data = event.SerializeToString()172        try:173            attributes = self._create_message_attributes(174                    cpcon.MSG_MOBLAB_HEARTBEAT)175            message = self._create_message(data, attributes)176        except ValueError:177            logging.exception('Failed to create message.')178            return False179        return self._publish_notification(message)180    def send_event(self, event_type=None, event_data=None):181        """Sends an event notification to the remote console.182        @param event_type: The event type that is defined in the protobuffer183            file 'cloud_console.proto'.184        @param event_data: The event data.185        @returns True if the notification is successfully sent.186            Otherwise, False.187        """188        logging.info('Send an event.')189        if not event_type:190            logging.info('Failed to send event without a type.')191            return False192        event = 
cpcon.RemoteEventMessage()193        if event_data:194            event.data = event_data195        else:196            event.data = ''197        event.type = event_type198        data = event.SerializeToString()199        try:200            attributes = self._create_message_attributes(201                    cpcon.MSG_MOBLAB_REMOTE_EVENT)202            message = self._create_message(data, attributes)203        except ValueError:204            logging.exception('Failed to create message.')205            return False...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
