Best Python code snippet using localstack_python
cbc_proxy.py
Source:cbc_proxy.py  
...85        sent, expires, message_number=None86    ):87        pass88    def _invoke_lambda_with_failover(self, payload):89        result = self._invoke_lambda(self.lambda_name, payload)90        if not result:91            failover_result = self._invoke_lambda(self.failover_lambda_name, payload)92            if not failover_result:93                raise CBCProxyRetryableException(94                    f'Lambda failed for both {self.lambda_name} and {self.failover_lambda_name}'95                )96        return result97    def _invoke_lambda(self, lambda_name, payload):98        payload_bytes = bytes(json.dumps(payload), encoding='utf8')99        try:100            current_app.logger.info(101                f"Calling lambda {lambda_name} with payload {str(payload)[:1000]}"102            )103            result = self._lambda_client.invoke(104                FunctionName=lambda_name,105                InvocationType='RequestResponse',106                Payload=payload_bytes,107            )108        except botocore.exceptions.ClientError:109            current_app.logger.exception(f'Boto ClientError calling lambda {lambda_name}')110            success = False111            return success112        if result['StatusCode'] > 299:113            current_app.logger.info(114                f"Error calling lambda {lambda_name} with status code { result['StatusCode']}, {result.get('Payload')}"115            )116            success = False117        elif 'FunctionError' in result:118            current_app.logger.info(119                f"Error calling lambda {lambda_name} with function error { result['Payload'].read() }"120            )121            success = False122        else:123            success = True124        return success125    def infer_language_from(self, content):126        if non_gsm_characters(content):127            return self.LANGUAGE_WELSH128        return self.LANGUAGE_ENGLISH129class CBCProxyOne2ManyClient(CBCProxyClientBase):130    
LANGUAGE_ENGLISH = 'en-GB'131    LANGUAGE_WELSH = 'cy-GB'132    def _send_link_test(133        self,134        lambda_name,135    ):136        """137        link test - open up a connection to a specific provider, and send them an xml payload with a <msgType> of138        test.139        """140        payload = {141            'message_type': 'test',142            'identifier': str(uuid.uuid4()),143            'message_format': 'cap'144        }145        self._invoke_lambda(lambda_name=lambda_name, payload=payload)146    def create_and_send_broadcast(147        self, identifier, headline, description, areas, sent, expires, channel, message_number=None148    ):149        payload = {150            'message_type': 'alert',151            'identifier': identifier,152            'message_format': 'cap',153            'headline': headline,154            'description': description,155            'areas': areas,156            'sent': sent,157            'expires': expires,158            'language': self.infer_language_from(description),159            'channel': channel,160        }161        self._invoke_lambda_with_failover(payload=payload)162    def cancel_broadcast(163        self,164        identifier, previous_provider_messages,165        sent, message_number=None166    ):167        payload = {168            'message_type': 'cancel',169            'identifier': identifier,170            'message_format': 'cap',171            "references": [172                {173                    "message_id": str(message.id),174                    "sent": message.created_at.strftime(DATETIME_FORMAT)175                } for message in previous_provider_messages176            ],177            'sent': sent,178        }179        self._invoke_lambda_with_failover(payload=payload)180class CBCProxyEE(CBCProxyOne2ManyClient):181    lambda_name = 'ee-1-proxy'182    failover_lambda_name = 'ee-2-proxy'183class CBCProxyThree(CBCProxyOne2ManyClient):184    lambda_name = 'three-1-proxy'185    
failover_lambda_name = 'three-2-proxy'186class CBCProxyO2(CBCProxyOne2ManyClient):187    lambda_name = 'o2-1-proxy'188    failover_lambda_name = 'o2-2-proxy'189class CBCProxyVodafone(CBCProxyClientBase):190    lambda_name = 'vodafone-1-proxy'191    failover_lambda_name = 'vodafone-2-proxy'192    LANGUAGE_ENGLISH = 'English'193    LANGUAGE_WELSH = 'Welsh'194    def _send_link_test(195        self,196        lambda_name,197    ):198        """199        link test - open up a connection to a specific provider, and send them an xml payload with a <msgType> of200        test.201        """202        from app import db203        sequence = Sequence('broadcast_provider_message_number_seq')204        sequential_number = db.session.connection().execute(sequence)205        formatted_seq_number = format_sequential_number(sequential_number)206        payload = {207            'message_type': 'test',208            'identifier': str(uuid.uuid4()),209            'message_number': formatted_seq_number,210            'message_format': 'ibag'211        }212        self._invoke_lambda(lambda_name=lambda_name, payload=payload)213    def create_and_send_broadcast(214        self, identifier, message_number, headline, description, areas, sent, expires, channel215    ):216        payload = {217            'message_type': 'alert',218            'identifier': identifier,219            'message_number': message_number,220            'message_format': 'ibag',221            'headline': headline,222            'description': description,223            'areas': areas,224            'sent': sent,225            'expires': expires,226            'language': self.infer_language_from(description),...boto_message_transport.py
Source:boto_message_transport.py  
...46            )47        except ClientError as e:48            raise ff.MessageBusError(str(e))49    def invoke(self, command: Command) -> Any:50        return self._invoke_lambda(command)51    def request(self, query: Query) -> Any:52        return self._invoke_lambda(query)53    def _invoke_lambda(self, message: Union[Command, Query]):54        try:55            response = self._lambda_client.invoke(56                FunctionName=f'{self._service_name(message.get_context())}Sync',57                InvocationType='RequestResponse',58                LogType='None',59                Payload=self._serializer.serialize(message)60            )61        except ClientError as e:62            raise ff.MessageBusError(str(e))63        return self._serializer.deserialize(response['Payload'].read().decode('utf-8'))64    def _store_large_payloads_in_s3(self, payload: str):65        if len(payload) > 64_000:66            key = f'tmp/{str(uuid.uuid1())}.json'67            self._s3_client.put_object(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
