Best Python code snippet using yandex-tank
astm_bidirectional_xl_1000.py
Source:astm_bidirectional_xl_1000.py  
1#!/usr/bin/python32import bidirectional_general as astmg3import astm_bidirectional_conf as conf4import fcntl, signal, logging5from astm_bidirectional_common import file_mgmt6class astms(astmg.astmg, file_mgmt):7  def __init__(self):8    self.main_status=09          #0=neutral10          #1=receiving11          #2=sending12    self.send_status=013          #1=enq sent14          #2=1st ack received15          #3=data sent16          #4=2nd ack received17          #0=eot sent18    self.set_inbox(conf.inbox_data,conf.inbox_arch)19    self.set_outbox(conf.outbox_data,conf.outbox_arch)20    super().__init__()21    22    self.alarm_time=conf.alarm_time23    signal.signal(signal.SIGALRM, self.signal_handler)24  ###################################25  #override this function in subclass26  ###################################27  #read from enq to eot in single file28  #it will have stx->lf segments, with segment number 1...7..0..729  def manage_read(self,data):30    #EOF is handled in base class31    #for receiving data32    if(data==b'\x05'):33      signal.alarm(0)      34      self.main_status=135      self.write_msg=b'\x06'36      self.write_set.add(self.conn[0])                      #Add in write set, for next select() to make it writable37      self.error_set=self.read_set.union(self.write_set)    #update error set38      #new file need not be class global (unlike existing file -> which needs to be deleted)39      new_file=self.get_inbox_filename()40      self.fd=open(new_file,'wb')41      fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)   #lock file42      #Now use this fd for stx-etb/etx frame writing43      self.fd.write(data)44      print_to_log('File Content written:',data)45      46      signal.alarm(self.alarm_time)47    elif(data[-1:] == b'\x0a'):48      signal.alarm(0)49      50      if(self.calculate_and_compare_checksum(data)==True):51        print_to_log('checksum matched:','Proceeding to write data')52        self.fd.write(data)53        print_to_log('File Content written:',data)54                     55        self.write_msg=b'\x06'56        self.write_set.add(self.conn[0])                      #Add in write set, for next select() to make it writable57        self.error_set=self.read_set.union(self.write_set)    #update error set      58      else:59        print_to_log('checksum mismatched:','Proceeding to send NAK')60        self.write_msg=b'\x15'61        self.write_set.add(self.conn[0])                      #Add in write set, for next select() to make it writable62        self.error_set=self.read_set.union(self.write_set)    #update error set      63        64      signal.alarm(self.alarm_time)65  66    elif(data==b'\x04'):67      signal.alarm(0)68      self.fd.write(data)69      print_to_log('File Content written:',data)70      self.fd.close()71      self.main_status=072      self.send_status=073    elif(data==b'\x06'):            #ACK when sending74      if(self.send_status==1):75        signal.alarm(0)76        self.send_status=277        print_to_log('send_status=={}'.format(self.send_status),'post-ENQ ACK')78        #writing79        80        self.write_set.add(self.conn[0])                      #Add in write set, for next select() to make it writable81        self.error_set=self.read_set.union(self.write_set)    #update error set82        83        self.get_first_outbox_file()                          #set current_outbox file84        fd=open(self.outbox_data+self.current_outbox_file,'rb')85        86        #data must not be >102487        #it will be ETX data , not ETB data88        #Frame number will always be one and only one89        byte_data=fd.read(2024)                               90        91        print_to_log('File Content',byte_data)92        chksum=self.get_checksum(byte_data)93        print_to_log('CHKSUM',chksum)94        self.write_msg=byte_data #set message95        96        #self.send_status=3       97        #data sent -> change status only when really data of stx-lf or anyother-inappropriate frame really sent98        99        #print_to_log('send_status=={}'.format(self.send_status),'changed send_status to 3 (data sent to write buffer)')100        #writing end101        signal.alarm(self.alarm_time) #wait for receipt of second ack102        103      elif(self.send_status==3):104        signal.alarm(0)105        self.send_status=4106        print_to_log('send_status=={}'.format(self.send_status),'post-LF ACK')107        #write108        self.write_set.add(self.conn[0])                      #Add in write set, for next select() to make it writable109        self.error_set=self.read_set.union(self.write_set)    #update error set110        self.write_msg=b'\x04'                                #set message EOT111        self.archive_outbox_file()                            112        #self.send_status=0                                    #change only where actually sent113        #self.main_status=0                                   #change only where actually sent114        #print_to_log('send_status=={}'.format(self.send_status),'sent EOT')115        #print_to_log('main_status=={}'.format(self.main_status),'connection is now, neutral')116        #write end117        #alarm not required, no expectation signal.alarm(self.alarm_time) 118        signal.alarm(self.alarm_time) #when EOT is really sent, change status, or if nothing is sent change status using alarm119    elif(data==b'\x15'):            #NAK120      signal.alarm(0)121      self.send_status=4122      print_to_log('send_status=={}'.format(self.send_status),'post-ENQ/LF NAK. Some error')123      #write - same as above124      self.write_set.add(self.conn[0])                      #Add in write set, for next select() to make it writable125      self.error_set=self.read_set.union(self.write_set)    #update error set126      self.write_msg=b'\x04'                                #set message EOT127      self.archive_outbox_file()128      #self.send_status=0                                    #only when actually sent129      #self.main_status=0130      print_to_log('send_status=={}'.format(self.send_status),'initiate_write() sent EOT')131      print_to_log('main_status=={}'.format(self.main_status),'initiate_write() now, neutral')132      #write end        133      #alarm not required, no expectation signal.alarm(self.alarm_time) 134      signal.alarm(self.alarm_time) #when EOT is really sent then change status , or if nothing is sent change status135     136  ###################################137  #override this function in subclass138  ###################################139  #This handles only STX-ETX data. NO STX-ETB management is done140  def initiate_write(self):141    print_to_log('main_status={} send_status={}'.format(self.main_status,self.send_status),'Entering initiate_write()') 142    if(self.main_status==0):143      print_to_log('main_status=={}'.format(self.main_status),'initiate_write() will find some pending work') 144      if(self.get_first_outbox_file()==True):                 #There is something to work      145        signal.alarm(0) 146        self.main_status=2                                    #announce that we are busy sending data147        print_to_log('main_status=={}'.format(self.main_status),'initiate_write() changed main_status to 2 to send data')148        self.write_set.add(self.conn[0])                      #Add in write set, for next select() to make it writable149        self.error_set=self.read_set.union(self.write_set)    #update error set150        self.write_msg=b'\x05'                                #set message ENQ151        #self.send_status=1                                    #status to ENQ sent only when written152        print_to_log('send_status=={}'.format(self.send_status),'initiate_write() sent ENQ to write buffer')153        signal.alarm(self.alarm_time) #wait for receipt of 1st ACK154      else:155        print_to_log('main_status=={}'.format(self.main_status),'no data in outbox. sleeping for a while')156        return157    else:158      print_to_log('main_status={} send_status={}'.format(self.main_status,self.send_status),'busy somewhre.. initiate_write() will not initiate anything') 159  ###################################160  #override this function in subclass161  ###################################162  def manage_write(self):      163    #common code164    #Send message in response to write_set->select->writable initiated by manage_read() and initiate_write()165    print_to_log('Following will be sent',self.write_msg)166    try:167      self.conn[0].send(self.write_msg)168      #self.write_msg=''169    except Exception as my_ex :170      print_to_log("Disconnection from client?",my_ex)                    171    self.write_set.remove(self.conn[0])                   #now no message pending, so remove it from write set172    self.error_set=self.read_set.union(self.write_set)    #update error set173   174    #specific code for ASTM status update175    #if sending: ENQ, ...LF, EOT is sent176    #ff receiving: ACK, NAK sent (ACK seding donot need to change status, it activates only alarm177    if(self.write_msg==b'\x04'):      #if EOT sent178      self.main_status=0179      self.send_status=0180      print_to_log('main_status={} send_status={}'.format(self.main_status,self.send_status),'.. because EOT is sent') 181      signal.alarm(0)182      print_to_log('Neutral State','.. so stopping alarm') 183    elif(self.write_msg[-1:]==b'\x0a'): #if main message sent184      self.send_status=3185      print_to_log('main_status={} send_status={}'.format(self.main_status,self.send_status),'.. because message is sent(LF)') 186    elif(self.write_msg==b'\x05'):      #if enq sent187      self.send_status=1188      print_to_log('main_status={} send_status={}'.format(self.main_status,self.send_status),'.. because ENQ is sent') 189    elif(self.write_msg==b'\x06'):      #if ack sent190      print_to_log('main_status={} send_status={}'.format(self.main_status,self.send_status),'.. no change in status ACK is sent') 191    elif(self.write_msg==b'\x15'):      #if NAK sent = EOT sent192      self.main_status=0193      self.send_status=0194      print_to_log('main_status={} send_status={}'.format(self.main_status,self.send_status),'.. because NAK is sent. going neutral') 195      signal.alarm(0)196      print_to_log('Neutral State','.. so stopping alarm') 197    else:                               #if data stream is incomplate/inappropriate containing EOT etc198      self.send_status=3199      print_to_log('main_status={} send_status={}'.format(self.main_status,self.send_status),'.. incomplate message (without LF) sent. EOT will be sent in next round') 200  #######Specific funtions for ASTM        201  def get_checksum(self,data):202    checksum=0203    start_chk_counting=False204    for x in data:205      if(x==2):206        start_chk_counting=True207        #Exclude from chksum calculation208        continue209      if(start_chk_counting==True):210        checksum=(checksum+x)%256211      if(x==3):212        start_chk_counting=False213        #Include in chksum calculation214      if(x==23):215        start_chk_counting=False216        #Include in chksum calculation217 218    two_digit_checksum_string='{:X}'.format(checksum).zfill(2)219    return two_digit_checksum_string.encode()220  def compare_checksum(self, received_checksum, calculated_checksum):221    if(received_checksum==calculated_checksum):222      return True223    else:224      return False225  def calculate_and_compare_checksum(self, data):226    calculated_checksum=self.get_checksum(data)227    received_checksum=data[-4:-2]228    print_to_log(229                      'Calculated checsum={}'.format(calculated_checksum),230                      'Received checsum={}'.format(received_checksum)231                      )232    return self.compare_checksum(received_checksum,calculated_checksum)233  def signal_handler(self,signal, frame):234    print_to_log('Alarm Stopped','Signal:{} Frame:{}'.format(signal,frame))235    print_to_log('change statuses ',' and close file')236    print_to_log('main_status={} send_status={}'.format(self.main_status,self.send_status),' -->previous ') 237    self.send_status=0                                    #data sent238    self.main_status=0    239    print_to_log('current main_status={} send_status={}'.format(self.main_status,self.send_status),' -->current ') 240    241    try:242      if self.fd!=None:        243        self.fd.close()244        print_to_log('self.signal_handler()','file closed')245    except Exception as my_ex:246      print_to_log('self.signal_handler()','error in closing file:{}'.format(my_ex))    247    248    print_to_log('Alarm..response NOT received in stipulated time','data receving/sending may be incomplate')249def print_to_log(object1,object2):250  logging.debug('{} {}'.format(object1,object2))251  252#Main Code###############################253#use this to device your own script254if __name__=='__main__':255  logging.basicConfig(filename=conf.astm_log_filename,level=logging.DEBUG,format='%(asctime)s : %(message)s')  256 257  #print('__name__ is ',__name__,',so running code')258  while True:259    m=astms()260    m.astmg_loop()261    #break; #useful during debugging  ...mboxer.py
Source:mboxer.py  
...12        self.name=stat_name13    def set(self,new_num,new_name):14        self.num=new_num15        self.name=new_name16    def send_status(self,f):17        f.write(str(self.num) +' '+ self.name +'\n')18        f.flush()19class Header:20    def __init__(self,list_of_elem):21        self.id=list_of_elem[0]22        self.value=list_of_elem[1].strip('\n')23    def send_header(self,f):24        f.write(self.id +' '+ self.value +'\n')25        f.flush()26def set_header(raw_string): #sluzi na pripravu suroveho textu pre konstruktor triedy Header27    elem=raw_string.split(':')28    ret_list=[]29    if len(elem)==2:30        if (elem[0].isascii()):31            if (' ' not in elem[0] and '/' not in elem[0] and '/' not in elem[1]):32                ret_list=elem33    return ret_list     34            35s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)36s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)37s.bind(('',9999))38signal.signal(signal.SIGCHLD,signal.SIG_IGN)39s.listen(5)40commands=['READ\n','WRITE\n','LS\n']41while True:42    connected_socket,address=s.accept()43    pid_chld=os.fork()44    current_status=Status(200,'Bad request')45    if pid_chld == 0:46        while True:47            s.close()48            header1=[]49            header2=[]50            data=''51            current_status.set(200,'Bad request')52            f=connected_socket.makefile(mode='rw')53            data=f.readline()54            if data in commands:55                if (commands.index(data)==0): #READ56                    while True:57                        data=f.readline()58                        header1=set_header(data)59                        if (len(header1)==2 and header1[0] == 'Mailbox'):   #ak su tie dve casti hlavicky v poriadku,60                            mailbox_head=Header(header1)            #robim z nich prvok triedy Header61                            data=f.readline()62                            header2=set_header(data)63                            if (header2[0]=='Message'):64                                message_head=Header(header2)65                                if not path.exists(mailbox_head.value):66                                    current_status.set(201,'No such message')67                                    current_status.send_status(f)68                                    f.write('\n')69                                    f.flush()70                                    break71                                with open(mailbox_head.value+'/'+message_head.value) as my_file:72                                    content=my_file.read()73                                current_status.set(100,'OK')74                                current_status.send_status(f)   75                                f.write('Content-length:'+str(len(content))+'\n')76                                f.flush()77                                f.write('\n'+content)78                                f.flush()79                                break80                            else:81                                current_status.send_status(f)82                                break           83                        else:84                            current_status.send_status(f)85                            break86                elif (commands.index(data)==1): #WRITE87                    while True:88                        data=f.readline()89                        header1=set_header(data)90                        if (len(header1)==2 and header1[0] == 'Mailbox'):91                            mailbox_head=Header(header1)92                            data=f.readline()93                            header2=set_header(data)94                            if (header2[0]=='Content-length' and int(header2[1])>-1):95                                cont_len=Header(header2)96                                if not path.exists(mailbox_head.value):97                                    current_status.set(203,'No such mailbox')98                                    current_status.send_status(f)99                                    f.write('\n')100                                    f.flush()101                                    break102                                else:103                                    m=hashlib.md5()104                                    current_status.set(100,'OK')105                                    message=f.readline(int(cont_len.value))106                                    hex_path=m.hexdigest()107                                    f.flush()108                                    with open(mailbox_head.value+'/'+hex_path,"w") as my_file:109                                        my_file.write(message)110                                    current_status.send_status(f)111                                    f.write('\n')112                                    f.flush()113                                    break114                            else:115                                current_status.send_status(f)116                                f.write('\n')117                                f.flush()118                                break119                        else:120                            current_status.send_status(f)121                            f.write('\n')122                            f.flush()123                            break124                elif (commands.index(data)==2): #LS125                    while True:126                        data=f.readline()127                        header1=set_header(data)128                        if (len(header1)==2 and header1[0] == 'Mailbox'):129                            mailbox_head=Header(header1)130                            if not path.exists(mailbox_head.value):131                                current_status.set(201,'No such mailbox')132                                break133                            files_list=[]134                            files_list=os.listdir(mailbox_head.value)135                            current_status.set(100,'OK')136                            current_status.send_status(f)       137                            f.write('Number-of-messages:'+str(len(files_list))+'\n\n')138                            f.flush()139                            for item in files_list:140                                f.write(item+'\n')141                                f.flush()142                            break143                        else:144                            current_status.send_status(f)145                            f.write('\n')146                            f.flush()147                            break148            else:149                current_status.set(204,'Unknown method')150                current_status.send_status(f)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
