Best Python code snippet using autotest_python
ReductionHelpers.py
Source:ReductionHelpers.py  
# Mantid Repository : https://github.com/mantidproject/mantid
#
# Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
#   NScD Oak Ridge National Laboratory, European Spallation Source,
#   Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
# SPDX - License - Identifier: GPL - 3.0 +
#pylint: disable=invalid-name
from mantid import config
import os
import re
"""
Set of functions to assist with processing instrument parameters relevant to reduction.
"""


class ComplexProperty(object):
    """Property whose value is stored in, and read from, other properties.

    The object keeps a list of key names. Reading it collects the values
    stored under those keys; assigning to it distributes a sequence of values
    over the same keys.
    """

    def __init__(self, other_prop_list):
        """Remember the names of the properties this one is built from."""
        self._other_prop = other_prop_list

    def __get__(self, spec_dict, owner=None):
        """Return the values of all dependent properties.

        @param spec_dict -- a dictionary holding the dependent values, or an
                            object whose ``__dict__`` holds them. ``None``
                            returns the property object itself, giving access
                            to its methods.
        @return a 2-tuple when the property depends on exactly two other
                properties, otherwise a list of values.
        """
        if spec_dict is None:
            # access to property methods
            return self
        if not isinstance(spec_dict, dict):
            spec_dict = spec_dict.__dict__
        rez = [spec_dict[key] for key in self._other_prop]
        # A property dependent on exactly two other properties is the very
        # important case: it is returned as a tuple.
        if len(rez) == 2:
            return (rez[0], rez[1])
        return rez

    def __set__(self, instance, value):
        """Distribute the sequence ``value`` over the dependent properties.

        @param instance -- a dictionary, or an object whose ``__dict__`` is
                           used as the storage.
        @param value    -- a sized sequence with one entry per dependency.
        @raise KeyError if ``value`` has no length or its length differs from
               the number of dependencies.
        """
        try:
            lv = len(value)
        except (TypeError, AttributeError):
            # Only "value has no length" is translated; anything else
            # (e.g. KeyboardInterrupt) must propagate untouched.
            raise KeyError("Complex property values can be assigned only by group of other values")
        if lv != len(self._other_prop):
            raise KeyError("Complex property values can be set equal to the same length values list")
        if isinstance(instance, dict):
            spec_dict = instance
        else:
            spec_dict = instance.__dict__
        for key, val in zip(self._other_prop, value):
            spec_dict[key] = val

    def dependencies(self):
        """Return the list of other property names this property depends on."""
        return self._other_prop

    def len(self):
        """Return the number of properties this property depends on."""
        return len(self._other_prop)
#end ComplexProperty


def findFile(fileName):
    """Simple search within Mantid data search directories for
       a file which is not a run file.

    @param fileName -- a path or a bare file name.
    @return the absolute path if ``fileName`` exists as given, otherwise the
            first match of its base name in the data search directories, or
            an empty string when nothing is found.
    """
    if os.path.exists(fileName):
        return os.path.abspath(fileName)
    fbase = os.path.basename(fileName)
    for path in config.getDataSearchDirs():
        fname = os.path.join(path, fbase)
        if os.path.exists(fname):
            return fname
    return ''


def get_default_parameter(instrument, name):
    """Get the value of a default instrument parameter, converted to the
       type declared for this parameter.

    @param instrument -- the instrument object to query.
    @param name       -- the parameter name.
    @return the first element of the value list returned by the instrument;
            ``None`` when a string parameter holds the text "None".
    @raise ValueError if ``instrument`` is None.
    @raise KeyError if the parameter type is not double, bool, string or int.
    """
    if instrument is None:
        raise ValueError("Cannot initiate default parameter, instrument has not been properly defined.")
    type_name = instrument.getParameterType(name)
    if type_name == "double":
        val = instrument.getNumberParameter(name)
    elif type_name == "bool":
        val = instrument.getBoolParameter(name)
    elif type_name == "string":
        val = instrument.getStringParameter(name)
        if val[0] == "None":
            return None
    elif type_name == "int":
        val = instrument.getIntParameter(name)
    else:
        raise KeyError(" Instrument: {0} does not have parameter with name: {1}".format(instrument.getName(), name))
    return val[0]


def get_default_idf_param_list(pInstrument, synonims_list=None):
    """Obtain the default reduction parameters from the instrument.

    @param pInstrument   -- the instrument object to query.
    @param synonims_list -- optional mapping from parameter name to the key
                            under which its value should be stored.
    @return dictionary {key name: default parameter value}.
    """
    par_list = {}
    for name in pInstrument.getParameterNames():
        if synonims_list and name in synonims_list:
            key_name = synonims_list[name]
        else:
            key_name = name
        par_list[key_name] = get_default_parameter(pInstrument, name)
    return par_list


def build_properties_dict(param_map, synonims, descr_list=None):
    """Build the properties dictionaries from a parameter map.

    Properties with simple values are stored as
        properties_dict[prop_name] = value
    String values of the form key2:key3 describe a property expressed through
    the values of other properties and are stored as
        properties_dict['_' + prop_name] = ComplexProperty(['key2', 'key3'])
    Properties whose (synonym-resolved) name is in ``descr_list`` go into a
    separate descriptors dictionary instead, as a plain value or a list of
    key names.

    @param param_map  -- dictionary {parameter name: value}.
    @param synonims   -- mapping {name: replacement name}, applied both to
                         parameter names and to the keys found in values.
    @param descr_list -- names to be treated as descriptors (default: none).
    @return tuple (properties_dict, descr_dict).
    @raise KeyError if a composite value refers to a key which is not among
           the (synonym-resolved) names in ``param_map``.
    """
    # None replaces the former mutable default argument ([]).
    descriptors = set(descr_list) if descr_list else set()

    def resolve(a_name):
        """Return the synonym of a name if one is defined, as a string."""
        if a_name in synonims:
            return str(synonims[a_name])
        return str(a_name)

    # The set of keys allowed in composite (key1:key2) values.
    param_keys = set(resolve(name) for name in param_map)

    properties_dict = dict()
    descr_dict = dict()
    for name, val in param_map.items():
        final_name = resolve(name)
        is_descriptor = final_name in descriptors
        if isinstance(val, str):
            keys_candidates = val.strip().split(":")
            if len(keys_candidates) > 1:  # this is the property we want to modify
                result = list()
                for key in keys_candidates:
                    rkey = resolve(key)
                    if rkey not in param_keys:
                        raise KeyError('Substitution key : {0} is not in the list of allowed keys'.format(rkey))
                    result.append(rkey)
                if is_descriptor:
                    descr_dict[final_name] = result
                else:
                    properties_dict['_' + final_name] = ComplexProperty(result)
            else:
                if is_descriptor:
                    descr_dict[final_name] = keys_candidates[0]
                else:
                    properties_dict[final_name] = keys_candidates[0]
        else:
            if is_descriptor:
                descr_dict[final_name] = val
            else:
                properties_dict[final_name] = val
    return (properties_dict, descr_dict)


def extract_non_system_names(names_list, prefix='_'):
    """Return the names from the input list which do not start with the
       system prefix.

    @param names_list -- iterable of names.
    @param prefix     -- the leading string which marks a system name.
    """
    return [name for name in names_list if not name.startswith(prefix)]


def build_subst_dictionary(synonims_list=None):
    """Process the "synonims_list" of the instrument parameters, used to
       support synonyms in the reduction script.

    Takes a string of synonyms in the form key1=subs1=subst2=subts3;key2=subst4
    and returns the dictionary in the form
        dict[subs1] = key1 ; dict[subst2] = key1 ... dict[subst4] = key2
    e.g. if one wants to use the IDF key word my_detector instead of e.g.
    norm-mon1-spec, he has to type norm-mon1-spec=my_detector in the synonyms
    field of the IDF parameters file.

    @param synonims_list -- a synonyms string, a ready dictionary (returned
                            unchanged) or an empty value (gives an empty
                            dictionary).
    @raise AttributeError if the input is neither a string nor a dictionary,
           or if one of the ';'-separated groups is malformed.
    """
    if not synonims_list:  # nothing to do
        return dict()
    if isinstance(synonims_list, dict):  # all done
        return synonims_list
    if not isinstance(synonims_list, str):
        raise AttributeError("The synonyms field of Reducer object has to be special format string or the dictionary")
    # we are in the right place and going to transform string into
    # dictionary
    rez = dict()
    for lin in synonims_list.split(";"):
        keys = lin.strip().split("=")
        if len(keys) < 2:
            raise AttributeError("The pairs in the synonyms fields have to have form key1=key2=key3 with at least two values present")
        if len(keys[0]) == 0:
            raise AttributeError("The pairs in the synonyms fields have to have form key1=key2=key3 with at least two values present, "
                                 "but the first key is empty")
        target = keys[0].strip()
        for i, key in enumerate(keys[1:], 1):
            if len(key) == 0:
                raise AttributeError("The pairs in the synonyms fields have to have form key1=key2=key3 with at least two values present, "
                                     "but the key" + str(i) + " is empty")
            rez[key.strip()] = target
    return rez


def gen_getter(keyval_dict, key):
    """Return a value from the dictionary with substitution.

    e.g. if keyval_dict[A] = 10, keyval_dict[B] = 20 and keyval_dict[C] = [A,B]
    gen_getter(keyval_dict,A) == 10;  gen_getter(keyval_dict,B) == 20;
    and gen_getter(keyval_dict,C) == [10,20];

    The key is looked up first as given and then with a leading underscore.

    @raise KeyError if neither ``key`` nor ``'_' + key`` is in the dictionary.
    """
    if key not in keyval_dict:
        name = '_' + key
        if name not in keyval_dict:
            raise KeyError('Property with name: {0} is not among the class properties '.format(key))
    else:
        name = key
    a_val = keyval_dict[name]
    if isinstance(a_val, ComplexProperty):
        return a_val.__get__(keyval_dict)
    return a_val


def gen_setter(keyval_dict, key, val):
    """Set a value in the dictionary with substitution.

    e.g. if keyval_dict[A] = 10, keyval_dict[B] = 20 and keyval_dict[C] = [A,B]
    gen_setter(keyval_dict,A,20)  causes keyval_dict[A] == 20
    gen_setter(keyval_dict,B,30)  causes keyval_dict[B] == 30
    and gen_setter(keyval_dict,C,[1,2]) causes keyval_dict[A] == 1 and keyval_dict[B] == 2

    The key is looked up first as given and then with a leading underscore.

    @raise KeyError if neither ``key`` nor ``'_' + key`` is in the dictionary.
    """
    if key not in keyval_dict:
        name = '_' + key
        if name not in keyval_dict:
            raise KeyError(' Property name: {0} is not defined'.format(key))
    else:
        name = key
    test_val = keyval_dict[name]
    if isinstance(test_val, ComplexProperty):
        # Assigning values for composite function to the function components
        test_val.__set__(keyval_dict, val)
    else:
        keyval_dict[key] = val
    return None


def check_instrument_name(old_name, new_name):
    """Check if the new instrument name is an acceptable instrument name.

    @return (None, None, current facility) when ``new_name`` is None but an
            old name exists; (None, None, None) when the name is unchanged;
            otherwise (short name, full name, facility) of the instrument.
    @raise KeyError if no name is defined at all, or the instrument can not
           be found in any facility.
    """
    if new_name is None:
        if old_name is not None:
            return (None, None, config.getFacility())
        raise KeyError("No instrument name is defined")
    if old_name == new_name:
        return (None, None, None)
    # Instrument name might be a prefix, query Mantid for the full name
    short_name = ''
    full_name = ''
    try:
        instrument = config.getFacility().instrument(new_name)
        short_name = instrument.shortName()
        full_name = instrument.name()
        facility = config.getFacility()
    except RuntimeError:
        # it is possible to have wrong facility:
        for facility in config.getFacilities():
            try:
                instrument = facility.instrument(new_name)
                short_name = instrument.shortName()
                full_name = instrument.name()
                if len(short_name) > 0:
                    break
            except Exception:  # pylint: disable=broad-except
                # Best-effort probing: a facility which does not know the
                # instrument is skipped. KeyboardInterrupt and SystemExit
                # are no longer swallowed.
                pass
        if len(short_name) == 0:
            raise KeyError(" Can not find/set-up the instrument: " + new_name + ' in any supported facility')
    new_name = short_name
    return (new_name, full_name, facility)


def parse_single_name(filename):
    """Process a single run name into lists of paths, run numbers and
       file extensions.

    A name of the form ``left:right`` is expanded into the inclusive range of
    run numbers between the two; a plain name gives one-element lists. The
    run number is the first group of digits in the file name, or 0 when the
    name contains no digits.

    @return tuple (list of paths, list of run numbers, list of extensions).
    @raise ValueError if the left number of a range is larger than the right.
    """
    filepath, fname = os.path.split(filename)
    if ':' in fname:
        fl, fr = fname.split(':')
        path1, ind1, ext1 = parse_single_name(fl)
        _, ind2, ext2 = parse_single_name(fr)
        if ind1[0] > ind2[0]:
            # The old message claimed the left number had to be *larger*,
            # which is the opposite of the condition being checked.
            raise ValueError('Invalid file number defined using colon : left run number '
                             '{0} has to be smaller than or equal to the right {1}'.format(ind1[0], ind2[0]))
        number = list(range(ind1[0], ind2[0] + 1))
        if len(filepath) > 0:
            filepath = [filepath] * len(number)
        else:
            filepath = path1 * len(number)
        # the extension of the right name takes precedence if it is present
        if len(ext2[0]) > 0:
            fext = ext2 * len(number)
        else:
            fext = ext1 * len(number)
        return (filepath, number, fext)

    fname, fext = os.path.splitext(fname)
    fnumber = re.findall(r'\d+', fname)
    number = int(fnumber[0]) if fnumber else 0
    return ([filepath], [number], [fext])


def parse_run_file_name(run_string):
    """Parse a run file name to obtain the run number, the path if possible,
       and the file extension if any is present in the string.

    ``run_string`` is a comma-separated list of names, each processed by
    parse_single_name. Empty extensions are replaced by the last non-empty
    extension found.

    @return (path, number, extension) for a single run, otherwise three
            lists of equal length.
    @raise ValueError if ``run_string`` is not a string.
    """
    if not isinstance(run_string, str):
        raise ValueError("REDUCTION_HELPER:parse_run_file_name -- input has to be a string")
    filepath = []
    filenum = []
    fext = []
    for run in run_string.split(','):
        path, ind, ext1 = parse_single_name(run)
        filepath += path
        filenum += ind
        fext += ext1
    # extensions should be either all the same or all defined
    non_empty = [x for x in fext if len(x) > 0]
    if non_empty:
        anExt = non_empty[-1]
        fext = [val if len(val) > 0 else anExt for val in fext]
    if len(filenum) == 1:
        filepath = filepath[0]
        filenum = filenum[0]
        fext = fext[0]
    return (filepath, filenum, fext)
#


def process_prop_list(workspace, logName="CombinedSpectraIDList"):
    """Process the log which describes the list of spectra attached to
       the workspace.
    """
    if workspace.run().hasProperty(logName):
        spec_id_str = workspace.run().getProperty(logName).value
        spec_id_str = spec_id_str.replace('[','').replace(']','').strip()
        spec_id_listS = spec_id_str.split(',')
        spec_list = []
        for val in spec_id_listS:
            spec_list.append(int(val))
    else:
        spec_list = []
    # NOTE(review): the listing is truncated at this point in the excerpt;
    # the remainder of this function is not visible and has not been
    # reconstructed.
# ... (excerpt truncated; the next snippet in the listing is create_fwy_data.py)
Source:create_fwy_data.py  
import datetime
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Set global variables
metric_colnames = ['Total Flow', 'Avg Occupancy', 'Avg Speed']
index_colnames = ['timeOfDay', 'Abs_PM']
helper_colnames = ['Weekday']

# Define Functions


##### reduce_data_by_dict
def reduce_data_by_dict(df, keyval_dict):
    """Select the rows of a dataframe which match every key/value pair.

    This is a universal function: for each ``column: value`` pair in
    ``keyval_dict`` only the rows where ``df[column] == value`` are kept.

    :param df: dataframe to filter.
    :param keyval_dict: dict mapping column name to the required value.
    :return: the filtered dataframe (``df`` itself when the dict is empty).
    """
    # .items() works on Python 2 and 3; .iteritems() exists on Python 2 only.
    for key, val in keyval_dict.items():
        df = df[df[key] == val]
    return df


##### get_fwy_data
def get_fwy_data(_fwy, _dir, overwrite=False, output_csv=False, nimportrows=-1):
    """Return the five-minute data of one freeway and direction.

    Loading the raw data takes a LONG time, so the result can be cached as a
    tab-separated file ``../data/metric_statistics/i<_fwy><_dir>.csv``. It may
    make sense to run this only once, but the data for each freeway could get
    rather large.

    :param _fwy: freeway number, matched against the "Freeway #" / "Fwy" columns.
    :param _dir: direction, matched against the "Direction" / "Dir" columns.
    :param overwrite: when False the cached file is used if it can be read;
        when True (or when the cache is unreadable) the data are rebuilt from
        the raw files.
    :param output_csv: write the rebuilt data to the cache file.
    :param nimportrows: number of raw rows to read; -1 reads the whole file.
    :return: dataframe of metrics merged with the station meta data.
    """
    # Ensure proper naming convention for file
    myname = "".join(["i", str(_fwy), str(_dir)])
    filepath = "".join(["../data/metric_statistics/", myname, ".csv"])

    # Use the cached file unless it should be overwritten or is not available.
    if not overwrite:
        try:
            return pd.read_csv(filepath, sep='\t')
        except (IOError, OSError, ValueError):
            # Missing, unreadable or unparsable cache: rebuild from raw data.
            pass

    # import raw data
    raw_5_min_filepath = '../../../five_min_frame.csv'
    if nimportrows == -1:
        # -1 is the "no limit" sentinel. The original code had the two
        # branches swapped and passed nrows=-1 to pandas.
        raw_5_min_data = pd.read_csv(raw_5_min_filepath)
    else:
        raw_5_min_data = pd.read_csv(raw_5_min_filepath, nrows=nimportrows)
    raw_meta_filepath = '../../../d11_traffic_data/meta/d11/d11_text_meta_2015_01_01.txt'
    meta = pd.read_csv(raw_meta_filepath, sep='\t')

    # Filter raw and meta datasets by freeway and direction
    # Reduce raw_5_min data
    keyval_dict = {"District": 11,
                   "Freeway #": _fwy,
                   "Lane Type": 'ML',
                   "Direction": _dir}
    # .copy() so the helper columns below are added to an independent frame.
    redux_5_min = reduce_data_by_dict(raw_5_min_data, keyval_dict).copy()

    # Reduce meta
    keyval_dict = {"District": 11,
                   "Fwy": _fwy,
                   "Dir": _dir}
    redux_meta = reduce_data_by_dict(meta, keyval_dict)

    # Create helper columns on the *filtered* data. The original code built
    # them on the raw frame and then selected the columns from the raw frame
    # too, which silently discarded the filter applied above.
    redux_5_min['time'] = pd.to_datetime(redux_5_min['Timestamp'], format="%m/%d/%Y %H:%M:%S")
    redux_5_min['timeOfDay'] = redux_5_min['time'].apply(lambda x: x.strftime("%H:%M"))
    redux_5_min['Weekday'] = redux_5_min['time'].dt.weekday

    # Keep only columns used
    redux_5_min = redux_5_min[metric_colnames + helper_colnames +
                              ['Station', 'timeOfDay', 'District', 'Freeway #', 'Direction']]
    redux_meta = redux_meta[['ID', 'Fwy', 'District', 'Dir', 'Abs_PM']]

    # Get the Station's Absolute marker by merging meta
    freeway = redux_5_min.merge(redux_meta,
                                left_on=['Station', 'District', 'Freeway #', 'Direction'],
                                right_on=['ID', 'District', 'Fwy', 'Dir'])

    # Export for later use. Written tab-separated and without the index so
    # that the read_csv(..., sep='\t') call above reads back the same frame.
    if output_csv:
        freeway.to_csv(filepath, sep='\t', index=False)

    return freeway


##### get_fwy_dataByDay
def get_fwy_dataByDay(df, _daynum, overwrite=False, output_csv=False, _fwy=None, _dir=None):
    """Restrict a freeway dataset to one day of the week.

    :param df: freeway dataframe with a "Weekday" column.
    :param _daynum: weekday number, 0 (Mon) to 6 (Sun).
    :param overwrite: when False a cached file is used if it can be read.
    :param output_csv: write the result to the cache file.
    :param _fwy: freeway number, used only to name the cache file.
    :param _dir: direction, used only to name the cache file.
    :return: dataframe holding only the rows of the requested weekday.
    :raises ValueError: if ``output_csv`` is requested without ``_fwy`` and
        ``_dir``, because the cache file can not be named.

    The original body referred to ``_fwy``, ``_dir`` and ``filepath`` without
    defining them. They are now optional keyword parameters, and the cache
    path follows the naming used by ``get_fwy_data``:
    ``../data/metric_statistics/i<_fwy><_dir>_<Day>.csv``.
    """
    weekday = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]

    filepath = None
    if _fwy is not None and _dir is not None:
        myname = "".join(["i",
                          str(_fwy),
                          str(_dir),
                          "_",
                          weekday[_daynum]])
        filepath = "".join(["../data/metric_statistics/", myname, ".csv"])

    # Use the cached file unless it should be overwritten or is not available.
    freewayByDay = None
    if not overwrite and filepath is not None:
        try:
            freewayByDay = pd.read_csv(filepath, sep='\t')
        except (IOError, OSError, ValueError):
            freewayByDay = None

    if freewayByDay is None:
        keyval_dict = {"Weekday": _daynum}
        freewayByDay = reduce_data_by_dict(df, keyval_dict)

    if output_csv:
        if filepath is None:
            raise ValueError("get_fwy_dataByDay: _fwy and _dir are required to name the output csv file")
        freewayByDay.to_csv(filepath, sep='\t', index=False)
    return freewayByDay


# Still working on this
def create_freeway_stats(df):
    """Compute mean and standard deviation of the metrics per index group.

    Work in progress. Only the statements visible in the excerpt are kept.
    """
    # index_colnames and metric_colnames are module-level lists; the original
    # code called them like functions, which raises TypeError.
    index_fields = list(index_colnames)
    stats = ['mean', 'std']

    # Reduce columns of dataset only to columns of interest
    fwy_grouping = df[['Total Flow', 'Avg Occupancy', 'Avg Speed', 'timeOfDay', 'Abs_PM']].groupby(index_fields)
    metrics = fwy_grouping.agg([np.mean, np.std])
    metric_stats = ['mean+std', 'mean', 'mean-std']

    new_cols = []
    for a in metric_colnames:
        for b in metric_stats:
            name = "_".join([a.replace(" ", ""), b])
            new_cols.append(name)
            if b == 'mean+std':
                metrics[name] = metrics[a]['mean'] + metrics[a]['std']
            elif b == 'mean-std':
                metrics[name] = metrics[a]['mean'] - metrics[a]['std']
            else:
                metrics[name] = metrics[a]['mean']

    fields = new_cols + index_fields

    metrics = metrics.reset_index()
    metrics = metrics[fields]
    # NOTE(review): the listing is truncated at this point in the excerpt;
    # the remainder of this function is not visible and has not been
    # reconstructed.
# ... (excerpt truncated; the next snippet in the listing is request_wrapper.py)
Source:request_wrapper.py  
1import subprocess2import pycurl3import json4from StringIO import StringIO 5from urllib import urlencode6import os7class RequestWrapper:8    """ Wrapper for making http requests to xsdb api """9    base_url = 'https://cms-gen-dev.cern.ch/xsdb'10    api_url = base_url + '/api'11    subprocess.call(['bash', 'getCookie.sh'])12    c = pycurl.Curl()13    c.setopt(pycurl.FOLLOWLOCATION, 1)14    c.setopt(pycurl.COOKIEJAR, os.path.expanduser("~/private/xsdbdev-cookie.txt"))15    c.setopt(pycurl.COOKIEFILE, os.path.expanduser("~/private/xsdbdev-cookie.txt"))16    c.setopt(pycurl.HTTPHEADER, ['Content-Type:application/json', 'Accept:application/json'])17    c.setopt(pycurl.VERBOSE, 0)18    def simple_search(self, keyval_dict):19        self._perform_post(self.api_url + '/search', json.dumps(keyval_dict))20    def adv_search(self, keyval_dict={}, page_size=20, current_page=0, orderby_field="", order_direction=1):21        order_by = {}22        23        if orderby_field != "":24            order_by[orderby_field] = order_direction25        query = {26            'search': keyval_dict,27            'pagination':{28                'pageSize': page_size,29                'currentPage': current_page30            },31            'orderBy': order_by32        }33        self._perform_post(self.api_url + '/search', json.dumps(query))34    def insert(self, keyval_dict={}):35        self._perform_post(self.api_url + '/insert', json.dumps(keyval_dict))36    def update(self, keyval_dict, record_id):37        self._perform_post(self.api_url + '/update/' + record_id, json.dumps(keyval_dict))38    def get_last_inserted_by_user(self, user_name):39        buffer = StringIO()40        self.c.setopt(self.c.URL, self.api_url + '/get_last_by_user/' + user_name)41        self.c.setopt(self.c.WRITEFUNCTION, buffer.write)42        self.c.perform()43        body = buffer.getvalue()44        return json.loads(body)45    def _perform_post(self, url, post_fields):46        self.c.setopt(self.c.URL, 
url)47        self.c.setopt(pycurl.POST, 1)48        self.c.setopt(self.c.POSTFIELDS, post_fields)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
