How to use from_arn method in localstack

Best Python code snippet using localstack_python

email.py

Source:email.py Github

copy

Full Screen

"""Email based notification system"""
import re
import smtplib
import uuid
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from cloud_inquisitor import get_local_aws_session
from cloud_inquisitor.config import dbconfig, ConfigOption
from cloud_inquisitor.constants import NS_EMAIL, RGX_EMAIL_VALIDATION_PATTERN
from cloud_inquisitor.database import db
from cloud_inquisitor.exceptions import EmailSendError
from cloud_inquisitor.plugins.notifiers import BaseNotifier
from cloud_inquisitor.schema import Email
from cloud_inquisitor.utils import send_notification, NotificationContact, deprecated


class EmailNotifier(BaseNotifier):
    """Notifier plugin that delivers messages by email through SES or SMTP.

    The delivery method and all connection settings are read from ``dbconfig``
    under the ``NS_EMAIL`` namespace; the available settings are declared in
    ``options`` below.
    """

    name = 'Email Notifier'
    ns = NS_EMAIL
    notifier_type = 'email'
    validation = RGX_EMAIL_VALIDATION_PATTERN
    options = (
        ConfigOption('enabled', True, 'bool', 'Enable the Email notifier plugin'),
        ConfigOption('from_address', 'changeme@domain.tld', 'string', 'Sender address for emails'),
        ConfigOption('method', 'ses', 'string', 'EMail sending method, either ses or smtp'),
        ConfigOption(
            'from_arn', '', 'string',
            'If using cross-account SES, this is the "From ARN", otherwise leave blank'
        ),
        ConfigOption(
            'return_path_arn', '', 'string',
            'If using cross-account SES, this is the "Return Path ARN", otherwise leave blank'
        ),
        ConfigOption(
            'source_arn', '', 'string',
            'If using cross-account SES, this is the "Source ARN", otherwise leave blank'
        ),
        ConfigOption('ses_region', 'us-west-2', 'string', 'Which SES region to send emails from'),
        ConfigOption('smtp_server', 'localhost', 'string', 'Address of the SMTP server to use'),
        ConfigOption('smtp_port', 25, 'int', 'Port for the SMTP server'),
        ConfigOption(
            'smtp_username', '', 'string',
            'Username for SMTP authentication. Leave blank for no authentication'
        ),
        ConfigOption(
            'smtp_password', '', 'string',
            'Password for SMTP authentication. Leave blank for no authentication'
        ),
        ConfigOption('smtp_tls', False, 'bool', 'Use TLS for sending emails'),
    )

    def __init__(self):
        super().__init__()
        # NOTE(review): `self.dbconfig` is not defined in this class; presumably
        # it is provided by BaseNotifier -- confirm against the base class.
        self.sender = self.dbconfig.get('from_address', NS_EMAIL)

    def notify(self, subsystem, recipient, subject, body_html, body_text):
        """Method to send a notification. A plugin may use only part of the information, but all fields are required.

        The message is sent first; the `Email` record is only added to the
        database session and committed once sending succeeded.

        Args:
            subsystem (`str`): Name of the subsystem originating the notification
            recipient (`str`): Recipient email address
            subject (`str`): Subject / title of the notification
            body_html (`str`): HTML formatted version of the message
            body_text (`str`): Text formatted version of the message

        Returns:
            `None`

        Raises:
            ValueError: If `recipient` does not match `RGX_EMAIL_VALIDATION_PATTERN`
            EmailSendError: If the configured method is unknown, or sending or
                persisting the email fails. The original exception is chained
                as `__cause__`.
        """
        if not re.match(RGX_EMAIL_VALIDATION_PATTERN, recipient, re.I):
            raise ValueError('Invalid recipient provided')

        email = Email()
        email.timestamp = datetime.now()
        email.subsystem = subsystem
        email.sender = self.sender
        email.recipients = recipient
        email.subject = subject
        email.uuid = uuid.uuid4()
        email.message_html = body_html
        email.message_text = body_text

        method = dbconfig.get('method', NS_EMAIL, 'ses')
        try:
            if method == 'ses':
                self.__send_ses_email([recipient], subject, body_html, body_text)
            elif method == 'smtp':
                self.__send_smtp_email([recipient], subject, body_html, body_text)
            else:
                raise ValueError('Invalid email method: {}'.format(method))

            db.session.add(email)
            db.session.commit()
        except Exception as ex:
            # Callers only need to handle EmailSendError; chain explicitly so
            # the root cause stays visible in tracebacks.
            raise EmailSendError(ex) from ex

    def __send_ses_email(self, recipients, subject, body_html, body_text):
        """Send an email using SES

        Args:
            recipients (`list` of `str`): List of recipient email addresses
            subject (str): Subject of the email
            body_html (str): HTML body of the email
            body_text (str): Text body of the email

        Returns:
            `None`
        """
        source_arn = dbconfig.get('source_arn', NS_EMAIL)
        return_arn = dbconfig.get('return_path_arn', NS_EMAIL)

        session = get_local_aws_session()
        ses = session.client('ses', region_name=dbconfig.get('ses_region', NS_EMAIL, 'us-west-2'))

        # Only include the body parts that were actually supplied
        body = {}
        if body_html:
            body['Html'] = {
                'Data': body_html
            }
        if body_text:
            body['Text'] = {
                'Data': body_text
            }

        ses_options = {
            'Source': self.sender,
            'Destination': {
                'ToAddresses': recipients
            },
            'Message': {
                'Subject': {
                    'Data': subject
                },
                'Body': body
            }
        }

        # Set SES options if needed (both ARNs must be configured)
        if source_arn and return_arn:
            ses_options.update({
                'SourceArn': source_arn,
                'ReturnPathArn': return_arn
            })

        ses.send_email(**ses_options)

    def __send_smtp_email(self, recipients, subject, html_body, text_body):
        """Send an email using SMTP

        The SMTP connection is opened with a context manager so that it is
        closed even when TLS negotiation, login or sending raises.

        Args:
            recipients (`list` of `str`): List of recipient email addresses
            subject (str): Subject of the email
            html_body (str): HTML body of the email
            text_body (str): Text body of the email

        Returns:
            `None`
        """
        source_arn = dbconfig.get('source_arn', NS_EMAIL)
        return_arn = dbconfig.get('return_path_arn', NS_EMAIL)
        from_arn = dbconfig.get('from_arn', NS_EMAIL)

        msg = MIMEMultipart('alternative')

        # Set SES options if needed (all three ARNs must be configured)
        if source_arn and from_arn and return_arn:
            msg['X-SES-SOURCE-ARN'] = source_arn
            msg['X-SES-FROM-ARN'] = from_arn
            msg['X-SES-RETURN-PATH-ARN'] = return_arn

        msg['Subject'] = subject
        msg['To'] = ','.join(recipients)
        msg['From'] = self.sender

        # Check body types to avoid exceptions
        if html_body:
            msg.attach(MIMEText(html_body, 'html'))
        if text_body:
            msg.attach(MIMEText(text_body, 'plain'))

        with smtplib.SMTP(
            dbconfig.get('smtp_server', NS_EMAIL, 'localhost'),
            dbconfig.get('smtp_port', NS_EMAIL, 25)
        ) as smtp:
            # TLS if needed
            if dbconfig.get('smtp_tls', NS_EMAIL, False):
                smtp.starttls()

            # Login if needed
            username = dbconfig.get('smtp_username', NS_EMAIL)
            password = dbconfig.get('smtp_password', NS_EMAIL)
            if username and password:
                smtp.login(username, password)

            smtp.sendmail(self.sender, recipients, msg.as_string())


@deprecated('send_email has been deprecated, use cloud_inquisitor.utils.send_notifications instead')
def send_email(subsystem, sender, recipients, subject, html_body=None, text_body=None, message_uuid=None):
    """Send an email to a list of recipients using the system configured email method (SES or SMTP)

    Args:
        subsystem (str): Name of the subsystem where the email originated from
        sender (str): From email address
        recipients (`list` of `str`): List of recipient email addresses
        subject (str): Subject of the email
        html_body (str): HTML body of the email
        text_body (str): Text body of the email
        message_uuid (str): Optional UUID message identifier. If not provided one will be generated automatically

    Returns:
        `None`
    """
    # Accept a single address as well as a list of addresses
    if isinstance(recipients, str):
        recipients = [recipients]

    # Drop duplicate recipients (ordering is not preserved)
    recipients = list(set(recipients))

    # NOTE(review): the listing this was taken from is cut off right after the
    # `body_text` argument -- confirm that no further arguments were passed.
    # `sender` and `message_uuid` are not used in the visible part.
    send_notification(
        subsystem=subsystem,
        recipients=[NotificationContact('email', x) for x in recipients],
        subject=subject,
        body_html=html_body,
        body_text=text_body
    )

Full Screen

Full Screen

tagging_service.py

Source:tagging_service.py Github

copy

Full Screen

class TaggingService:
    """In-memory store of resource tags, keyed by ARN.

    Tags are kept in ``self.tags`` as ``{arn: {key: value}}``. Tags passed in
    and returned as lists are dicts whose field names are configurable, so the
    same store can serve differently shaped payloads.

    Args:
        tagName: Name of the wrapper field returned by
            ``list_tags_for_resource``.
        keyName: Field name holding the tag key in each tag dict.
        valueName: Field name holding the tag value in each tag dict.
    """

    def __init__(self, tagName="Tags", keyName="Key", valueName="Value"):
        self.tagName = tagName
        self.keyName = keyName
        self.valueName = valueName
        self.tags = {}

    def _value_of(self, tag):
        """Return the value field of ``tag``, or ``None`` when it is absent."""
        return tag[self.valueName] if self.valueName in tag else None

    def get_tag_dict_for_resource(self, arn):
        """Return a copy of the ``{key: value}`` tags of ``arn`` (empty if none)."""
        return dict(self.tags.get(arn, {}))

    def list_tags_for_resource(self, arn):
        """Return the tags of ``arn`` as ``{tagName: [{keyName: k, valueName: v}, ...]}``."""
        stored = self.tags.get(arn, {})
        listed = [{self.keyName: k, self.valueName: v} for k, v in stored.items()]
        return {self.tagName: listed}

    def delete_all_tags_for_resource(self, arn):
        """Forget every tag of ``arn``; a no-op when ``arn`` has no entry."""
        self.tags.pop(arn, None)

    def has_tags(self, arn):
        """Return whether ``arn`` has an entry in the store (it may be empty)."""
        return arn in self.tags

    def tag_resource(self, arn, tags):
        """Add or overwrite tags on ``arn``.

        An entry for ``arn`` is created even when ``tags`` is empty. A tag
        without a value field is stored with the value ``None``. ``tags`` may
        be ``None``, which is treated like an empty list.

        Raises:
            KeyError: If a tag dict has no key field.
        """
        stored = self.tags.setdefault(arn, {})
        for tag in tags or ():
            stored[tag[self.keyName]] = self._value_of(tag)

    def copy_tags(self, from_arn, to_arn):
        """Copy all tags of ``from_arn`` onto ``to_arn``; no-op if the source has no entry."""
        if self.has_tags(from_arn):
            self.tag_resource(
                to_arn, self.list_tags_for_resource(from_arn)[self.tagName]
            )

    def untag_resource_using_names(self, arn, tag_names):
        """Remove the tags of ``arn`` whose keys are listed in ``tag_names``."""
        # `.get` with a throwaway default avoids creating an entry for unknown ARNs
        stored = self.tags.get(arn, {})
        for name in tag_names:
            stored.pop(name, None)

    def untag_resource_using_tags(self, arn, tags):
        """Remove the tags of ``arn`` that match the given tag dicts.

        A tag dict carrying only a key removes that key unconditionally; one
        carrying both key and value removes the key only if the stored value is
        equal. Tag dicts without a key field are ignored.
        """
        stored = self.tags.get(arn, {})
        for tag in tags:
            if self.keyName not in tag:
                continue
            key = tag[self.keyName]
            if key not in stored:
                continue
            # If both key and value are provided, match both before deletion
            if self.valueName in tag and stored[key] != tag[self.valueName]:
                continue
            del stored[key]

    def extract_tag_names(self, tags):
        """Return the keys of ``tags`` in order, skipping dicts without a key field.

        ``None`` is treated like an empty list.
        """
        return [tag[self.keyName] for tag in (tags or ()) if self.keyName in tag]

    def flatten_tag_list(self, tags):
        """Convert a list of tag dicts into a ``{key: value}`` mapping.

        Tags without a value field map to ``None``. ``None`` input is treated
        like an empty list.

        Raises:
            KeyError: If a tag dict has no key field.
        """
        return {tag[self.keyName]: self._value_of(tag) for tag in (tags or ())}

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful