Best Python code snippet using localstack_python
email.py
Source:email.py  
1"""Email based notification system"""2import re3import smtplib4import uuid5from datetime import datetime6from email.mime.multipart import MIMEMultipart7from email.mime.text import MIMEText8from cloud_inquisitor import get_local_aws_session9from cloud_inquisitor.config import dbconfig, ConfigOption10from cloud_inquisitor.constants import NS_EMAIL, RGX_EMAIL_VALIDATION_PATTERN11from cloud_inquisitor.database import db12from cloud_inquisitor.exceptions import EmailSendError13from cloud_inquisitor.plugins.notifiers import BaseNotifier14from cloud_inquisitor.schema import Email15from cloud_inquisitor.utils import send_notification, NotificationContact, deprecated16class EmailNotifier(BaseNotifier):17    name = 'Email Notifier'18    ns = NS_EMAIL19    notifier_type = 'email'20    validation = RGX_EMAIL_VALIDATION_PATTERN21    options = (22        ConfigOption('enabled', True, 'bool', 'Enable the Email notifier plugin'),23        ConfigOption('from_address', 'changeme@domain.tld', 'string', 'Sender address for emails'),24        ConfigOption('method', 'ses', 'string', 'EMail sending method, either ses or smtp'),25        ConfigOption(26            'from_arn', '', 'string',27            'If using cross-account SES, this is the "From ARN", otherwise leave blank'28        ),29        ConfigOption(30            'return_path_arn', '', 'string',31            'If using cross-account SES, this is the "Return Path ARN", otherwise leave blank'32        ),33        ConfigOption(34            'source_arn', '', 'string',35            'If using cross-account SES, this is the "Source ARN", otherwise leave blank'36        ),37        ConfigOption('ses_region', 'us-west-2', 'string', 'Which SES region to send emails from'),38        ConfigOption('smtp_server', 'localhost', 'string', 'Address of the SMTP server to use'),39        ConfigOption('smtp_port', 25, 'int', 'Port for the SMTP server'),40        ConfigOption(41            'smtp_username', '', 'string',42            'Username for SMTP authentication. Leave blank for no authentication'43        ),44        ConfigOption(45            'smtp_password', '', 'string',46            'Password for SMTP authentication. Leave blank for no authentication'47        ),48        ConfigOption('smtp_tls', False, 'bool', 'Use TLS for sending emails'),49    )50    def __init__(self):51        super().__init__()52        self.sender = self.dbconfig.get('from_address', NS_EMAIL)53    def notify(self, subsystem, recipient, subject, body_html, body_text):54        """Method to send a notification. A plugin may use only part of the information, but all fields are required.55        Args:56            subsystem (`str`): Name of the subsystem originating the notification57            recipient (`str`): Recipient email address58            subject (`str`): Subject / title of the notification59            body_html (`str)`: HTML formatted version of the message60            body_text (`str`): Text formatted version of the message61        Returns:62            `None`63        """64        if not re.match(RGX_EMAIL_VALIDATION_PATTERN, recipient, re.I):65            raise ValueError('Invalid recipient provided')66        email = Email()67        email.timestamp = datetime.now()68        email.subsystem = subsystem69        email.sender = self.sender70        email.recipients = recipient71        email.subject = subject72        email.uuid = uuid.uuid4()73        email.message_html = body_html74        email.message_text = body_text75        method = dbconfig.get('method', NS_EMAIL, 'ses')76        try:77            if method == 'ses':78                self.__send_ses_email([recipient], subject, body_html, body_text)79            elif method == 'smtp':80                self.__send_smtp_email([recipient], subject, body_html, body_text)81            else:82                raise ValueError('Invalid email method: {}'.format(method))83            db.session.add(email)84            db.session.commit()85        except Exception as ex:86            raise EmailSendError(ex)87    def __send_ses_email(self, recipients, subject, body_html, body_text):88        """Send an email using SES89        Args:90            recipients (`1ist` of `str`): List of recipient email addresses91            subject (str): Subject of the email92            body_html (str): HTML body of the email93            body_text (str): Text body of the email94        Returns:95            `None`96        """97        source_arn = dbconfig.get('source_arn', NS_EMAIL)98        return_arn = dbconfig.get('return_path_arn', NS_EMAIL)99        session = get_local_aws_session()100        ses = session.client('ses', region_name=dbconfig.get('ses_region', NS_EMAIL, 'us-west-2'))101        body = {}102        if body_html:103            body['Html'] = {104                'Data': body_html105            }106        if body_text:107            body['Text'] = {108                'Data': body_text109            }110        ses_options = {111            'Source': self.sender,112            'Destination': {113                'ToAddresses': recipients114            },115            'Message': {116                'Subject': {117                    'Data': subject118                },119                'Body': body120            }121        }122        # Set SES options if needed123        if source_arn and return_arn:124            ses_options.update({125                'SourceArn': source_arn,126                'ReturnPathArn': return_arn127            })128        ses.send_email(**ses_options)129    def __send_smtp_email(self, recipients, subject, html_body, text_body):130        """Send an email using SMTP131        Args:132            recipients (`list` of `str`): List of recipient email addresses133            subject (str): Subject of the email134            html_body (str): HTML body of the email135            text_body (str): Text body of the email136        Returns:137            `None`138        """139        smtp = smtplib.SMTP(140            dbconfig.get('smtp_server', NS_EMAIL, 'localhost'),141            dbconfig.get('smtp_port', NS_EMAIL, 25)142        )143        source_arn = dbconfig.get('source_arn', NS_EMAIL)144        return_arn = dbconfig.get('return_path_arn', NS_EMAIL)145        from_arn = dbconfig.get('from_arn', NS_EMAIL)146        msg = MIMEMultipart('alternative')147        # Set SES options if needed148        if source_arn and from_arn and return_arn:149            msg['X-SES-SOURCE-ARN'] = source_arn150            msg['X-SES-FROM-ARN'] = from_arn151            msg['X-SES-RETURN-PATH-ARN'] = return_arn152        msg['Subject'] = subject153        msg['To'] = ','.join(recipients)154        msg['From'] = self.sender155        # Check body types to avoid exceptions156        if html_body:157            html_part = MIMEText(html_body, 'html')158            msg.attach(html_part)159        if text_body:160            text_part = MIMEText(text_body, 'plain')161            msg.attach(text_part)162        # TLS if needed163        if dbconfig.get('smtp_tls', NS_EMAIL, False):164            smtp.starttls()165        # Login if needed166        username = dbconfig.get('smtp_username', NS_EMAIL)167        password = dbconfig.get('smtp_password', NS_EMAIL)168        if username and password:169            smtp.login(username, password)170        smtp.sendmail(self.sender, recipients, msg.as_string())171        smtp.quit()172@deprecated('send_email has been deprecated, use cloud_inquisitor.utils.send_notifications instead')173def send_email(subsystem, sender, recipients, subject, html_body=None, text_body=None, message_uuid=None):174    """Send an email to a list of recipients using the system configured email method (SES or SMTP)175    Args:176        subsystem (str): Name of the subsystem where the email originated from177        sender (str): From email address178        recipients (`list` of `str`): List of recipient email addresses179        subject (str): Subject of the email180        html_body (str): HTML body of the email181        text_body (str): Text body of the email182        message_uuid (str): Optional UUID message identifier. If not provided one will be generated automatically183    Returns:184        `None`185    """186    if type(recipients) == str:187        recipients = [recipients]188    recipients = list(set(recipients))189    send_notification(190        subsystem=subsystem,191        recipients=[NotificationContact('email', x) for x in recipients],192        subject=subject,193        body_html=html_body,194        body_text=text_body...tagging_service.py
Source:tagging_service.py  
1class TaggingService:2    def __init__(self, tagName="Tags", keyName="Key", valueName="Value"):3        self.tagName = tagName4        self.keyName = keyName5        self.valueName = valueName6        self.tags = {}7    def get_tag_dict_for_resource(self, arn):8        result = {}9        if self.has_tags(arn):10            for k, v in self.tags[arn].items():11                result[k] = v12        return result13    def list_tags_for_resource(self, arn):14        result = []15        if self.has_tags(arn):16            for k, v in self.tags[arn].items():17                result.append({self.keyName: k, self.valueName: v})18        return {self.tagName: result}19    def delete_all_tags_for_resource(self, arn):20        if self.has_tags(arn):21            del self.tags[arn]22    def has_tags(self, arn):23        return arn in self.tags24    def tag_resource(self, arn, tags):25        if arn not in self.tags:26            self.tags[arn] = {}27        for t in tags:28            if self.valueName in t:29                self.tags[arn][t[self.keyName]] = t[self.valueName]30            else:31                self.tags[arn][t[self.keyName]] = None32    def copy_tags(self, from_arn, to_arn):33        if self.has_tags(from_arn):34            self.tag_resource(35                to_arn, self.list_tags_for_resource(from_arn)[self.tagName]36            )37    def untag_resource_using_names(self, arn, tag_names):38        for name in tag_names:39            if name in self.tags.get(arn, {}):40                del self.tags[arn][name]41    def untag_resource_using_tags(self, arn, tags):42        m = self.tags.get(arn, {})43        for t in tags:44            if self.keyName in t:45                if t[self.keyName] in m:46                    if self.valueName in t:47                        if m[t[self.keyName]] != t[self.valueName]:48                            continue49                    # If both key and value are provided, match both before deletion50                    del m[t[self.keyName]]51    def extract_tag_names(self, tags):52        results = []53        if len(tags) == 0:54            return results55        for tag in tags:56            if self.keyName in tag:57                results.append(tag[self.keyName])58        return results59    def flatten_tag_list(self, tags):60        result = {}61        for t in tags:62            if self.valueName in t:63                result[t[self.keyName]] = t[self.valueName]64            else:65                result[t[self.keyName]] = None...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
