Best Python code snippet using stestr_python
controller_outdated.py
Source:controller_outdated.py  
1#import threading2import time3import numpy as np 4from Queue import Queue5from Queue import Empty6from Queue import Full7from audio_receiver import Audio_Receiver8from id_receiver import Id_Receiver9from speaker import Speaker10from visualizer import Visualizer11    12class Controller:13    14    def __init__(self, visualization=True):15        self.id_to_main_queue = Queue(maxsize=50)  16        self.audio_to_main_queue = Queue(maxsize=1000)17        self.id_recv = Id_Receiver(self.id_to_main_queue)18        self.audio_recv = Audio_Receiver(self.audio_to_main_queue)19        20        self.visualization = visualization21        if self.visualization:22            self.main_to_vis_queue = Queue(maxsize=50)23            self.visualizer = Visualizer(self.main_to_vis_queue)24              25        self.speakers = []26        self.num_speakers = 027        28    def run(self):29        self.id_recv.start()30        self.audio_recv.start()31        32        if self.visualization:33            self.visualizer.start()34        35        recording_id_odas = [0, 0, 0, 0]36        recording_id_us = [0, 0, 0, 0]37        last_recording_id_us = [0, 0, 0, 0]38        39        40        while self.id_recv.is_alive() and self.audio_recv.is_alive():41            42            #wait for new audio data43            latest_audio = self.audio_to_main_queue.get(block=True)44            45            audio_recording_buffer = [np.empty(0, dtype=np.int16), np.empty(0, dtype=np.int16), np.empty(0, dtype=np.int16), np.empty(0, dtype=np.int16)]46            47            #get the latest id update (empties queue and save the last one)48            while 1:49                try:50                    latest_id = self.id_to_main_queue.get(block=False)  51                except Empty:52                    break53            54            if self.visualization:55                current_speakers = []56            57            58            #this part filters the odas source tracking and assigs our filtered speaker id to each odas source59            for i in range(len(latest_id['src'])):  #len=460            #{ "id": 53, "tag": "dynamic", "x": -0.828, "y": -0.196, "z": 0.525, "activity": 0.926 }   61                recording_id_odas[i] = latest_id['src'][i]['id']62                recording_id_us[i] = 0 #clear our ids63                if recording_id_odas[i] > 0:64                    thispos = [latest_id['src'][i]['x'], latest_id['src'][i]['y'], latest_id['src'][i]['z']]65                    #check if we know this speaker id, and if yes update our speaker66                    found_matching_speaker = False67                    for speaker in self.speakers:68                        if speaker.current_odas_id == recording_id_odas[i]:69                            recording_id_us[i] = speaker.id70                            speaker.pos = thispos71                            found_matching_speaker = True72                            break73                        74                 #TODO: if to speakers drifted close together and became inactive, how to decide, which one it is now75                    if not found_matching_speaker:76                        #check if it mathces a certain angle threshold to one of our speakers77                        closest_dist = 1078                        for speaker in self.speakers:79                            dist = speaker.get_dist_to(thispos)80                            if dist < closest_dist:81                                closest_dist = dist82                                closest_id = speaker.id83                                84                            if dist < 0.54:  # 0.54 is roughly 10 degrees85                                speaker.current_odas_id = recording_id_odas[i]86                                recording_id_us[i] = speaker.id87                                speaker.pos = thispos88                                found_matching_speaker = True89                                print("is matching %d" % (speaker.id))90                                break91                            92                    if not found_matching_speaker:93                        #this speaker does not exist, create a new one94                        self.num_speakers += 195                        print("Found new speaker: %d, pos: %f, %f, %f" % (self.num_speakers, thispos[0], thispos[1], thispos[2]))96                        if closest_dist > 1.3: #1.3 is roughtly 30 dgree97                            self.speakers.append(Speaker(self.num_speakers, thispos, recording_id_odas[i]))98                            recording_id_us[i] = self.num_speakers99                        else:100                            self.speakers.append(Speaker(self.num_speakers, thispos, recording_id_odas[i], closest_dist, closest_id))101                            recording_id_us[i] = self.num_speakers102                    103                    if self.visualization:104                        #at this point we have assigned our speaker id, so save this information for the visualization thread105                        current_speakers.append([recording_id_us[i], latest_id['src'][i]['id'], latest_id['src'][i]['x'], latest_id['src'][i]['y'], latest_id['src'][i]['z'], latest_id['src'][i]['activity'] ])106                        107                        108            #put data in the que for the visualizer109            if self.visualization:110                try:111                    112                    self.main_to_vis_queue.put({'current_speakers': current_speakers, 'known_speakers': self.speakers, 'num_known_speakers': self.num_speakers}, block=False)113                except Full:114                    #print("couldn't put data into visualization queue, its full")115                    pass116                117                118                119                120            #record audio of currently active speakers121            for i in range(len(recording_id_us)):122                if recording_id_us[i] > 0:123                    audio_recording_buffer[i] = np.append(audio_recording_buffer[i], latest_audio[i])124                125                #if a speaker stopped speaking send the chunk off126                if recording_id_us[i] == 0 and last_recording_id_us[i] > 0:127                    128                    #TODO send this somewhere129                    #clear this recording buffer130                    audio_recording_buffer[i] = np.empty(0, dtype=np.int16)131                    132                #check if one channel has been active for too long, then cut it and send it133                if audio_recording_buffer[i].shape[0] > 16000 * 60: #cut after 60 seconds134                    135                    #TODO send this somewhere136                    #clear this recording buffer137                    audio_recording_buffer[i] = np.empty(0, dtype=np.int16)138                    139                last_recording_id_us[i] = recording_id_us[i]140                    141                    142                    143                144                    145            146            147            148            149        print("done.")150        self.id_recv.stop()151        self.audio_recv.stop()152        if self.visualization:153            self.visualizer.stop()154        155    156    ...outlook.py
Source:outlook.py  
1import email2import imaplib3import smtplib4import datetime5import email.mime.multipart6import config7import base648class Outlook():9    def __init__(self):10        pass11        # self.imap = imaplib.IMAP4_SSL('imap-mail.outlook.com')12        # self.smtp = smtplib.SMTP('smtp-mail.outlook.com')13    def login(self, username, password):14        self.username = username15        self.password = password16        login_attempts = 017        while True:18            try:19                self.imap = imaplib.IMAP4_SSL(config.imap_server,config.imap_port)20                r, d = self.imap.login(username, password)21                assert r == 'OK', 'login failed: %s' % str (r)22                print(" > Signed in as %s" % self.username, d)23                return24            except Exception as err:25                print(" > Sign in error: %s" % str(err))26                login_attempts = login_attempts + 127                if login_attempts < 3:28                    continue29                assert False, 'login failed'30    def sendEmailMIME(self, recipient, subject, message):31        msg = email.mime.multipart.MIMEMultipart()32        msg['to'] = recipient33        msg['from'] = self.username34        msg['subject'] = subject35        msg.add_header('reply-to', self.username)36        # headers = "\r\n".join(["from: " + "sms@kitaklik.com","subject: " + subject,"to: " + recipient,"mime-version: 1.0","content-type: text/html"])37        # content = headers + "\r\n\r\n" + message38        try:39            self.smtp = smtplib.SMTP(config.smtp_server, config.smtp_port)40            self.smtp.ehlo()41            self.smtp.starttls()42            self.smtp.login(self.username, self.password)43            self.smtp.sendmail(msg['from'], [msg['to']], msg.as_string())44            print("   email replied")45        except smtplib.SMTPException:46            print("Error: unable to send email")47    def sendEmail(self, recipient, subject, message):48        headers = "\r\n".join([49            "from: " + self.username,50            "subject: " + subject,51            "to: " + recipient,52            "mime-version: 1.0",53            "content-type: text/html"54        ])55        content = headers + "\r\n\r\n" + message56        attempts = 057        while True:58            try:59                self.smtp = smtplib.SMTP(config.smtp_server, config.smtp_port)60                self.smtp.ehlo()61                self.smtp.starttls()62                self.smtp.login(self.username, self.password)63                self.smtp.sendmail(self.username, recipient, content)64                print("   email sent.")65                return66            except Exception as err:67                print("   Sending email failed: %s" % str(err))68                attempts = attempts + 169                if attempts < 3:70                    continue71                raise Exception("Send failed. Check the recipient email address")72    def list(self):73        # self.login()74        return self.imap.list()75    def select(self, str):76        return self.imap.select(str)77    def inbox(self):78        return self.imap.select("Inbox")79    def junk(self):80        return self.imap.select("Junk")81    def logout(self):82        return self.imap.logout()83    def since_date(self, days):84        mydate = datetime.datetime.now() - datetime.timedelta(days=days)85        return mydate.strftime("%d-%b-%Y")86    def allIdsSince(self, days):87        r, d = self.imap.search(None, '(SINCE "'+self.since_date(days)+'")', 'ALL')88        list = d[0].split(' ')89        return list90    def allIdsToday(self):91        return self.allIdsSince(1)92    def readIdsSince(self, days):93        r, d = self.imap.search(None, '(SINCE "'+self.date_since(days)+'")', 'SEEN')94        list = d[0].split(' ')95        return list96    def readIdsToday(self):97        return self.readIdsSince(1)98    def unreadIdsSince(self, days):99        r, d = self.imap.search(None, '(SINCE "'+self.since_date(days)+'")', 'UNSEEN')100        list = d[0].split(' ')101        return list102    def unreadIdsToday(self):103        return self.unreadIdsSince(1)104    def allIds(self):105        r, d = self.imap.search(None, "ALL")106        list = d[0].split(' ')107        return list108    def readIds(self):109        r, d = self.imap.search(None, "SEEN")110        list = d[0].split(' ')111        return list112    def unreadIds(self):113        r, d = self.imap.search(None, "UNSEEN")114        list = d[0].split(' ')115        return list116    def hasUnread(self):117        list = self.unreadIds()118        return list != ['']119    def getIdswithWord(self, ids, word):120        stack = []121        for id in ids:122            self.getEmail(id)123            if word in self.mailbody().lower():124                stack.append(id)125        return stack126    def getEmail(self, id):127        r, d = self.imap.fetch(id, "(RFC822)")128        self.raw_email = d[0][1]129        self.email_message = email.message_from_string(self.raw_email)130        return self.email_message131    def unread(self):132        list = self.unreadIds()133        latest_id = list[-1]134        return self.getEmail(latest_id)135    def read(self):136        list = self.readIds()137        latest_id = list[-1]138        return self.getEmail(latest_id)139    def readToday(self):140        list = self.readIdsToday()141        latest_id = list[-1]142        return self.getEmail(latest_id)143    def unreadToday(self):144        list = self.unreadIdsToday()145        latest_id = list[-1]146        return self.getEmail(latest_id)147    def readOnly(self, folder):148        return self.imap.select(folder, readonly=True)149    def writeEnable(self, folder):150        return self.imap.select(folder, readonly=False)151    def rawRead(self):152        list = self.readIds()153        latest_id = list[-1]154        r, d = self.imap.fetch(latest_id, "(RFC822)")155        self.raw_email = d[0][1]156        return self.raw_email157    def mailbody(self):158        if self.email_message.is_multipart():159            for payload in self.email_message.get_payload():160                # if payload.is_multipart(): ...161                body = (162                    payload.get_payload()163                    .split(self.email_message['from'])[0]164                    .split('\r\n\r\n2015')[0]165                )166                return body167        else:168            body = (169                self.email_message.get_payload()170                .split(self.email_message['from'])[0]171                .split('\r\n\r\n2015')[0]172            )173            return body174    def mailsubject(self):175        return self.email_message['Subject']176    def mailfrom(self):177        return self.email_message['from']178    def mailto(self):179        return self.email_message['to']180    def maildate(self):181        return self.email_message['date']182    def mailreturnpath(self):183        return self.email_message['Return-Path']184    def mailreplyto(self):185        return self.email_message['Reply-To']186    def mailall(self):187        return self.email_message188    def mailbodydecoded(self):...merger.py
Source:merger.py  
1import threading2from Queue import Queue3from Queue import Empty4from audio_receiver import Audio_Receiver5from id_receiver import Id_Receiver6# Merges to two continuous asynchronous streams of id and audio together, to a chunked stream of tagged audio data7class Merger(threading.Thread):8    def __init__(self, outq):9        threading.Thread.__init__(self)10        self.please_stop = threading.Event()11        self.id_to_merger_queue = Queue(maxsize=50)12        self.audio_to_merger_queue = Queue(maxsize=1000)13        self.id_recv = Id_Receiver(self.id_to_merger_queue)14        self.audio_recv = Audio_Receiver(self.audio_to_merger_queue)15        self.outq = outq16    def run(self):17        self.id_recv.start()18        self.audio_recv.start()19        while self.id_recv.is_alive() and self.audio_recv.is_alive() and not self.please_stop.is_set():20            # wait for new audio data21            try:22                latest_audio = self.audio_to_merger_queue.get(block=True, timeout=1)23            except Empty:24                continue  # restart loop, but check again if we maybe got a stop signal25            # get the latest id update (empties queue and save the last one)26            while 1:27                try:28                    latest_id = self.id_to_merger_queue.get(block=False)29                except Empty:30                    break31            # get id data and put it into a list of 4 lists32            # each of the 4 will contain odas_id, x,y,z,activity33            id_info = []34            for i in range(len(latest_id['src'])):35                id_info.append([latest_id['src'][i]['id'], latest_id['src'][i]['x'], latest_id['src'][i]['y'],36                                latest_id['src'][i]['z'], latest_id['src'][i]['activity']])37            # merge in a dict and put into a queue38            nextout = {'audio_data': latest_audio, 'id_info': id_info}39            if self.outq.qsize() > self.outq.maxsize - 20:40                print("merger out queue has grown to large, discarding some, this should rarely happen!!!!!!")41                for i in range(20):  # q is almost full, discard some data42                    try:43                        self.outq.get(block=False)44                    except Empty:45                        pass46            self.outq.put(nextout)  # this is blocking and thread safe47        if self.please_stop.is_set():48            print("Merger is stopping as externally requested.")49        else:50            print("Merger is stopping as one of the receivers has stopped.")51        self.id_recv.stop()52        self.audio_recv.stop()53    def stop(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
