How to use the save_for_retrospection method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

# ... (earlier lines of provider.py are not shown in this excerpt)

# Endpoint to access all the sent emails
# (relative to LocalStack internal HTTP resources base endpoint)
EMAILS_ENDPOINT = "/ses"

_EMAILS_ENDPOINT_REGISTERED = False


def save_for_retrospection(id: str, region: str, **kwargs: Any):
    """Save a message for retrospection.

    The email is saved to filesystem and is also made accessible via a service endpoint.

    :param id: message ID; used as the key in ``EMAILS`` and as the JSON file name
    :param region: region stored alongside the email under the ``Region`` key
    :param kwargs: email attributes, merged as-is into the saved record.

    kwargs should consist of following keys related to the email:

    - Body
    - Destination
    - HtmlBody
    - RawData
    - Source
    - Subject
    - Template
    - TemplateData
    """
    ses_dir = os.path.join(config.dirs.data or config.dirs.tmp, "ses")

    mkdir(ses_dir)
    path = os.path.join(ses_dir, id + ".json")

    email = {"Id": id, "Region": region, **kwargs}

    # Keep the email in memory so the REST resource below can return it
    EMAILS[id] = email

    def _serialize(obj):
        """JSON serializer for timestamps."""
        if isinstance(obj, (datetime, date, time)):
            return obj.isoformat()
        # Any other non-JSON type is dumped via its attribute dict
        return obj.__dict__

    # Serialize before opening the file so that a serialization failure
    # does not leave an empty/truncated file behind
    payload = json.dumps(email, default=_serialize)
    with open(path, "w", encoding="utf-8") as f:
        f.write(payload)

    LOGGER.debug("Email saved at: %s", path)


class SesServiceApiResource:
    """Provides a REST API for retrospective access to emails sent via SES.

    This is registered as a LocalStack internal HTTP resource."""

    def on_get(self, request):
        """Return every email currently held in ``EMAILS`` under the ``messages`` key."""
        return {
            "messages": list(EMAILS.values()),
        }


def register_ses_api_resource():
    """Register the email retrospection endpoint as an internal LocalStack endpoint."""
    # Use a global to indicate whether the resource has already been registered
    # This is cheaper than iterating over the registered routes in the Router object
    global _EMAILS_ENDPOINT_REGISTERED

    if not _EMAILS_ENDPOINT_REGISTERED:
        get_internal_apis().add(EMAILS_ENDPOINT, SesServiceApiResource())
        _EMAILS_ENDPOINT_REGISTERED = True


class SesProvider(SesApi, ServiceLifecycleHook):

    #
    # Lifecycle Hooks
    #

    def on_after_init(self):
        # Allow sent emails to be retrieved from the SES emails endpoint
        register_ses_api_resource()

    #
    # Helpers
    #

    def get_source_from_raw(self, raw_data: str) -> Optional[str]:
        """Given a raw representation of email, return the source/from field.

        Only lines that *begin* with the ``From:`` field name are considered
        (compared case-insensitively), so lines such as ``Resent-From:`` or text
        that merely contains ``From:`` are not mistaken for the sender.

        :param raw_data: raw email text
        :return: the value of the first ``From:`` line, stripped of surrounding
            whitespace, or ``None`` if there is no such line
        """
        prefix = "from:"
        for line in raw_data.split("\n"):
            # strip() also drops the trailing "\r" of CRLF-terminated lines
            candidate = line.strip()
            if candidate.lower().startswith(prefix):
                return candidate[len(prefix) :].strip()
        return None

    #
    # Implementations for SES operations
    #

    @handler("ListTemplates")
    def list_templates(
        self, context: RequestContext, next_token: NextToken = None, max_items: MaxItems = None
    ) -> ListTemplatesResponse:
        # Convert date/datetime template timestamps in the backend via
        # timestamp_millis() before delegating the request to moto
        for template in ses_backend.list_templates():
            if isinstance(template["Timestamp"], (date, datetime)):
                template["Timestamp"] = timestamp_millis(template["Timestamp"])
        return call_moto(context)

    @handler("DeleteTemplate")
    def delete_template(
        self, context: RequestContext, template_name: TemplateName
    ) -> DeleteTemplateResponse:
        # Deleting an unknown template is not an error: the response is the same
        if template_name in ses_backend.templates:
            del ses_backend.templates[template_name]
        return DeleteTemplateResponse()

    @handler("GetIdentityVerificationAttributes")
    def get_identity_verification_attributes(
        self, context: RequestContext, identities: IdentityList
    ) -> GetIdentityVerificationAttributesResponse:
        attributes: VerificationAttributes = {}

        # Every identity is reported as successfully verified; identities
        # without an "@" additionally receive a generated verification token
        for identity in identities:
            if "@" in identity:
                attributes[identity] = IdentityVerificationAttributes(
                    VerificationStatus=VerificationStatus.Success,
                )
            else:
                attributes[identity] = IdentityVerificationAttributes(
                    VerificationStatus=VerificationStatus.Success,
                    VerificationToken=long_uid(),
                )

        return GetIdentityVerificationAttributesResponse(
            VerificationAttributes=attributes,
        )

    @handler("SendEmail")
    def send_email(
        self,
        context: RequestContext,
        source: Address,
        destination: Destination,
        message: Message,
        reply_to_addresses: AddressList = None,
        return_path: Address = None,
        source_arn: AmazonResourceName = None,
        return_path_arn: AmazonResourceName = None,
        tags: MessageTagList = None,
        configuration_set_name: ConfigurationSetName = None,
    ) -> SendEmailResponse:
        response = call_moto(context)

        # Record the sent email so it can be inspected later
        save_for_retrospection(
            response["MessageId"],
            context.region,
            Source=source,
            Destination=destination,
            Subject=message["Subject"].get("Data"),
            Body=message["Body"].get("Text", {}).get("Data"),
            HtmlBody=message["Body"].get("Html", {}).get("Data"),
        )

        return response

    @handler("SendTemplatedEmail")
    def send_templated_email(
        self,
        context: RequestContext,
        source: Address,
        destination: Destination,
        template: TemplateName,
        template_data: TemplateData,
        reply_to_addresses: AddressList = None,
        return_path: Address = None,
        source_arn: AmazonResourceName = None,
        return_path_arn: AmazonResourceName = None,
        tags: MessageTagList = None,
        configuration_set_name: ConfigurationSetName = None,
        template_arn: AmazonResourceName = None,
    ) -> SendTemplatedEmailResponse:
        response = call_moto(context)

        # Record the sent email so it can be inspected later
        save_for_retrospection(
            response["MessageId"],
            context.region,
            Source=source,
            Template=template,
            TemplateData=template_data,
            Destination=destination,
        )

        return response

    @handler("SendRawEmail")
    def send_raw_email(
        self,
        context: RequestContext,
        raw_message: RawMessage,
        source: Address = None,
        destinations: AddressList = None,
        from_arn: AmazonResourceName = None,
        source_arn: AmazonResourceName = None,
        return_path_arn: AmazonResourceName = None,
        tags: MessageTagList = None,
        configuration_set_name: ConfigurationSetName = None,
    ) -> SendRawEmailResponse:
        raw_data = to_str(raw_message["Data"])

        # Fall back to the From: line of the raw message when no usable
        # source was passed explicitly
        if source is None or not source.strip():
            LOGGER.debug("Raw email:\n%s\nEOT", raw_data)

            source = self.get_source_from_raw(raw_data)
            if not source:
                LOGGER.warning("Source not specified. Rejecting message.")
                raise MessageRejected()

        message = ses_backend.send_raw_email(source, destinations, raw_data, context.region)

        save_for_retrospection(
            message.id, context.region, Source=source, Destination=destinations, RawData=raw_data
        )
        # ... (the remainder of this method is not shown in this excerpt)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful