Best Python code snippet using lemoncheesecake
dns.py
Source:dns.py  
# -*- encoding: utf-8 -*-
'''
  Functions to read from dns data files.
  MeasurementData Object from 'measurement_data_structure' is used to store the data points.
  read_data is the main procedure; it returns a MeasurementData object
  (or the string 'NULL' when the file cannot be used).
'''

# Please do not make any changes here unless you know what you are doing.

# 'os' is imported explicitly (and before the wildcard import, so a name
# exported by the config module still wins exactly as it did before).
import os
from math import sqrt
import numpy
from plot_script.measurement_data_structure import MeasurementData
from plot_script.config import dns as config
from plot_script.config.dns import * #@UnusedWildImport

__author__="Artur Glavic"
__credits__=[]
from plot_script.plotpy_info import __copyright__, __license__, __version__, __maintainer__, __email__ #@UnusedImport
__status__="Production"

def _open_data_file(file_name):
  '''
    Open a data file for reading; names ending in '.gz' are opened with gzip.

    :param file_name: Path of the file to open

    :return: Open file object, the caller is responsible for closing it
  '''
  if file_name.endswith('.gz'):
    # use gziped data format
    import gzip
    return gzip.open(file_name, 'r')
  return open(file_name, 'r')

def read_data(file_name, print_comments=True):
  '''
    Read the data of a dns data file.

    :param file_name: Path of the data file, optionally gzip compressed ('.gz')
    :param print_comments: Print a message when the file is missing or of the wrong type

    :return: MeasurementData object with the data from file_name or 'NULL' on failure
  '''
  if not os.path.exists(file_name): # Test if the file exists
    if print_comments:
      print("File does not exist.")
    return 'NULL'
  add_info={}
  file_handler=_open_data_file(file_name)
  try:
    # read header to test if this is a dns data file
    if file_handler.readline().split()[0:3]!=['#', 'DNS', 'Data']:
      # not dns data
      if print_comments:
        print("Wrong file type! Doesn't contain dns header information.")
      return 'NULL'
    file_handler.readline() # skip empty line
    add_info['header']=read_header(file_handler) # read header information
    if config.LAMBDA_NEUTRON:
      add_info['lambda_n']=config.LAMBDA_NEUTRON
      # the configured value wins, but the lines still have to be consumed
      read_lambda(file_handler)
    else:
      add_info['lambda_n']=read_lambda(file_handler) # find wavelength
    # get the information defined in config.dns.GET_INFO function
    for info in GET_INFO:
      add_info[info[1]]=read_info(file_handler, info[0])
    # read until data line; stop at EOF instead of looping forever
    line=file_handler.readline()
    while line.find('DATA')==-1:
      if line=='':
        raise ValueError("Unexpected end of file while searching for the DATA section.")
      line=file_handler.readline()
    # dimensions are given at the line above the data
    line=file_handler.readline().split()
    detectors=min(float(line[1]), NUMBER_OF_DETECTORS)
    time_channels=float(line[2])
    # collect the data
    data_array=read_detector_data(file_handler, detectors, time_channels)
  finally:
    # close the file on every path, including the early 'NULL' return
    file_handler.close()
  #measurement_data=evaluate_data(data_array,add_info['detector_bank_2T'])
  channel_count=len(data_array[0][1])
  columns=[['Detector', '']]
  error_columns=[]
  for i in range(channel_count):
    columns.append(['Channel_%i'%i, 'counts/'+SCALE_BY[1]])
    error_columns.append(['Error_Ch_%i'%i, 'counts/'+SCALE_BY[1]])
  columns+=error_columns
  measurement_data=MeasurementData(columns, [], 0, 1, channel_count*2, zdata=-1)
  scaling=add_info[SCALE_BY[0]]
  for point in data_array:
    # each row: detector index, scaled counts, scaled sqrt(counts) as error
    intensities=[intensity/scaling for intensity in point[1]]
    errors=[sqrt(intensity)/scaling for intensity in point[1]]
    measurement_data.append([point[0]]+intensities+errors)
  measurement_data.dns_info=add_info
  measurement_data.info="\n".join([item[0]+': '+str(item[1])
                                   for item in sorted(add_info.items())])
  measurement_data.sample_name=file_name
  return measurement_data

def read_header(file_handler):
  '''
    Read file header information.

    :param file_handler: Open file object positioned in front of the header section

    :return: Header string without leading # characters

    :raise ValueError: If the file ends before the '#-----' terminator line
  '''
  file_handler.readline()
  line=file_handler.readline()
  header_lines=[]
  while line[0:6]!='#-----': # while header section is not over
    if line=='':
      # readline() returns '' forever at EOF, so bail out explicitly
      raise ValueError("Unexpected end of file while reading the header section.")
    header_lines.append(line.lstrip('#'))
    line=file_handler.readline()
  return ''.join(header_lines)

def read_lambda(file_handler):
  '''
    Read wavelength when after comment section.

    One line is skipped, the fifth whitespace separated item of the
    following line is converted to float.

    :return: Float of wavelength (absolute value of the file entry times 10)
  '''
  file_handler.readline()
  return abs(float(file_handler.readline().split()[4]))*10

def read_info(file_handler, info_name):
  '''
    Read until specified line.

    :param file_handler: Open file object to read from
    :param info_name: String of the parameter to search for

    :return: Float of the third whitespace separated item of the matching line

    :raise ValueError: If the file ends before a line containing info_name
  '''
  line=file_handler.readline()
  while line.find(info_name)==-1:
    if line=='':
      # readline() returns '' forever at EOF, so bail out explicitly
      raise ValueError("Unexpected end of file while searching for '%s'."%info_name)
    line=file_handler.readline()
  return float(line.split()[2])

def read_detector_data(file_handler, detectors, time_channels):
  '''
    Reads data and stores it in an array.

    :param file_handler: Open file object positioned at the first data line
    :param detectors: Number of detectors installed
    :param time_channels: number of channels to read from

    :return: List of data points, each [detector index, list of channel values]
  '''
  data=[]
  data_point=[]
  i=-1 # -1: next value starts a new detector record and is not stored
  j=0 # number of detector records started so far
  line=file_handler.readline()
  while line!='': # read until EOF
    for value in line.split():
      if i==-1:
        j=j+1
      elif i<time_channels:
        data_point.append(float(value))
      i=i+1
    if not i<time_channels:
      # only save data form detectors after start_with
      if START_WITH_DETECTOR<=j:
        data.append([j-1, data_point])
      data_point=[]
      i=-1
      if j>=detectors:
        break
    line=file_handler.readline()
  return data

def read_data_d7(file_name, print_comments=True):
  '''
    Read the data of a d7 data file.

    :param file_name: Path of the data file, optionally gzip compressed ('.gz')
    :param print_comments: Print a message when the file is missing or of the wrong type

    :return: List of MeasurementData objects with the data from file_name or 'NULL' on failure
  '''
  if not os.path.exists(file_name): # Test if the file exists
    if print_comments:
      print("File does not exist.")
    return 'NULL'
  file_handler=_open_data_file(file_name)
  try:
    # read header to test if this is a d7 data file
    if file_handler.readline().strip()!='RRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR':
      # not dns data
      if print_comments:
        print("Wrong file type! Doesn't contain dns header information.")
      return 'NULL'
    comments, head_info, data_string=file_handler.read().split(
                            'IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII')
  finally:
    file_handler.close()
  # Read additional information from the header
  add_info=evaluate_header_d7(comments, head_info)
  # extract counts from datastring, 132 values per dataset
  counts=[float(item) for item in data_string.split()]
  counts=[counts[i*132:(i+1)*132] for i in range(len(counts)//132)]
  #counts=map(lambda count: numpy.array(list(reversed(count))), counts)
  # reorder the three blocks of 44 values: last block first, first block last
  counts=[numpy.array(count[88:132]+count[44:88]+count[0:44]) for count in counts]
  detector_angles=(
      (add_info['detector_bank_2T']-numpy.array(D7_DETECTOR_MAP[0])).tolist()+
      (add_info['detector_bank_2T[1]']-numpy.array(D7_DETECTOR_MAP[1])).tolist()+
      (add_info['detector_bank_2T[2]']-numpy.array(D7_DETECTOR_MAP[2])).tolist())
  # create objects
  datasets=[]
  for i, counts_array in enumerate(counts):
    measurement_data=MeasurementData([ ['Detector', ''],
                                       ['Channel_0', 'counts/'+SCALE_BY[1]],
                                       ['Error_Ch_0', 'counts/'+SCALE_BY[1]],
                                       ['Detectorbank', '°'],
                                       ], [], 0, 1, 2, zdata=-1)
    scaling=add_info[SCALE_BY[0]+'[%i]'%i]
    # the error has to be taken from the raw counts, before scaling
    error_array=numpy.sqrt(counts_array)
    counts_array/=scaling
    error_array/=scaling
    measurement_data.data[3].values=list(detector_angles)
    measurement_data.data[1].values=counts_array.tolist()
    measurement_data.data[2].values=error_array.tolist()
    measurement_data.data[0].values=list(range(132))
    measurement_data.dns_info=dict(add_info)
    measurement_data.dns_info[SCALE_BY[0]]=scaling
    currents=add_info['currents[%i]'%i]
    measurement_data.dns_info.update({
                                  'flipper': currents[0],
                                  'flipper_compensation': currents[1],
                                  'C_a': currents[2],
                                  'C_b': currents[3],
                                  'C_c': currents[4],
                                  'C_z': currents[5],
                                })
    measurement_data.sample_name=file_name+'[%i]'%i
    measurement_data.info="\n".join([item[0]+': '+str(item[1])
                                     for item in sorted(add_info.items())])
    datasets.append(measurement_data)
  return datasets

def evaluate_header_d7(comments, head_info):
  '''
    Reads sample name and additional information as temperature and omega position
    from d7 file header information.

    :param comments: Text of the file in front of the numeric header section
    :param head_info: Text of the numeric header section

    :return: Dictionary of the read information.
  '''
  add_info={}
  header_blocks=head_info.split('FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF')
  # extract the two data blocks, in each case the first five lines are dropped
  first_block=header_blocks[1].split('\n', 5)[5]
  second_block=header_blocks[2].split(
            'SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS')[0].split('\n', 5)[5]
  header_blocks_data=[[float(item) for item in first_block.split()],
                      [float(item) for item in second_block.split()]]
  # extract important information
  add_info['lambda_n']=header_blocks_data[0][0]
  add_info['detector_angles']=True
  add_info['detector_bank_2T']=header_blocks_data[0][164]
  add_info['detector_bank_2T[1]']=header_blocks_data[0][165]
  add_info['detector_bank_2T[2]']=header_blocks_data[0][166]
  #add_info['detector_bank_2T[3]']=header_blocks_data[0][167]
  add_info['omega']=-header_blocks_data[0][162]
  add_info['field']=header_blocks_data[1][6]
  add_info['temperature']=header_blocks_data[1][5]
  # six entries, each taking ten consecutive values of the second block
  for i in range(6):
    offset=10*i
    add_info['currents[%i]'%i]=header_blocks_data[1][8+offset:14+offset]
    add_info['time[%i]'%i]=header_blocks_data[1][14+offset]/100.
    add_info['monitor[%i]'%i]=header_blocks_data[1][16+offset]
  # evaluate comments
  true_comments=comments.split(
                 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA')[2].split('\n', 2)[2]
  add_info['full comment']=true_comments
  add_info['sample_name']=true_comments.splitlines()[5].strip()
  return add_info

def read_vana_d7(file_name, print_comments=True):
  '''
    Read an already corrected vanadium data file from D7.

    :param file_name: Path of the data file, optionally gzip compressed ('.gz')
    :param print_comments: Print a message when the file is missing or of the wrong type

    :return: MeasurementData object with the data from file_name or 'NULL' on failure
  '''
  if not os.path.exists(file_name): # Test if the file exists
    if print_comments:
      print("File does not exist.")
    return 'NULL'
  file_handler=_open_data_file(file_name)
  try:
    text=file_handler.read()
  finally:
    file_handler.close()
  # read header to test if this is a d7 data file
  if text.startswith('Vanadium Intensities'):
    # the first two lines are not data
    data_items=[line.split() for line in text.splitlines()[2:]]
    def get_data(splitted_line):
      detector=int(splitted_line[0])-1
      intensity=float(splitted_line[2])
      dintensity=float(splitted_line[3])
      return [detector, intensity, dintensity]
    measurement_data=MeasurementData([ ['Detector', ''],
                                       ['Channel_0', 'counts/'+SCALE_BY[1]],
                                       ['Error_Ch_0', 'counts/'+SCALE_BY[1]],
                                       ], [], 0, 1, 2, zdata=-1)
    # same block order as in read_data_d7: last block first, first block last
    for block in (data_items[88:], data_items[44:88], data_items[:44]):
      for splitted_line in block:
        measurement_data.append(get_data(splitted_line))
    measurement_data.dns_info={
                    'detector_bank_2T': 0,
                    'flipper': 0,
                    'flipper_compensation': 99.,
                    'C_a': 0,
                    'C_b': 0,
                    'C_c': 0,
                    'C_z': 0,
                               }
    return measurement_data
  else:
    # NOTE(review): the available listing is cut off right after this
    # 'else:'. The branch below mirrors the failure path of read_data_d7
    # and has to be confirmed against the upstream file.
    if print_comments:
      print("Wrong file type! Doesn't contain dns header information.")
    return 'NULL'
Source:env.cgi  
...19    try:20        host_name = socket.gethostbyaddr(remote_addr)[0]21    except Exception as e:22        host_name = 'N/A'23    add_info('SYSTEM TIME      : ' + date_time)24    add_info('SERVER_NAME      : ' + os.environ.get('SERVER_NAME', ''))25    add_info('SERVER_ADDR      : ' + os.environ.get('SERVER_ADDR', ''))26    add_info('SERVER_PORT      : ' + os.environ.get('SERVER_PORT', ''))27    add_info('GATEWAY_INTERFACE: ' + os.environ.get('GATEWAY_INTERFACE', ''))28    add_info('SERVER_SOFTWARE  : ' + os.environ.get('SERVER_SOFTWARE', ''))29    add_info('----------------------------------------------------------------')30    add_info('HOST ADDRESS     : ' + remote_addr)31    add_info('HOST NAME        : ' + host_name)32    add_info('REQUEST_METHOD   : ' + os.environ.get('REQUEST_METHOD', ''))33    add_info('REQUEST_URI      : ' + os.environ.get('REQUEST_URI', ''))34    add_info('SERVER_PROTOCOL  : ' + os.environ.get('SERVER_PROTOCOL', ''))35    add_info('QUERY_STRING     : ' + os.environ.get('QUERY_STRING', 'N/A'))36    add_info('REMOTE_USER      : ' + os.environ.get('REMOTE_USER', 'N/A'))37    add_info('REMOTE_PORT      : ' + os.environ.get('REMOTE_PORT', ''))38    add_info('Content-Length   : ' + os.environ.get('CONTENT_LENGTH', 'N/A'))39    add_info('Content-Type     : ' + os.environ.get('CONTENT_TYPE', 'N/A'))40    add_info('User-Agent       : ' + os.environ.get('HTTP_USER_AGENT', ''))41    add_info('Accept           : ' + os.environ.get('HTTP_ACCEPT', ''))42    add_info('Accept-Language  : ' + os.environ.get('HTTP_ACCEPT_LANGUAGE', 'N/A'))43    add_info('Accept-Charset   : ' + os.environ.get('HTTP_ACCEPT_CHARSET', 'N/A'))44    add_info('Accept-Encoding  : ' + os.environ.get('HTTP_ACCEPT_ENCODING', 'N/A'))45    add_info('Cookie           : ' + os.environ.get('HTTP_COOKIE', 'N/A'))46    add_info('Referer          : ' + os.environ.get('HTTP_REFERER', 'N/A'))47    add_info('Connection       : ' + os.environ.get('HTTP_CONNECTION', 'N/A'))48    
add_info('Proxy-Connection : ' + os.environ.get('HTTP_PROXY_CONNECTION', 'N/A'))49    add_info('Via              : ' + os.environ.get('HTTP_VIA', 'N/A'))50    add_info('X-Forwarded-For  : ' + os.environ.get('HTTP_X_FORWARDED_FOR', 'N/A'))51    add_info('X-Forwarded-Host : ' + os.environ.get('HTTP_X_FORWARDED_HOST', 'N/A'))52    add_info('X-Forwarded-Proto: ' + os.environ.get('HTTP_X_FORWARDED_PROTO', 'N/A'))53    add_info('Upgrade-Insecure-Requests: ' + os.environ.get('HTTP_UPGRADE_INSECURE_REQUESTS', 'N/A'))54    add_info('----------------------------------------------------------------')55    add_info('[stdin]')56    info += '' + sys.stdin.read()57def add_info(content):58    global info59    info += content + '\n'60def main():61    try:62        get_env_info()63        global info64        content = info65    except Exception as e:66        content = str(e)67    print('Content-Type: text/plain')68    print()69    print(content)...audio.py
Source:audio.py  
...5    MODULE = io.AudioRecorderModule6    PARAMETERS = {"filename": "recorded_audio.wav", "rate": 44100, "sample_width": 2}7    def set_content(self):8        self.gui.clear_content()9        self.gui.add_info("Filename: %s" % self.retico_module.filename)10        self.gui.add_info("Rate: %d" % self.retico_module.rate)11        self.gui.add_info("Sample Width: %d" % self.retico_module.sample_width)12class AudioDispatcherModule(AbstractModule):13    MODULE = io.AudioDispatcherModule14    PARAMETERS = {15        "target_chunk_size": 5000,16        "rate": 44100,17        "sample_width": 2,18        "speed": 1.0,19        "continuous": True,20        "silence": None,21        "interrupt": True,22    }23    def set_content(self):24        self.gui.clear_content()25        self.gui.add_info(26            "Target chunk size: %d" % self.retico_module.target_chunk_size27        )28        self.gui.add_info("Rate: %d" % self.retico_module.rate)29        self.gui.add_info("Sample Width: %d" % self.retico_module.sample_width)30        self.gui.add_info("Speed: %f" % self.retico_module.speed)31        self.gui.add_info("Continuous: %s" % self.retico_module.continuous)32        self.gui.add_info("Silence: %d" % len(self.retico_module.silence))33class SpeakerModule(AbstractModule):34    MODULE = io.SpeakerModule35    PARAMETERS = {"rate": 44100, "sample_width": 2, "use_speaker": "both"}36    def set_content(self):37        self.gui.clear_content()38        self.gui.add_info("Rate: %d" % self.retico_module.rate)39        self.gui.add_info("Sample Width: %d" % self.retico_module.sample_width)40        self.gui.add_info("Using Speaker: %s" % self.retico_module.use_speaker)41class StreamingSpeakerModule(AbstractModule):42    MODULE = io.StreamingSpeakerModule43    PARAMETERS = {"chunk_size": 5000, "rate": 44100, "sample_width": 2}44    def set_content(self):45        self.gui.clear_content()46        self.gui.add_info("Chunk Size: %d" % self.retico_module.chunk_size)47        
self.gui.add_info("Rate: %d" % self.retico_module.rate)48        self.gui.add_info("Sample Width: %d" % self.retico_module.sample_width)49class MicrophoneModule(AbstractModule):50    MODULE = io.MicrophoneModule51    PARAMETERS = {"chunk_size": 5000, "rate": 44100, "sample_width": 2}52    def set_content(self):53        self.gui.clear_content()54        self.gui.add_info("Chunk Size: %d" % self.retico_module.chunk_size)55        self.gui.add_info("Rate: %d" % self.retico_module.rate)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
