Best Python code snippet using autotest_python
events.py
Source:events.py  
...54    """55    def __init__(self, request):56        self.request = request57        self.sailthru = Sailthru(request)58    def _sendmail(self, template_name, to_email, context):59        """60        Use SailThru to deliver a templated e-mail61        :param template_name: SailThru template name62        :param to_email: string E-mail address63        :param context: Dictionary of parameters for the template64        """65        assert isinstance(template_name, basestring)66        assert isinstance(to_email, basestring)67        assert type(context) == dict68        if self.sailthru.enabled():69            response = self.sailthru.send_template(template_name, to_email, context)70            LOG.info("Sent e-mail to %s (SailThru send ID: %s)",71                     response['email'], response['send_id'])72            return response73        else:74            LOG.warn(75                'Not sending template %s to %s because SailThru disabled',76                template_name, to_email77            )78            return None79    def logged_in(self, user):80        """81        The user has logged in to the site82        :param user: User object83        """84        # Integration with sailthru85        if self.sailthru.enabled():86            self.sailthru.login(user)87        # Internal LifetimeTrack88        try:89            lt = LifetimeTrack.objects.get(user=user)90            merge_lifetime_track(self.request.lifetime_track, lt)91        except Exception as e:92            print "=-"*34, e93        # Integration with mixpanel94        try:95            mixpanel_track(96                self.request,97                "logged in",98                {99                    "$last_seen": datetime.datetime.now()100                }101            )102            mixpanel_engage(self.request, {'$add': {'Times Logged In': 1}})103            try:104                has_stall = user.stall is not None105            except:106                has_stall = False107         
   if has_stall:108                member_type = 'Regular'109            else:110                member_type = 'Stall Owner'111            mixpanel_engage(self.request, {'$set': {112                'last_login': datetime.datetime.now().isoformat(),113                '$email': user.email,114                '$username': user.username,115                '$created': user.date_joined.isoformat(),116                "$last_seen": datetime.datetime.now().isoformat(),117                '$first_name': user.first_name,118                '$last_name': user.last_name,119                'gender': user.user_profile.get_gender_display(),120                'mp_name_tag': user.get_full_name(),121                'Member Type': member_type,122            }})123        except:124            LOG.warn('Could not send login event to MixPanel', exc_info=True)125    def user_changed_email(self, user, old_email, new_email):126        """127        The user has changed their e-mail address128        :param user: User object129        :param old_email: Old e-mail address130        :param new_email: New e-mail address131        """132        from mailing_lists.models import MailingListSignup133        self.sailthru.change_email(old_email, new_email)134        old_mls = MailingListSignup.objects.filter(user=user)135        # Multiple mailing list signup objects for the same user136        # This shouldn't happen, but we can safely clean up.137        if len(old_mls) > 1:138            MailingListSignup.objects.filter(user=user).exclude(email_address=old_email).delete()139            old_mls = MailingListSignup.objects.filter(user=user)        140        if MailingListSignup.objects.filter(email_address=new_email).count() != 0:141            LOG.warn('Cant update MailingListSignup from %s to %s '142                     'because MLS with new email already exists', old_email, new_email)143            return False   144        if len(old_mls) == 1:     145            old_mls[0].email_address = 
new_email146            old_mls[0].save()147    def cart_updated(self, cart):148        """149        Syncs contents of cart with SailThru and updates MixPanel properties150        for # of items in cart.151        :param cart: Cart object152        """153        try:154            self.sailthru.cart_updated(cart)155            mixpanel_engage(self.request, {'$set': {156                'Items in Cart': cart.num_items()157            }})158        except:159            LOG.error("cart_updated failed", exc_info=True)160    def user_followed(self, follow):161        """162        :param follow: UserFollow object which has just been created163        """164        mixpanel_track(self.request,165                       "Followed User",166                       {"followed_user": follow.target.username})167        mixpanel_engage(self.request,168                        get_mixpanel_info(follow.user),169                        follow.user.id)170        mixpanel_engage(self.request,171                        get_mixpanel_info(follow.target, ignore_time=True),172                        follow.target.id)173        template_name = 'new-follower-notification'174        context = {175            'USER_USERNAME': follow.user.username,176            'USER_PROFILE_URL': absolute_uri(follow.user.get_profile().get_absolute_url()),177            'TARGET_USER_USERNAME': follow.target.username,178            'TARGET_USER_PROFILE_URL': absolute_uri(follow.target.get_profile().get_absolute_url())179        }180        print absolute_uri(follow.user.get_profile().get_absolute_url()), \181            absolute_uri(follow.target.get_profile().get_absolute_url())182        email_notification = follow.target.email_notification183        if email_notification.follower_notifications:184            self._sendmail(template_name, follow.target.email, context)185        else:186            LOG.info(187                "Email not send to %s, since follower_notifications is set to OFF",188                
follow.target.email189            )190    def user_unfollowed(self, unfollow):191        """192        :param unfollow: UserFollow object which is about to be deleted193        """194        mixpanel_track(self.request,195                       "Unfollowed User",196                       {"followed_user": unfollow.target.username})197        mixpanel_engage(self.request,198                        get_mixpanel_info(unfollow.user),199                        unfollow.user.id)200        mixpanel_engage(self.request,201                        get_mixpanel_info(unfollow.target, ignore_time=True),202                        unfollow.target.id)203    def message_sent(self, msg):204        """205        A new message has been sent, notify the recipient via e-mail206        :param msg: Message object207        """208        if not msg.parent_msg:209            template_name = "new-message-received"210        else:211            template_name = "message-reply-received"212        # TODO: notification settings213        context = {214            "SENDER_USERNAME": msg.sender.username,215            "MESSAGE_SUBJECT": msg.subject,216            "MESSAGE_BODY": msg.body,217            'MESSAGE_VIEW_URL': absolute_uri(reverse('messaging_inbox'))218        }219        # send mail to hipchat220        template_context = Context({221            'request': self.request,222            'message': msg,223        })224        template = loader.get_template("messaging/fragments/hipchat_message.html")225        output = template.render(template_context)226        send_to_hipchat(227            output,228            room_id=settings.HIPCHAT_MAIL_ROOM,229            notify=1,230        )231        mixpanel_track(self.request, "Sent Message", {})232        mixpanel_engage(self.request, {'$add': {'Messages Sent': 1}})233        self._sendmail(template_name, msg.recipient.email, context)234    def seller_paypal_error(self, payment_attempt):235        """236        The payment attempt failed 
because the Seller has an invalid Paypal237        address238        :param payment_attempt: PaymentAttempt object239        """240        try:241            stall = payment_attempt.cart_stall.stall242            stall_url = reverse("my_stall", kwargs={"slug": stall.slug})243            settings_url = reverse("account_email_notifications")244            context = {245                'stall': {246                    'title': stall.title,247                    'url': absolute_uri(stall_url),248                },249                'PAYPAL_SETTINGS_URL': absolute_uri(settings_url)250            }251            to_email = stall.user.email252            self._sendmail("seller-erroneous-paypal-account",253                           to_email, context)254        except:255            LOG.error('seller_paypal_error failed', exc_info=True)256    def _order_item_context(self, item):257        """258        XXX: move somewhere else?259        """260        product = item.product261        return {262            'qty': item.quantity,263            'title': product.title,264            'price': str(item.price),265            'total': str(item.price * item.quantity),266            'id': product.id,267            'url': absolute_uri(product.get_absolute_url())268        }269    def _order_context(self, order):270        """271        XXX: move somewhere else?272        """273        items = [self._order_item_context(line_item)274                 for line_item in order.line_items.all()]                275        address = order.address276        delta = datetime.datetime.now().date() - order.created277        context = {278            'ORDER': {279                'items': items,280                'subtotal': str(order.subtotal()),281                'shipping': str(order.delivery_charge),282                'total': str(order.total()),283                'note': order.note,284                'address': {285                    'name': address.name,286                    'city': 
address.city,287                    'country': address.country.title,288                    'line1': address.line1,289                    'line2': address.line2,290                    'postal_code': address.postal_code,291                    'state': address.state,292                },293            },294            "FNAME": order.user.first_name,295            "ORDER_DATE": custom_strftime('{S} %B %Y', order.created),296            "CUSTOMER_FIRST_NAME": order.user.first_name,297            "CUSTOMER_FULL_NAME": order.user.get_profile().full_name,298            "CUSTOMER_USERNAME": order.user.username,299            "CUSTOMER_PROFILE_URL": absolute_uri(300                reverse("public_profile",301                        kwargs={"username": order.user.username})),302            "ORDER_ID": order.id,303            "INVOICE_URL": absolute_uri(reverse("invoice",304                                        kwargs={"order_id": order.id})),305            "STALL_TITLE": order.stall.title,306            "STALL_URL": absolute_uri(reverse("my_stall",307                                      kwargs={"slug": order.stall.slug})),308            "STALL_OWNER_USERNAME": order.stall.user.username,309            'STALL_OWNER_URL': order.stall.user.get_profile().get_absolute_url(),310            "NUMBER_OF_DAYS_SINCE_ORDER": delta.days,311            "MESSAGE_CUSTOMER_URL": absolute_uri(312                reverse("messaging_compose_to", kwargs={313                    "recipient": order.user.username,314            })),315            "MESSAGE_STALL_URL": absolute_uri(316                reverse("messaging_compose_to", kwargs={317                    "recipient": order.stall.user.username,318                })319            ),320            "UPDATE_PROFILE": absolute_uri(reverse("account_email_notifications")),321        }322        return context323    def _mixpanel_order_info(self, order):324        order_info = {325            "Order ID": order.id,326            "Order Date": 
order.created.isoformat(),327            "Total Order Value": str(order.total().amount),328            "Total Shipping Value": str(order.delivery_charge.amount),329            "No of Products": order.line_items.all().count(),330            "No of Items": order.num_items(),331            "Delivery Country": order.address.country.title332        }333        return order_info334    def order_reminder(self, order):335        """336        The seller needs to be reminded about an order.337        """338        days_map = {339            3: "3-day-dispatch-chase-up-stall-owner",340            7: "7-day-dispatch-chase-up-stall-owner",341            13: "13-day-dispatch-chase-up-stall-owner",342            14: "14-day-dispatch-chase-up-and-refund-warning",343        }344        days = (datetime.datetime.now().date() - order.created).days345        allowed_days = days_map.keys()346        if days not in allowed_days:347            return348        context = self._order_context(order)349        self._sendmail(days_map[days], order.stall.user.email, context)350    def order_refunded(self, order):351        """352        The seller has refunded an order353        """354        context = self._order_context(order)355        self._sendmail(356            'order-refunded-to-customer',357            order.user.email, context358        )359        self._sendmail(360            'order-refunded-to-stall-owner',361            order.stall.user.email,362            context363        )364        try:365            # Track the Seller refunding the Order366            order_info = self._mixpanel_order_info(order)367            mixpanel_track(self.request, "Clicked Refund Button", order_info)368            mixpanel_engage(self.request, {369                '$add': {370                    'Refunds Made': 1371                }372            })373            # Update the Buyers properties, reducing their GMV374            # XXX: we can't delete the transaction after it's refunded!!!375          
  #      adding the transaction needs to happen on Dispatch376            user = order.user377            stall = order.stall378            mixpanel_engage(None, {379                '$set': {380                    'Orders': user.orders.completed().count(),381                    'Total GMV to Date': str(user.get_profile().total_gmv.amount),382                },383                '$ignore_time': True384            }, distinct_id=user.id)385        except: 386            LOG.warning("Couldn't notify MixPanel of refund", exc_info=True)387    def order_dispatched(self, order):388        """389        The seller has marked an order as dispatched390        """391        try:392            context = self._order_context(order)393            self._sendmail(394                'order-dispatched-to-customer',395                order.user.email,396                context397            )398        except:399            LOG.warning("Couldn't send Order Dispatched email to Customer", exc_info=True)400        try:401            order_info = self._mixpanel_order_info(order)402            mixpanel_track(self.request, 'Clicked Mark Order as Dispatched', order_info)403            mixpanel_engage(self.request, {404                '$add': {405                    'Orders Dispatched': 1406                }407            }, distinct_id=order.stall.user.id)408        except:409            LOG.warning("Couldn't update MixPanel after Order Dispatch", exc_info=True)410    def order_placed(self, order):411        """412         1) Syncs cart with SailThru413         2) Sends 'Order Completed' e-mail to customer via Sailthru414         3) Syncs order information with MixPanel415        """416        # Technically the transaction isn't complete yet because the seller hasn't417        # marked the order as complete so could potentially refund the money.418        try:419            context = self._order_context(order)420            self._sendmail(421                'order-placed-to-customer',422     
           order.user.email,423                context424            )425            self._sendmail(426                'order-placed-to-stall-owner',427                order.stall.user.email,428                context429            )430        except:431            LOG.error("Events.order_placed failed to send email", exc_info=True)432        # Send completed order to SailThru, this updates the users 'Total Revenue'433        try:434            self.sailthru.order_purchased(order)435        except:436            LOG.error("Events.order_placed failed to sync order with SailThru", exc_info=True)437        # Sync user properties with MixPanel438        try:439            user = order.user440            mixpanel_engage(self.request, {441                '$set': {442                    'Orders': user.orders.completed().count(),443                    'Total GMV to Date': str(user.get_profile().total_gmv.amount),444                }445            }, distinct_id=user.id)446            mixpanel_engage(self.request, {447                '$append': {448                    '$transactions': {449                        '$time': order.created.isoformat(),450                        '$amount': str(order.total().amount),451                        'Order ID': order.id452                    }453                }454            }, distinct_id=user.id)455            if self.request is not None:456                order_info = self._mixpanel_order_info(order)457                mixpanel_track(self.request, "Purchased Order", order_info)458        except: 459            LOG.warning("Couldn't notify MixPanel of new order", exc_info=True)460    def forgot_password(self, user, token_generator=None):461        """462        Sends 'Forgot Username or Password?' 
with their username and a link463        to the password reset form.464        """465        from django.utils.http import int_to_base36466        if token_generator is None:467            from django.contrib.auth.tokens import default_token_generator      468            token_generator = default_token_generator469        token = token_generator.make_token(user)470        uid = int_to_base36(user.id)471        ctx = dict(472            PASSWORD_RESET_URL=absolute_uri(473                reverse("password_reset_confirm", kwargs={474                    "uidb36": uid,475                    "token": token476                })477            )478        )479        self._sendmail('forgot-username-or-password', user.email, ctx)480    def stall_opened(self, stall):481        """482        A new stall has been opened, welcome the user to the site and give them483        information about how to add products & optimize their stuff.484        """485        user = stall.user486        context = {487            "STALL_URL": absolute_uri(reverse("my_stall", kwargs={488                "slug": stall.slug,489            }))490        }491        self._sendmail("stall-owner-welcome", user.email, context)492    def user_signup(self, user, requires_activation=True):493        """494        Welcomes the user to the site495        Includes URL for them to activate their account / verify their e-mail.496        """497        if requires_activation:498            user_profile = user.user_profile499            verify_url = absolute_uri(reverse("verify", args=[user_profile.activation_key]))500            ctx = dict(ACTIVATION_URL=verify_url)501            self._sendmail('regular-user-welcome-email', user.email, ctx)502        profile = user.get_profile()503        # Fix up their MailingLists objects504        from mailing_lists.models import MailingListSignup505        mls = MailingListSignup.objects.create_from_user(user)506        mls.marketing_optin = profile.send_newsletters507        if 
self.request is not None:508            mls.set_ip_address(self.request)509        mls.save()510        # Apply defaults to EmailNotification preferences511        email_notification = user.email_notification512        email_notification.site_updates_features = profile.send_newsletters513        email_notification.stall_owner_tips = True514        email_notification.product_inspirations = profile.send_newsletters515        email_notification.blogs_you_might_like = profile.send_newsletters516        email_notification.save()517        # Integration with sailthru518        if self.sailthru.enabled():519            self.sailthru.signup(user=user)520    def newsletter_signup(self, email):521        """522        This is a newsletter signup that is available from the homepage modal,523        and at the bottom of the blog.524        """525        # Integration with sailthru526        # XXX: we should be doing the mailinglistsignup thing here too!!!527        if self.sailthru.enabled():528            self.sailthru.signup(email=email)529    def stall_stockcheck(self, stall):530        user = stall.user531        context = {532            "STALL_URL": absolute_uri(reverse("my_stall", kwargs={533                "slug": stall.slug,534            })),535            "DAYS_LEFT": stall.days_to_next_stockcheck536        }537        self._sendmail("stock-check-reminder-1", user.email, context)538    def stall_stockcheck_urgent(self, stall):539        user = stall.user540        context = {541            "STALL_URL": absolute_uri(reverse("my_stall", kwargs={542                "slug": stall.slug,543            })),544            "DAYS_LEFT": stall.days_to_next_stockcheck545        }546        self._sendmail("stock-check-reminder-2", user.email, context)547    def stall_suspended(self, stall):548        user = stall.user549        context = {550            "STALL_URL": absolute_uri(reverse("my_stall", kwargs={551                "slug": stall.slug,552            }))553        
}...test_mail.py
Source:test_mail.py  
1# -*- coding: utf-8 -*-2# Copyright 2019 Open End AB3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8#    http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15try:16    import ConfigParser                 #py217except ImportError:18    import configparser as ConfigParser #py319    20import dkim21import email.message22import os23from accounting import config24from .. import mail, smtppipe25import codecs26try:27    unicode                     #py228    py23txt = lambda t, c = 'us-ascii': t29    py23txtc = lambda t, c = 'us-ascii': t.encode(c)30    py23txtu = lambda t, c = 'us-ascii': unicode(t, c)31except NameError:               #py332    py23txt = lambda t, c = 'us-ascii': str(t, c)33    py23txtc = lambda t, c = 'us-ascii': bytes(t,c)34    py23txtu = lambda t, c = 'us-ascii': t35class SMTP(object):36    local_hostname = 'test'37    _quit = False38    def __init__(self, *args, **kw):39        self._args = args40        self._kw = kw41        self._sendmail = []42    def sendmail(self, *args):43        self._sendmail.append(args)44    def quit(self):45        self._quit = True46class TestMail(object):47    def setup_method(self, method):48        self._smtp = smtppipe.SMTP49        smtppipe.SMTP = self.SMTP50        self.smtps = []51        self.orig_config = config.config52        self.config = config.save()53        config.config.set('accounting', 'smtp_domain', 'example.com')54    def teardown_method(self, method):55        smtppipe.SMTP = self._smtp56        config.restore(self.config)57    def SMTP(self, *args, **kw):58      
  smtp = SMTP(*args, **kw)59        self.smtps.append(smtp)60        return smtp61    def test_sendmail_dkim(self):62        config.config.set('accounting', 'dkim_privkey', os.path.join(63            os.path.dirname(__file__), 'dkim_test.private'))64        config.config.set('accounting', 'dkim_domain', 'example.org')65        config.config.set('accounting', 'smtp_domain', 'test.example.org')66        config.config.set('accounting', 'smtp_to_filter', '.*@example')67        pubkey = open(os.path.join(68            os.path.dirname(__file__), 'dkim_test.txt'),'rb').read()69        body = u'räksmörgÃ¥s'70        subject = u'RäksmörgÃ¥sar!'71        to = u'"Mr. RäksmörgÃ¥s" <foo@example>'72        fromaddr, all_rcpts, message = mail.makemail(73            body, subject=subject, to=to)74        mail.sendmail(fromaddr, all_rcpts, message)75        smtp, = self.smtps76        message = smtp._sendmail[0][2]77        assert dkim.verify(message, dnsfunc=lambda *_: pubkey)78        assert b'i=@test.example.org' in message79        fromaddr, all_rcpts, message = mail.makemail(80            body, subject=subject, to=to)81        mail.sendmail(fromaddr, all_rcpts, message, identity='foo')82        smtp = self.smtps[1]83        message = smtp._sendmail[0][2]84        assert dkim.verify(message, dnsfunc=lambda *_: pubkey)85        assert b'i=foo@test.example.org' in message86    def test_makemail(self):87        body = u'räksmörgÃ¥s'88        subject = u'RäksmörgÃ¥sar!'89        to = u'"Mr. 
RäksmörgÃ¥s" <foo@example>'90        args = mail.makemail(body, subject=subject, to=to)91        assert len(args) == 392        assert args[:2] == ('<>', ['foo@example'])93        msg = args[2]94        assert 'foo@example' in msg95        # subject utf-8 and base64 encoded96        assert '=?utf-8?b?UsOka3Ntw7ZyZ8Olc2FyIQ==?=' in msg97        #assert msg.strip().endswith(py23txtc(body, 'utf-8').encode('base64').strip())98        assert msg.strip().endswith(py23txt(codecs.encode(py23txtc(body, 'utf-8'), 'base64').strip()))99    def test_makemail_bcc(self):100        body = u'räksmörgÃ¥s'101        subject = u'RäksmörgÃ¥sar!'102        bcc = u'"Mr. RäksmörgÃ¥s" <foo@example>'103        args = mail.makemail(body, envfrom='some@place',104                             subject=subject, bcc=bcc)105        assert len(args) == 3106        assert args[:2] == ('some@place', ['foo@example'])107        msg = args[2]108        assert 'foo@example' not in msg109        assert 'Bcc' in msg110    def test_makemail_envfrom(self):111        body = u'räksmörgÃ¥s'112        subject = u'RäksmörgÃ¥sar!'113        to = u'"Mr. 
RäksmörgÃ¥s" <foo@example>'114        _from = u'test@other'115        args = mail.makemail(body, envfrom='some@place', _from=_from,116                             subject=subject, to=to)117        assert len(args) == 3118        assert args[:2] == ('some@place', ['foo@example'])119        msg = args[2]120        #import pdb;pdb.set_trace()121        assert 'From: <test@other>' in msg122        #assert 'From:  <test@other>' in msg123    def test_address_filtering(self, monkeypatch):124        config.config.set('accounting', 'smtp_to_filter', '')125        log = []126        monkeypatch.setattr(mail.log, 'warn', lambda *args: log.append(args))127        mail.sendmail('from@test', ['to@test'], 'mail')128        assert self.smtps == []129        assert 'to@test' in log[0]130        log[:] = []131        config.config.set('accounting', 'smtp_to_filter', '.*@openend.se')132        mail.sendmail('from@test', ['to@test'], 'mail')133        assert self.smtps == []134        assert 'to@test' in log[0]135        log[:] = []136        msg = email.message.Message()137        msg['from'] = 'foo@test'138        config.config.set('accounting', 'smtp_to_filter', '.*')139        # xxx hack to make dkim signing work, since the dkim key is for140        # openend.se141        config.config.set('accounting', 'smtp_domain', 'admin.eutaxia.eu')142        mail.sendmail('from@test', ['to@test'], msg.as_string())143        args, = self.smtps[0]._sendmail144        assert args[:2] == ('from@test', ['to@test'])145        assert log == []146    def test_fromaddr_domain(self):147        config.config.set('accounting', 'dkim_privkey', os.path.join(148            os.path.dirname(__file__), 'dkim_test.private'))149        config.config.set('accounting', 'dkim_domain', 'example.org')150        config.config.set('accounting', 'smtp_domain', 'test.example.org')151        config.config.set('accounting', 'smtp_to_filter', '.*')152        msg = email.message.Message()153        msg['from'] = 
'foo@test'154        mail.sendmail('from', ['to@test'], msg.as_string())155        args, = self.smtps[-1]._sendmail156        assert args[:2] == ('from@test.example.org', ['to@test'])157        msg = email.message.Message()158        msg['from'] = 'foo@test'159        mail.sendmail('from@bar', ['to@test'], msg.as_string())160        args, = self.smtps[-1]._sendmail161        assert args[:2] == ('from@bar', ['to@test'])162        msg = email.message.Message()163        msg['from'] = 'foo@test'164        mail.sendmail('<>', ['to@test'], msg.as_string())165        args, = self.smtps[-1]._sendmail166        assert args[:2] == ('<>', ['to@test'])167def test_makeAddressHeader():168    addrs = [(u'kalle anka', 'kalle@anka.se'),169             (u'foo@bar', 'foo@bar.com'),170             (u'<>foo,bar', 'foobar@baz.com'),171             (u'räksmörgÃ¥s', 'raksmorgas@mat.se'),172             (u'é»å', 'dentaku@foo.org')]173    tohdr = mail.makeAddressHeader('To', addrs)174    try:175        unicode176        assert tohdr == 'kalle anka <kalle@anka.se>, "foo@bar" <foo@bar.com>, "<>foo,bar" <foobar@baz.com>,' \177                    ' =?iso-8859-1?q?r=E4ksm=F6rg=E5s?= <raksmorgas@mat.se>, =?utf-8?b?6Zu75Y2T?= <dentaku@foo.org>'178    except NameError:179        assert tohdr == 'kalle anka <kalle@anka.se>, "foo@bar" <foo@bar.com>, "<>foo,bar" <foobar@baz.com>,' \...sendmail.py
Source:sendmail.py  
"""Rate-limited e-mail notifications delivered by a background worker thread.

Callers enqueue messages with :func:`send`; the daemon thread launched by
:func:`start` delivers them.  Identical messages (same subject and body) are
throttled: the first ``_BURST_LIMIT`` occurrences are mailed, after which a
repeat is mailed only when more than ``config.SM_WINDOW`` seconds have passed
since the entry was created or last mailed.  The cache of seen messages is
emptied whenever a flush command is processed.
"""
import hashlib
import logging
import queue
import smtplib
import threading
import time

import config

log = logging.getLogger(__name__)

# Rate limiting: each message is hashed and used as a key in this dictionary.
# The value (MessageLimiting) combines the number of times the message was
# submitted with a start time.
_messages = {}
_messageQueue = queue.Queue()

# Number of occurrences of the same message that are mailed before the
# time-window throttle takes over.
_BURST_LIMIT = 3

# SECURITY: account details were hard-coded in this module.  They are kept as
# fallbacks so existing deployments behave as before, but they can now be
# overridden from ``config`` (SM_SMTP_USER / SM_SMTP_PASSWORD / SM_MAIL_TO).
# The password should be moved out of source control and rotated.
_DEFAULT_SMTP_USER = "automatisatie.borowski@gmail.com"
_DEFAULT_SMTP_PASSWORD = "AuGoo20ToGle16Matisatie"
_DEFAULT_MAIL_TO = "emmanuel.borowski@gmail.com"


class Message:
    """Base class for everything placed on the worker queue."""
    pass


class MessageSend(Message):
    """Queue item asking the worker thread to send a message."""

    def __init__(self, subject, body):
        self.subject = subject
        self.body = body


class MessageFlush(Message):
    """Queue item asking the worker thread to empty the message cache."""
    pass


def start():
    """Start the mail worker thread and the periodic flush thread.

    Both threads are daemon threads, so they do not keep the process alive.
    """
    log.info("starting")
    # daemon=True replaces Thread.setDaemon(), deprecated since Python 3.10.
    _mainThread = threading.Thread(
        target=mailWorker,
        args=(_messageQueue, ),
        daemon=True)
    _mainThread.start()

    _flushThread = threading.Thread(target=flushWorker, daemon=True)
    _flushThread.start()


def flushWorker():
    """Request a cache flush at regular intervals so the cache stays small.

    Runs forever; the interval is ``config.SM_WINDOW + 5`` seconds.
    """
    while True:
        time.sleep(config.SM_WINDOW + 5)
        flush()
        log.info('flush message cache')


class MessageLimiting:
    """Throttling state for one distinct message.

    ``time`` is the integer epoch second of creation or of the last windowed
    send; ``messageCount`` is the number of times the message was submitted.
    """

    def __init__(self):
        self.time = int(time.time())
        self.messageCount = 1


def send(subject, body):
    """Queue a message for delivery by the worker thread.

    :param subject: subject line; an empty string selects a default subject
    :param body: message text; an empty string selects a default body
    """
    _messageQueue.put(MessageSend(subject, body))


def flush():
    """Queue a command that empties the message cache."""
    _messageQueue.put(MessageFlush())


def mailWorker(q):
    """Process queue items forever (intended as a thread target).

    :param q: queue yielding ``MessageSend`` / ``MessageFlush`` items

    A failure while handling one item (for example an SMTP or network error)
    is logged and the loop continues.  Previously the exception ended the
    thread, after which queued messages were never delivered.
    """
    while True:
        item = q.get()
        try:
            if isinstance(item, MessageSend):
                _send(item.subject, item.body)
            elif isinstance(item, MessageFlush):
                _messages.clear()
        except Exception:
            log.exception("mail worker failed to process %s",
                          type(item).__name__)


def _sendMail(m, mc):
    """Log the message and, if sending is enabled, deliver it over SMTP.

    :param m: complete message text, starting with the ``Subject:`` header
    :param mc: occurrence number reported in the mail as ``Message #``

    The message is always logged at error level; the mail itself is only
    sent when ``config.SM_ENABLE_SEND_MAIL`` is true.
    """
    message = '{}\n\nMessage #{}\n\nGreetings HA'.format(m, mc)
    log.error(message)
    if not config.SM_ENABLE_SEND_MAIL:
        return

    user = getattr(config, 'SM_SMTP_USER', _DEFAULT_SMTP_USER)
    password = getattr(config, 'SM_SMTP_PASSWORD', _DEFAULT_SMTP_PASSWORD)
    recipient = getattr(config, 'SM_MAIL_TO', _DEFAULT_MAIL_TO)

    # The context manager closes the connection even when starttls/login/
    # sendmail raises; the explicit quit() used before was skipped on error.
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login(user, password)
        server.sendmail(user, recipient, message)


def _send(subject, body):
    """Apply the rate limit to one message and mail it when allowed.

    :param subject: subject line (empty string -> default subject)
    :param body: message text (empty string -> default body)
    """
    if subject == '':
        subject = 'heating automation message'
    if body == '':
        body = 'heating automation default message'

    msg = 'Subject: {}\n\n{}'.format(subject, body)
    digest = hashlib.sha1(msg.encode()).hexdigest()

    entry = _messages.get(digest)
    if entry is None:
        # First occurrence: always mailed, reported as message #1.
        entry = _messages[digest] = MessageLimiting()
        _sendMail(msg, entry.messageCount)
        return

    # Count this occurrence before reporting it, so repeats are numbered
    # 2, 3, ... (the counter used to lag by one and showed #1 twice).  The
    # send decisions are unchanged: occurrences 1.._BURST_LIMIT are mailed.
    entry.messageCount += 1
    if entry.messageCount <= _BURST_LIMIT:
        _sendMail(msg, entry.messageCount)
        return

    now = int(time.time())
    if now - int(entry.time) > config.SM_WINDOW:
        _sendMail(msg, entry.messageCount)
        # Kept as an int, consistent with MessageLimiting.__init__.
        entry.time = now


# Page text that followed the snippet in the original source, kept verbatim:
# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
