Best Python code snippet using localstack_python
provider.py
Source:provider.py  
...72        return obj.__dict__73    with open(path, "w") as f:74        f.write(json.dumps(email, default=_serialize))75    LOGGER.debug("Email saved at: %s", path)76def get_ses_backend(context: RequestContext) -> SESBackend:77    return ses_backends[context.account_id]["global"]78class SesServiceApiResource:79    """Provides a REST API for retrospective access to emails sent via SES.80    This is registered as a LocalStack internal HTTP resource.81    This endpoint accepts:82    - GET param `email`: filter for `source` field in SES message83    """84    def on_get(self, request):85        filter_source = request.args.get("email")86        messages = []87        for msg in EMAILS.values():88            if filter_source in (msg["Source"], None, ""):89                messages.append(msg)90        return {91            "messages": messages,92        }93def register_ses_api_resource():94    """Register the email retrospection endpoint as an internal LocalStack endpoint."""95    # Use a global to indicate whether the resource has already been registered96    # This is cheaper than iterating over the registered routes in the Router object97    global _EMAILS_ENDPOINT_REGISTERED98    if not _EMAILS_ENDPOINT_REGISTERED:99        get_internal_apis().add(EMAILS_ENDPOINT, SesServiceApiResource())100        _EMAILS_ENDPOINT_REGISTERED = True101class SesProvider(SesApi, ServiceLifecycleHook):102    #103    # Lifecycle Hooks104    #105    def on_after_init(self):106        # Allow sent emails to be retrieved from the SES emails endpoint107        register_ses_api_resource()108    #109    # Helpers110    #111    def get_source_from_raw(self, raw_data: str) -> Optional[str]:112        """Given a raw representation of email, return the source/from field."""113        entities = raw_data.split("\n")114        for entity in entities:115            if "From:" in entity:116                return entity.replace("From:", "").strip()117        return None118    #119    # Implementations for SES operations120    #121    @handler("ListTemplates")122    def list_templates(123        self, context: RequestContext, next_token: NextToken = None, max_items: MaxItems = None124    ) -> ListTemplatesResponse:125        backend = get_ses_backend(context)126        for template in backend.list_templates():127            if isinstance(template["Timestamp"], (date, datetime)):128                template["Timestamp"] = timestamp_millis(template["Timestamp"])129        return call_moto(context)130    @handler("DeleteTemplate")131    def delete_template(132        self, context: RequestContext, template_name: TemplateName133    ) -> DeleteTemplateResponse:134        backend = get_ses_backend(context)135        if template_name in backend.templates:136            del backend.templates[template_name]137        return DeleteTemplateResponse()138    @handler("GetIdentityVerificationAttributes")139    def get_identity_verification_attributes(140        self, context: RequestContext, identities: IdentityList141    ) -> GetIdentityVerificationAttributesResponse:142        attributes: VerificationAttributes = {}143        for identity in identities:144            if "@" in identity:145                attributes[identity] = IdentityVerificationAttributes(146                    VerificationStatus=VerificationStatus.Success,147                )148            else:149                attributes[identity] = IdentityVerificationAttributes(150                    VerificationStatus=VerificationStatus.Success,151                    VerificationToken=long_uid(),152                )153        return GetIdentityVerificationAttributesResponse(154            VerificationAttributes=attributes,155        )156    @handler("SendEmail")157    def send_email(158        self,159        context: RequestContext,160        source: Address,161        destination: Destination,162        message: Message,163        reply_to_addresses: AddressList = None,164        return_path: Address = None,165        source_arn: AmazonResourceName = None,166        return_path_arn: AmazonResourceName = None,167        tags: MessageTagList = None,168        configuration_set_name: ConfigurationSetName = None,169    ) -> SendEmailResponse:170        response = call_moto(context)171        text_part = message["Body"].get("Text", {}).get("Data")172        html_part = message["Body"].get("Html", {}).get("Data")173        save_for_retrospection(174            response["MessageId"],175            context.region,176            Source=source,177            Destination=destination,178            Subject=message["Subject"].get("Data"),179            Body=dict(text_part=text_part, html_part=html_part),180        )181        return response182    @handler("SendTemplatedEmail")183    def send_templated_email(184        self,185        context: RequestContext,186        source: Address,187        destination: Destination,188        template: TemplateName,189        template_data: TemplateData,190        reply_to_addresses: AddressList = None,191        return_path: Address = None,192        source_arn: AmazonResourceName = None,193        return_path_arn: AmazonResourceName = None,194        tags: MessageTagList = None,195        configuration_set_name: ConfigurationSetName = None,196        template_arn: AmazonResourceName = None,197    ) -> SendTemplatedEmailResponse:198        response = call_moto(context)199        save_for_retrospection(200            response["MessageId"],201            context.region,202            Source=source,203            Template=template,204            TemplateData=template_data,205            Destination=destination,206        )207        return response208    @handler("SendRawEmail")209    def send_raw_email(210        self,211        context: RequestContext,212        raw_message: RawMessage,213        source: Address = None,214        destinations: AddressList = None,215        from_arn: AmazonResourceName = None,216        source_arn: AmazonResourceName = None,217        return_path_arn: AmazonResourceName = None,218        tags: MessageTagList = None,219        configuration_set_name: ConfigurationSetName = None,220    ) -> SendRawEmailResponse:221        raw_data = to_str(raw_message["Data"])222        if source is None or not source.strip():223            LOGGER.debug("Raw email:\n%s\nEOT", raw_data)224            source = self.get_source_from_raw(raw_data)225            if not source:226                LOGGER.warning("Source not specified. Rejecting message.")227                raise MessageRejected()228        if destinations is None:229            destinations = []230        backend = get_ses_backend(context)231        message = backend.send_raw_email(source, destinations, raw_data, context.region)232        save_for_retrospection(233            message.id,234            context.region,235            Source=source or message.source,236            Destination=destinations,237            RawData=raw_data,238        )...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
