Best Python code snippet using autotest_python
discord_client.py
Source:discord_client.py  
import discord
import os
import json
from discord.ext import tasks

try:
    from crawler.data.database import Database
except ImportError:
    # Fallback import path when the module is not run from the package root.
    from data.database import Database


class DiscordClient(discord.Client):
    """Discord client that posts pending product notices and serves admin commands.

    A background task polls the database every 15 seconds and sends one embed
    per pending row to the channel configured for that row's key. Commands
    (``>help``, ``>canais``, ``>buscar``, ``>configurar``, ``>delete``,
    ``>truncate``, ``>totais``) are only honoured in the channel whose id is
    stored in the ``ADMIN_CHANNEL`` environment variable.

    The ``Database`` API used here (``avisos``, ``avisado``, ``get_canais``,
    ``configure``, ...) is defined elsewhere; its exact contract is not visible
    from this file.
    """

    # Colour passed to every embed sent by the background task.
    EMBED_COLOR = 3066993

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.database = Database()
        self.database.avisar_todos()
        self.first_time = self.database.isEmpty()
        try:
            self.channels = self._load_channels()
        except Exception:
            # Best effort: no stored (or unparsable) configuration means
            # there are no channels to notify yet.
            self.channels = {}
        # start the task to run in the background
        self.my_background_task.start()

    def _load_channels(self):
        """Read the stored channel configuration and parse it as JSON.

        Single quotes are swapped for double quotes before parsing, as the
        stored text may be a Python-style dict representation.
        """
        raw = self.database.get_canais()[0][0]
        return json.loads(raw.replace("'", '"'))

    async def show_channels(self, adm_channel):
        """Send the current channel configuration, pretty-printed, to the admin channel."""
        send_to = self.get_channel(int(adm_channel))
        pretty_dict_str = json.dumps(self.channels, indent=2, sort_keys=True)
        await send_to.send(pretty_dict_str)

    async def delete_by_url(self, url, adm_channel):
        """Handle ``>delete <url>``: remove the records stored for that url.

        ``url`` is the full message text; the command prefix is stripped here.
        """
        send_to = self.get_channel(int(adm_channel))
        try:
            url = url.replace('>delete', '').strip()
            self.database.delete_by_url(url)
            await send_to.send("Dados removidos com sucesso!")
        except Exception as e:
            print(e)
            await send_to.send("Erro ao remover os dados! Nada foi perdido.")

    async def search(self, url, adm_channel):
        """Handle ``>buscar <word>``: list matching names (at most 10 results).

        ``url`` is the full message text; the command prefix is stripped here.
        """
        send_to = self.get_channel(int(adm_channel))
        try:
            word = url.replace('>buscar', '').strip()
            results = self.database.search_name(word)
            if len(results) == 0:
                await send_to.send("Não foram encontrados registros!")
            elif len(results) > 10:
                await send_to.send("Foram encontrados muitos registros!\nPor favor seja um pouco mais específico.")
            else:
                result_list = '\n'.join([name[0] for name in results])
                await send_to.send('Segue resultados da pesquisa:\n{}'.format(result_list))
        except Exception as e:
            print(e)
            await send_to.send("Erro ao buscar registros!")

    async def delete_all(self, adm_channel):
        """Handle ``>truncate``: remove every stored record."""
        send_to = self.get_channel(int(adm_channel))
        try:
            self.database.delete_all()
            await send_to.send("Dados removidos com sucesso!")
        except Exception as e:
            print(e)
            await send_to.send("Erro ao remover os dados! Nada foi perdido.")

    async def totais(self, adm_channel):
        """Handle ``>totais``: send one ``label: value`` line per row of ``database.totais()``."""
        send_to = self.get_channel(int(adm_channel))
        try:
            results = self.database.totais()
            result_list = '\n'.join(["{}: {}".format(data[0], data[1]) for data in results])
            await send_to.send('Segue consolidado:\n{}'.format(result_list))
        except Exception as e:
            print(e)
            await send_to.send("Erro ao buscar os dados!")

    async def set_channels(self, channels, adm_channel):
        """Handle ``>configurar <json>``: store and reload the channel configuration.

        ``channels`` is the full message text. If storing or re-parsing fails,
        the previous in-memory configuration is written back and reloaded.
        """
        send_to = self.get_channel(int(adm_channel))
        bk_channels = self.channels
        try:
            config = channels.replace('>configurar', '')
            self.database.configure({"canais": config})
            self.channels = self._load_channels()
            await send_to.send("Dados atualizados com sucesso!")
        except Exception:
            # Roll back to the configuration that was active before the command.
            self.database.configure({"canais": bk_channels})
            print(self.database.get_canais())
            self.channels = self._load_channels()
            await send_to.send("Erro ao atualizar os dados! Nada foi perdido.")

    async def on_message(self, message):
        """Dispatch admin commands; messages outside the admin channel are ignored."""
        adm_channel = os.environ.get('ADMIN_CHANNEL')
        if adm_channel is None:
            return
        admin_id = int(adm_channel)
        if int(message.channel.id) != admin_id:
            return  # leave
        content = message.content
        # The command prefixes are mutually exclusive, so at most one branch runs.
        if content.startswith('>help'):
            await self.show_help(admin_id)
        elif content.startswith('>canais'):
            await self.show_channels(admin_id)
        elif content.startswith('>buscar'):
            await self.search(content, admin_id)
        elif content.startswith('>configurar'):
            await self.set_channels(content, admin_id)
        elif content.startswith('>delete'):
            await self.delete_by_url(content, admin_id)
        elif content.startswith('>truncate'):
            await self.delete_all(admin_id)
        elif content.startswith('>totais'):
            await self.totais(admin_id)

    async def on_ready(self):
        """Announce readiness in the admin channel, when one is configured and reachable."""
        print('Logado...')
        adm_channel = os.environ.get('ADMIN_CHANNEL')
        if adm_channel is None:
            # Without this guard int(None) would raise inside the event handler.
            return
        send_to = self.get_channel(int(adm_channel))
        if send_to is None:
            return
        await send_to.send("Tudo pronto e monitorando novos produtos!")

    def create_link(self, data, last):
        """Format one size entry as a markdown fragment for the embed description.

        Every fragment ends with two separator characters that the caller
        strips from the joined text. ``last`` selects ' e' instead of ',' as
        the separator for entries that have no purchase url.

        Returns an empty string for entries with neither a 'tamanho' nor an
        'aguardando' key, so the caller's ``''.join`` never receives ``None``.
        """
        if 'tamanho' in data:
            if 'url' in data:
                return '**{}** [**comprar**]({})| '.format(data['tamanho'], data['url']['href'])
            return '**{}**{} '.format(data['tamanho'], ' e' if last else ',')
        if 'aguardando' in data:
            return '**{}**  '.format(data['aguardando'])
        return ''

    def _build_embed(self, row):
        """Build the notification embed for one pending database row."""
        tamanhos = json.loads(row['tamanhos'])
        # The entry before the final one gets ' e' as its separator.
        penultimate = len(tamanhos) - 2
        fragments = [self.create_link(item, idx == penultimate) for idx, item in enumerate(tamanhos)]
        # Drop the trailing two-character separator, then put each link on its own line.
        tamanho_desc = ''.join(fragments)[:-2].replace('|', '\n')
        title = '{}'.format(row['name'])

        if 'aguardando' in row['tamanhos']:
            description = tamanho_desc
            alternativos = []
        else:
            alternativos = row['outros'][1:3]
            description_text = '**Código de estilos: ** {}\n**Preço: ** {}\n\n'.format(row['codigo'], row['price'])
            tamanho_text = '**Tamanhos**\n{}'.format(tamanho_desc)
            links_text = '\n**Links Alternativos**\n' if len(alternativos) > 0 else ''
            description = '{}{}{}'.format(description_text, tamanho_text, links_text)

        embed = discord.Embed(title=title, url=row['url'],
                              description=description, color=self.EMBED_COLOR)
        embed.set_thumbnail(url=row['imagens'].split('|')[0])
        for idx, outros in enumerate(alternativos):
            embed.add_field(name='Link {}'.format(idx + 1), value='[**aqui**]({})'.format(outros), inline=True)
        return embed

    @tasks.loop(seconds=15)  # task runs every 15 seconds
    async def my_background_task(self):
        """Send an embed for every pending row of every configured channel."""
        print(' ============ DISCORD ===============')
        for channel in self.channels:
            channel_id = int(self.channels[channel]['canal'])
            send_to = self.get_channel(channel_id)
            if send_to is None:
                # Channel id not resolvable by this client: skip it and leave
                # its rows pending instead of raising inside the loop.
                continue
            rows = self.database.avisos(channel)
            for row in rows:
                await send_to.send(embed=self._build_embed(row))
                self.database.avisado(row['id'])

    @my_background_task.before_loop
    async def before_my_task(self):
        await self.wait_until_ready()  # wait until the bot logs in

    async def show_help(self, adm_channel):
        """Send the list of commands and a configuration template to the admin channel."""
        send_to = self.get_channel(int(adm_channel))
        # Backslashes are doubled so the sent text still contains "\{" without
        # relying on an invalid escape sequence.
        helper = '''
lista de comandos:
>delete \\{url\\}
>buscar \\{palavra\\}
>canais
>configurar
{
  "artwalk_restock": {
    "canal": 0000000000
  },
  "artwalk_lancamentos": {
    "canal": 0000000000
  },
  "artwalk_calendario": {
    "canal": 0000000000
  },
  "gdlp_lancamentos": {
    "canal": 0000000000
  },
  "gdlp_restock": {
    "canal": 0000000000
  },
  "magicfeet_lancamentos": {
    "canal": 0000000000
  },
  "magicfeet_snkrs": {
    "canal": 0000000000
  },
  "maze_lancamentos": {
    "canal": 0000000000
  },
  "maze_restock": {
    "canal": 0000000000
  },
  "maze_snkrs": {
    "canal": 0000000000
  },
  "nike_lancamentos": {
    "canal": 0000000000
  },
  "nike_restock": {
    "canal": 0000000000
  },
  "nike_snkrs": {
    "canal": 0000000000
  }
}
'''
        await send_to.send(helper)
Source:email.py  
"""
Utility methods for email messaging.
"""
import smtplib
import os
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.utils import COMMASPACE, formatdate
from email import encoders
from email.message import Message


def _new_multipart(subtype, send_from, send_to, subject):
    """Create a multipart message of ``subtype`` with From/To/Date/Subject set."""
    msg = MIMEMultipart(subtype)
    msg['From'] = send_from
    msg['To'] = COMMASPACE.join(send_to)
    msg['Date'] = formatdate(localtime=True)
    msg['Subject'] = subject
    return msg


def _inline_image(path, content_id):
    """Read the image at ``path`` and wrap it as a part with the given Content-ID."""
    with open(path, 'rb') as img:
        part = MIMEImage(img.read())
    part.add_header('Content-ID', '<%s>' % content_id)
    return part


def send(send_from, send_to, message, server="server goes here"):
    """Open an SMTP connection, send the message, and close the connection."""
    assert isinstance(message, Message)
    smtp = smtplib.SMTP(server)
    try:
        smtp.sendmail(send_from, send_to, message.as_string())
    finally:
        # Release the socket even when sendmail raises.
        smtp.close()


def make_mail_files(send_from, send_to, subject, text, files=None):
    """Makes an email message with attachments.

    ``send_to`` and ``files`` must be lists; ``files`` holds paths whose
    contents are attached as base64-encoded ``application/octet-stream`` parts.
    """
    files = [] if files is None else files
    assert isinstance(send_to, list)
    assert isinstance(files, list)
    msg = _new_multipart('mixed', send_from, send_to, subject)
    msg.attach(MIMEText(text))
    for f in files:
        part = MIMEBase('application', "octet-stream")
        with open(f, "rb") as attachment:
            part.set_payload(attachment.read())
        encoders.encode_base64(part)
        # Content-Disposition is the header that carries the attachment filename.
        part.add_header('Content-Disposition',
                        'attachment; filename="%s"' % os.path.basename(f))
        msg.attach(part)
    return msg


def make_mail(send_from, send_to, subject, text, files=None):
    """Makes an plain text email message with attachments."""
    return make_mail_files(send_from, send_to, subject, text, files)


def send_mail(send_from, send_to, subject, text, files=None, server="server goes here"):
    """Sends a plain text email with attachements."""
    mail = make_mail(send_from, send_to, subject, text, files)
    send(send_from, send_to, mail, server)


def make_mail_html(send_from, send_to, subject, text, html):
    """Makes an HTML email message.

    The result is a ``multipart/alternative`` message with exactly two parts:
    the plain-text body followed by the HTML body.
    """
    assert isinstance(send_to, list)
    msg = _new_multipart('alternative', send_from, send_to, subject)
    msg.attach(MIMEText(text, 'plain'))
    msg.attach(MIMEText(html, 'html'))
    return msg


def send_mail_html(send_from, send_to, subject, text, html, server="server goes here"):
    """Sends an HTML email message."""
    mail = make_mail_html(send_from, send_to, subject, text, html)
    send(send_from, send_to, mail, server)


def send_mail_files(send_from, send_to, subject, text, files=None, server="server goes here"):
    """Sends an email with attachments."""
    mail = make_mail_files(send_from, send_to, subject, text, files)
    send(send_from, send_to, mail, server)


def make_mail_images(send_from, send_to, subject, text, images=None, cols=1,
                     image_map=None, image_map_res=None):
    """Makes an HTML email message with embedded images.

    When ``image_map`` (a list of rows of image paths) is given, images are
    laid out row by row and ``images``/``cols`` are ignored; ``image_map_res``
    may then supply a ``(width, height)`` pair per image and must have the same
    dimensions as ``image_map``. Otherwise ``images`` is a flat list of paths
    with a line break inserted after every ``cols`` images.

    Returns the message, or ``False`` when ``image_map`` and ``image_map_res``
    do not have matching dimensions.
    """
    images = [] if images is None else images
    assert isinstance(send_to, list)
    assert isinstance(images, list)
    msg = _new_multipart('related', send_from, send_to, subject)
    msgAlternative = MIMEMultipart('alternative')
    msg.attach(msgAlternative)
    # Plain-text fallback for clients that do not render HTML.
    msgAlternative.attach(MIMEText('Please view as HTML'))

    html_body = '%s <br>' % text
    if image_map is not None:
        if image_map_res is not None:
            if len(image_map) != len(image_map_res):
                print('Must provide image_map and image_map_res with matching dimensions')
                return False
            for row, row_res in zip(image_map, image_map_res):
                if len(row) != len(row_res):
                    print('Must provide image_map and image_map_res with matching dimensions')
                    return False
        for i, row in enumerate(image_map):
            for j, path in enumerate(row):
                res = ''
                if image_map_res is not None:
                    res = ' width="%s" height="%s"' % image_map_res[i][j]
                cid = 'image%d_%d' % (i, j)
                html_body += '<img src="cid:%s"%s>' % (cid, res)
                msg.attach(_inline_image(path, cid))
            html_body += '<br>'
    else:
        for i, path in enumerate(images):
            cid = 'image%d' % i
            html_body += '<img src="cid:%s">' % cid
            msg.attach(_inline_image(path, cid))
            if i % cols == (cols - 1):
                html_body += '<br>'

    msgAlternative.attach(MIMEText(html_body, 'html'))
    return msg


def send_mail_images(send_from, send_to, subject, text, images=None, cols=1,
                     image_map=None, image_map_res=None, server='server goes here'):
    """Sends an HTML email with embedded images."""
    # images and cols are forwarded so the flat-list layout is actually used.
    mail = make_mail_images(send_from, send_to, subject, text, images=images, cols=cols,
                            image_map=image_map, image_map_res=image_map_res)
    send(send_from, send_to, mail, server)
Source:send_email_tasks.py  
from celery import shared_task
from django.conf import settings
from mail_templated import EmailMessage

# Settings page used as the unsubscribe target by every task that does not
# receive an explicit unsubscribe link.
DEFAULT_UNSUBSCRIBE_LINK = 'https://dapp.os.university/#/settings'


def _build_message(template, context, recipient):
    """Create a templated message from DEFAULT_FROM_EMAIL to a single recipient."""
    return EmailMessage(template, context, settings.DEFAULT_FROM_EMAIL, to=[recipient])


def _certificate_message(template, certificate_title, issuer_name, send_to):
    """Create one of the certificate notification messages, which share a context."""
    context = {
        'issuer_name': issuer_name,
        'certificate_title': certificate_title,
        'unsubscribe_link': DEFAULT_UNSUBSCRIBE_LINK,
    }
    return _build_message(template, context, send_to)


@shared_task
def rejected_certificate_email(certificate_title, issuer_name, send_to):
    """Send the 'certificate rejected' email to ``send_to``."""
    _certificate_message('mail/certificate_rejected.tpl',
                         certificate_title, issuer_name, send_to).send()


@shared_task
def verified_certificate_email(certificate_title, issuer_name, send_to):
    """Send the 'certificate verified' email to ``send_to``."""
    _certificate_message('mail/certificate_verified.tpl',
                         certificate_title, issuer_name, send_to).send()


@shared_task
def certificate_upload_email(certificate_title, issuer_name, send_to):
    """Send the 'certificate uploaded' email to ``send_to``."""
    _certificate_message('mail/certificate_uploaded.tpl',
                         certificate_title, issuer_name, send_to).send()


@shared_task
def approved_job_application_email(job_title, issuer_name, send_to):
    """Send the 'job application approved' email to ``send_to``."""
    context = {
        'issuer_name': issuer_name,
        'job_title': job_title,
        'unsubscribe_link': DEFAULT_UNSUBSCRIBE_LINK,
    }
    _build_message('mail/job_application_approved.tpl', context, send_to).send()


@shared_task
def inviting_email(connection, unsubscribe_link, owner_name):
    """Send an invitation to ``connection['email']`` on behalf of ``owner_name``.

    ``connection`` must provide the 'full_name' and 'email' keys.
    """
    context = {
        'full_name': connection['full_name'],
        'unsubscribe_link': unsubscribe_link,
        'owner_name': owner_name,
    }
    _build_message('mail/linkedin_connection.tpl', context, connection['email']).send()


@shared_task
def verification_email_on_create(verification_link, send_to):
    """Send the account-created email carrying ``verification_link``."""
    context = {
        'verification_link': verification_link,
        'unsubscribe_link': DEFAULT_UNSUBSCRIBE_LINK,
    }
    _build_message('mail/account_created.tpl', context, send_to).send()


@shared_task
def verification_email(verification_link, send_to):
    """Send the email-verification email carrying ``verification_link``."""
    context = {
        'verification_link': verification_link,
        'unsubscribe_link': DEFAULT_UNSUBSCRIBE_LINK,
    }
    _build_message('mail/email_verification.tpl', context, send_to).send()


@shared_task
def not_created_email(unsubscribe_link, send_to):
    """Send the 'account not created' email to ``send_to``."""
    context = {
        'unsubscribe_link': unsubscribe_link,
    }
    _build_message('mail/account_not_created.tpl', context, send_to).send()


@shared_task
def profile_created_email(profile_type, send_to):
    """Build the 'profile created' email for ``send_to``."""
    context = {
        'profile_type': profile_type,
        'unsubscribe_link': DEFAULT_UNSUBSCRIBE_LINK,
    }
    # NOTE(review): the visible source is cut off right after the message is
    # constructed; every sibling task ends with message.send(). Confirm against
    # the full file whether a send() call belongs here.
    message = _build_message('mail/profile_created.tpl', context, send_to)

# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
