Best Python code snippet using pandera_python
get_columns.py
Source:get_columns.py  
'''
    Get all the columns in where clause and write to output_columns_in_where.txt file
'''
from Utilities import unpack_select_from_elements


def util_trim_comma(input_string):
    '''
        Strip surrounding whitespace and, if present, one trailing comma.

        The result is stripped again after the comma is removed, so
        "col ,  " becomes "col". Empty or whitespace-only input returns ''
        (indexing input_string[-1] used to raise IndexError in that case).
    '''
    trimmed = input_string.strip()
    if trimmed.endswith(','):
        # drop the comma, then any whitespace that preceded it
        trimmed = trimmed[:-1].strip()
    return trimmed


def process_word(**kwargs):
    '''
        Process a word read from the input stream and add it to a list.

        accepted parameters:
            object_list - list to update (select_columns, where_columns,
                          from_tables, etc.)
            word - word to be processed
            flag_dict - dict whose 'separator_found_flag' entry records
                        whether the previous word contained a comma

        While no separator has been seen, the word is appended to the last
        element (e.g. "col AS alias"); once a comma has been seen, the next
        word starts a new element.
    '''
    object_list = kwargs.get('object_list', None)
    word = kwargs.get('word', None)
    flag_dict = kwargs.get('flag_dict', None)

    if not object_list:
        # first element of the list
        object_list.append(word)
        # A comma on the very first word has to be recorded as well,
        # otherwise the following word is glued onto this element as if it
        # were an alias ("a," + "b" -> "a, b").
        if flag_dict is not None and ',' in word:
            flag_dict['separator_found_flag'] = True
        return

    if not flag_dict['separator_found_flag']:
        # still inside the current element: append the word to it
        object_list[-1] = "{} {}".format(object_list[-1], word)
        if ',' in word:
            flag_dict['separator_found_flag'] = True
    else:
        # previous element was terminated by a comma: start a new one
        object_list.append(word)
        if ',' not in word:
            flag_dict['separator_found_flag'] = False


def process_collection(**kwargs):
    '''
        Process the collected lists of one SQL statement and print them.

        expected arguments:
            select_columns - list of the columns in select
            from_tables - list of tables in from clause
            where_columns - list of columns used in where clause
        return value:
            list of dictionaries, one per select column:
                {
                    'column_name': <name of column>,
                    'alias': <alias (None if alias not used)>,
                    'column_full_name': <column as specified, incl. table>,
                    'table': <table prefix (None if not provided in sql)>
                }

        All three input lists are emptied in place before returning.
        Missing (None) arguments are treated as empty lists.
    '''
    select_columns = kwargs.get('select_columns', None)
    where_columns = kwargs.get('where_columns', None)
    from_tables = kwargs.get('from_tables', None)
    if select_columns is None:
        select_columns = []
    if where_columns is None:
        where_columns = []
    if from_tables is None:
        from_tables = []

    # NOTE: from_tables is not parsed yet (table name / table alias); it is
    # only echoed in the printout below.

    select_columns_dict_list = []
    for column in select_columns:
        parts = column.split()
        if not parts:
            # blank entry (e.g. produced by a stray comma): nothing to parse
            continue

        select_columns_dict = dict()
        if len(parts) == 3:
            # column has alias, i.e. "<column> AS <alias>"
            select_columns_dict['column_name'] = parts[0]
            select_columns_dict['alias'] = util_trim_comma(parts[-1])
        else:
            select_columns_dict['column_name'] = util_trim_comma(parts[0])
            select_columns_dict['alias'] = None

        # Always record the name as written so every dictionary has the
        # same keys, whether or not a table prefix is present.
        full_name = select_columns_dict['column_name']
        select_columns_dict['column_full_name'] = full_name

        # check for table: everything before the last '.' is the table
        table, separator, bare_name = full_name.rpartition('.')
        if separator:
            select_columns_dict['column_name'] = bare_name
            select_columns_dict['table'] = table
            # logic for aliased tables will be added here....
        else:
            select_columns_dict['table'] = None

        select_columns_dict_list.append(select_columns_dict)

    print('''
        select_columns_orig: {}
        select_columns: {}
        tables: {}
    '''.format(select_columns, select_columns_dict_list, from_tables)
    )

    # empty collections so the caller can reuse them for the next statement
    select_columns.clear()
    where_columns.clear()
    from_tables.clear()
    return select_columns_dict_list


def process_collection_(**kwargs):
    '''
        temporary function

        Unpacks the 'select_clause' and 'from_clause' strings with
        unpack_select_from_elements and prints both results.
    '''
    select_clause = kwargs.get('select_clause', None)
    select_columns = unpack_select_from_elements(clause_string=select_clause)
    from_clause = kwargs.get('from_clause', None)
    from_tables = unpack_select_from_elements(clause_string=from_clause)
    print('''
        select clause: {}
        from clause: {}
        '''.format(select_columns, from_tables))


def _unpack_clause(clause_string):
    '''
        Unpack an accumulated clause string; a blank clause gives [].

        Assumes unpack_select_from_elements returns a list (it is iterated
        and cleared by process_collection) - TODO confirm against Utilities.
    '''
    if not clause_string.strip():
        return []
    return unpack_select_from_elements(clause_string=clause_string)


with open('input_columns_in_where.txt', 'r') as f:
    # write_to_file = open('output_columns_in_where.txt')
    select_columns = list()
    where_columns = list()
    from_tables = list()
    select_clause = ''
    from_clause = ''
    current_sql_clause = None
    flag_dict = dict()
    flag_dict['separator_found_flag'] = False
    flag_dict['statement_terminator_found_flag'] = False
    flag_dict['beginning_flag'] = True
    flag_dict['last_word_is_keyword'] = False
    SQL_CLAUSE_KEYWORDS = ('SELECT', 'FROM', 'WHERE', 'GROUP', 'ORDER')  # get only the first word

    for line in f:  # check every line from input file
        for word in line.split():  # check every word in line
            if word.startswith('--'):
                # comment found, move to next line
                break

            # check for keyword
            keyword = word.upper()
            if keyword in SQL_CLAUSE_KEYWORDS:
                # keyword found
                flag_dict['separator_found_flag'] = False
                flag_dict['last_word_is_keyword'] = True
                current_sql_clause = keyword
                if keyword == 'SELECT' and not flag_dict['beginning_flag']:
                    # a new SELECT closes the previous statement
                    process_collection(
                        select_columns=_unpack_clause(select_clause),
                        from_tables=_unpack_clause(from_clause),
                        where_columns=where_columns
                    )
                    # start the next statement from scratch; otherwise its
                    # report would repeat the previous statement's columns
                    select_clause = ''
                    from_clause = ''
                continue

            flag_dict['last_word_is_keyword'] = False
            if current_sql_clause:
                if current_sql_clause == 'SELECT':
                    select_clause = ' '.join([select_clause, word])
                if current_sql_clause == 'FROM':
                    from_clause = ' '.join([from_clause, word])
                if current_sql_clause == 'WHERE':
                    where_columns.append(word)
                if flag_dict['statement_terminator_found_flag'] \
                        and not flag_dict['beginning_flag']:
                    # end of statement. do some processing
                    flag_dict['statement_terminator_found_flag'] = False
            flag_dict['beginning_flag'] = False

    # Report the last statement of the file. The clause text is accumulated
    # in select_clause / from_clause; the select_columns / from_tables lists
    # are never filled by the loop above, so passing them reported nothing.
    process_collection(
        select_columns=_unpack_clause(select_clause),
        from_tables=_unpack_clause(from_clause),
        where_columns=where_columns
    )
    # NOTE: the snippet is truncated at this point in the source, inside a
    # commented-out debug print:
    # print('''
    #     Select columns: {}...
Source:tabular.py  
1from tensorflow.python.data.experimental.ops.readers import (2    dataset_ops,3    dtypes,4    constant_op,5    interleave_ops,6    _get_sorted_col_indices,7    CsvDataset,8    _maybe_shuffle_and_repeat,9)10from tensorflow.python.data.experimental.ops.readers import (11    _infer_column_defaults,12    _infer_column_names,13    _get_file_names,14)15import tensorflow as tf16import collections17from tensorflow.data.experimental import AUTOTUNE18from tensorflow_core.python.lib.io import file_io19_ACCEPTABLE_CSV_TYPES = (20    dtypes.float32,21    dtypes.float64,22    dtypes.int32,23    dtypes.int64,24    dtypes.string,25)26def make_csv_dataset(27    file_pattern,28    batch_size,29    column_names=None,30    column_defaults=None,31    label_names=None,32    select_columns=None,33    field_delim=",",34    use_quote_delim=True,35    na_value="",36    header=True,37    num_epochs=True,38    shuffle=True,39    shuffle_buffer_size=10000,40    shuffle_seed=None,41    prefetch_buffer_size=AUTOTUNE,42    num_parallel_reads=1,43    sloppy=False,44    num_rows_for_inference=100,45    compression_type=None,46):47    filenames = _get_file_names(file_pattern, False)48    dataset = dataset_ops.Dataset.from_tensor_slices(filenames)49    # Clean arguments; figure out column names and defaults50    if column_names is None:51        if not header:52            raise ValueError("Cannot infer column names without a header line.")53        # If column names are not provided, infer from the header lines54        column_names = _infer_column_names(55            filenames,56            field_delim,57            use_quote_delim,58            lambda filename: file_io.FileIO(filename, "r"),59        )60    if len(column_names) != len(set(column_names)):61        raise ValueError("Cannot have duplicate column names.")62    if select_columns is not None:63        select_columns = _get_sorted_col_indices(select_columns, column_names)64    if column_defaults is not None:65        column_defaults = 
[66            constant_op.constant([], dtype=x) if x in _ACCEPTABLE_CSV_TYPES else x67            for x in column_defaults68        ]69    else:70        # If column defaults are not provided, infer from records at graph71        # construction time72        column_defaults = _infer_column_defaults(73            filenames,74            len(column_names),75            field_delim,76            use_quote_delim,77            na_value,78            header,79            num_rows_for_inference,80            select_columns,81            lambda filename: file_io.FileIO(filename, "r"),82        )83    if select_columns is not None and len(column_defaults) != len(select_columns):84        raise ValueError(85            "If specified, column_defaults and select_columns must have same " "length."86        )87    if select_columns is not None and len(column_names) > len(select_columns):88        # Pick the relevant subset of column names89        column_names = [column_names[i] for i in select_columns]90    if label_names is None:91        raise ValueError("`label_name` provided must be one of the columns.")92    if isinstance(label_names, (list, tuple)):93        for l in label_names:94            if l not in column_names:95                raise ValueError("`label_name` provided must be one of the columns.")96    else:97        if label_names not in column_names:98            raise ValueError("`label_name` provided must be one of the columns.")99    def filename_to_dataset(filename):100        return CsvDataset(101            filename,102            record_defaults=column_defaults,103            field_delim=field_delim,104            use_quote_delim=use_quote_delim,105            na_value=na_value,106            select_cols=select_columns,107            header=header,108            compression_type=compression_type,109        )110    def map_fn(*columns):111        """Organizes columns into a features dictionary.112        Args:113          *columns: list of `Tensor`s 
corresponding to one csv record.114        Returns:115          An OrderedDict of feature names to values for that particular record. If116          label_name is provided, extracts the label feature to be returned as the117          second element of the tuple.118        """119        features = collections.OrderedDict(zip(column_names, columns))120        if label_names is not None:121            if isinstance(label_names, (list, tuple)):122                labels = []123                for l in label_names:124                    labels.append(features.pop(l))125                return features, tf.stack(labels, 1)126            else:127                labels = features.pop(label_names)128                return features, labels129        return features130    # Read files sequentially (if num_parallel_reads=1) or in parallel131    dataset = dataset.apply(132        interleave_ops.parallel_interleave(133            filename_to_dataset, cycle_length=num_parallel_reads, sloppy=sloppy134        )135    )136    dataset = _maybe_shuffle_and_repeat(137        dataset, num_epochs, shuffle, shuffle_buffer_size, shuffle_seed138    )139    dataset = dataset.batch(batch_size=batch_size, drop_remainder=num_epochs is None)140    # Apply batch before map for perf, because map has high overhead relative141    # to the size of the computation in each map.142    # NOTE(mrry): We set `drop_remainder=True` when `num_epochs is None` to143    # improve the shape inference, because it makes the batch dimension static.144    # It is safe to do this because in that case we are repeating the input145    # indefinitely, and all batches will be full-sized.146    dataset = dataset_ops.MapDataset(dataset, map_fn, use_inter_op_parallelism=False)147    dataset = dataset.prefetch(prefetch_buffer_size)...DbService.py
Source:DbService.py  
1import pandas as pd2from DbDAO import DbDao3class DbService:4    def __init__(self):5        self.__dao = DbDao()6    def name_columns(self, name_table: str):7        return self.__dao.name_columns(name_table=name_table)8    def insert(self, name_table: str, list_record: list):9        name_columns = self.__dao.name_columns(name_table=name_table)10        return self.__dao.insert(name_table=name_table, name_columns=name_columns, list_record=list_record)11    def is_not_empty(self, name_table: str) -> bool:12        return self.__dao.is_not_empty(name_table=name_table)13    def count_records(self, name_table: str) -> int:14        return self.__dao.count_records(name_table=name_table)15    def get_all_value_in_column(self, name_column, name_table) -> list:16        return self.__dao.get_all_value_in_column(name_column=name_column, name_table=name_table)17    def get_select_with_where(self, select_columns, name_table: str, where_columns, values_column):18        return self.__dao.get_select_with_where(select_columns=select_columns, name_table=name_table,19                                                where_columns=where_columns, values_column=values_column)20    def get_df_select_with_where(self, select_columns, name_table: str, where_columns, values_column):21        select = self.__dao.get_select_with_where(select_columns=select_columns, name_table=name_table,22                                                  where_columns=where_columns, values_column=values_column)23        return pd.DataFrame(data=select, columns=select_columns)24    def delete_where_condition(self, name_table: str, where_columns, values_column):25        self.__dao.delete_where_condition(name_table=name_table, where_columns=where_columns,26                                          values_column=values_column)27    def delete(self, name_table: str):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
