Best Python code snippet using pandera_python
Dynamic SQL Creation.py
Source:Dynamic SQL Creation.py  
# -*- coding: utf-8 -*-
"""
    purpose: To get the results from an API call and generate a SQL Command to store the data
    author: Alberto Rodriguez
    date: 2019+06+01
"""
import os
import sys
import pyodbc
import requests
import pandas as pd
from datetime import datetime

# Global Variables
startTime = datetime.now()
date = datetime.now().strftime('%Y+%m+%d')
os.chdir(os.path.dirname((os.path.abspath(__file__))))  # Sets the working directory to the path of the script file
fileName = 'testdb'


def print_msg(msg):
    """Print *msg* prefixed with the current local timestamp."""
    # TODO: Save the log message into a text file; Add the time the script is run into the name
    print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} {msg}')


def print_exception(exception):
    """Log *exception* and terminate the script with exit status 1."""
    print_msg(exception)
    # sys.exit does not depend on the `exit` helper that the site module installs.
    sys.exit(1)


def _quote_identifier(name):
    """Return *name* as a bracket-quoted SQL Server identifier (``]`` doubled)."""
    return '[' + str(name).replace(']', ']]') + ']'


def get_api():
    """Download the JSON payload, save it as a local CSV and load it into SQL Server.

    The rows are written to ``<date>_<fileName>.csv`` in the working directory
    and then handed to :func:`load_data_to_db`. Any failure is logged and ends
    the script through :func:`print_exception`.
    """
    # url = 'https://jsonplaceholder.typicode.com/photos'
    # url = 'https://jsonplaceholder.typicode.com/comments'
    url = 'https://jsonplaceholder.typicode.com/posts'
    hdr = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/74.0.3729.169 Safari/537.36',
        'Content-Type': 'application/json',
    }
    params = {
        'userId': '1'
    }

    print_msg(f'Attempting to access {url}')
    # Will try to make the API GET request and store the JSON response
    try:
        with requests.Session() as s:
            # A timeout keeps the script from hanging forever on a dead endpoint.
            r = s.get(url, headers=hdr, params=params, timeout=30)
            r.raise_for_status()  # An HTTP error status is a failure, not a success
            api_json_response = r.json()  # If the response is nested, handle it here using api_json_response['name']
    except (requests.RequestException, ValueError) as e:
        print_exception(e)
    # Only report success once the request succeeded and the body parsed as JSON.
    print_msg('Success')

    # A single JSON object is treated as one row so the row loops below still work.
    if isinstance(api_json_response, dict):
        api_json_response = [api_json_response]
    formatted_response = list(api_json_response)  # One entry per row of data

    try:
        dFrame = pd.DataFrame(formatted_response)  # Convert the list to a pandas data frame
        dFrame.to_csv(f'{date}_{fileName}.csv', index=False)  # Saves the file locally
        print_msg('Saved file locally.')
    except (OSError, ValueError) as e:
        print_exception(e)

    # Pass the data to be saved to the SQL Server Sandbox including the file name that will be used as the table
    load_data_to_db(formatted_response, 'test_db')


def load_data_to_db(data, tableName):
    """Recreate ``Database1.dbo.<tableName>`` and insert every row of *data*.

    :param data: list of dicts; the union of their keys becomes the table's
        columns, each created as ``VARCHAR(max)``.
    :param tableName: name of the table to drop, recreate and fill.

    Database errors are logged and end the script through
    :func:`print_exception`; the connection is closed in every case.
    """
    if not data:
        # Without rows there are no columns to create and nothing to insert.
        print_msg(f'No rows received; nothing to load into {tableName}.')
        return

    xdbName = 'Database1'
    # Identifiers cannot be bound as parameters, so they are bracket-quoted instead.
    xtblName = '.'.join(_quote_identifier(part) for part in (xdbName, 'dbo', tableName))

    # Necessary parameters to make the connection
    try:
        con = pyodbc.connect(
            Trusted_Connection='yes',
            driver='{SQL Server}',
            server=r'DESKTOP-JQ3I8KK\SQLEXPRESS',
            autocommit=True)
    except pyodbc.Error as e:
        print_exception(e)

    def create_table(cursor):
        # Column definitions in MS SQL format: [col name] VARCHAR(max).
        # dict.fromkeys keeps first-seen order while dropping duplicate names.
        column_names = list(dict.fromkeys(key for row in data for key in row))
        column_defs = ', '.join(f'{_quote_identifier(name)} VARCHAR(max)' for name in column_names)
        print_msg(f'Attempting to create {xtblName}.')
        sql = f'DROP TABLE IF EXISTS {xtblName}; ' \
              f'CREATE TABLE {xtblName}({column_defs})'
        cursor.execute(sql)
        print_msg(f'{sql}.')
        print_msg(f'Successfully created {xtblName}.')

    def insert_query(cursor):
        inserted = 0
        for row in data:
            if not row:
                continue  # A row without keys has nothing to insert
            columns = ', '.join(_quote_identifier(key) for key in row)
            placeholders = ', '.join('?' * len(row))
            sql = f'INSERT INTO {xtblName} ({columns}) VALUES ({placeholders})'
            # Values are bound as parameters, so quotes and other special
            # characters in the API data cannot alter the statement.
            values = [None if item is None else str(item) for item in row.values()]
            cursor.execute(sql, values)
            inserted += 1
            print_msg(f'Successfully updated {xtblName} {sql}.')
        print_msg(f'Inserted {inserted} rows into {xtblName}\n')

    # Tries to create the table and load the data into the SQL table
    try:
        cursor = con.cursor()  # Cursor object to execute SQL Commands
        create_table(cursor)
        insert_query(cursor)
    except pyodbc.Error as e:
        print_exception(e)
    finally:
        # Runs on success and also while SystemExit from print_exception propagates.
        con.close()


def elapsed_time():
    """Log how long the script has been running since ``startTime``."""
    total_seconds = int((datetime.now() - startTime).total_seconds())
    days, remainder = divmod(total_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    # Leading units that are zero are left out of the message.
    if days > 0:
        runtime = f'{days}d {hours}h {minutes}m {seconds}s'
    elif hours > 0:
        runtime = f'{hours}h {minutes}m {seconds}s'
    elif minutes > 0:
        runtime = f'{minutes}m {seconds}s'
    else:
        runtime = f'{seconds}s'
    print_msg(f'Total runtime for this script was {runtime}')


# **********************************************************************************************************************
if __name__ == '__main__':
    get_api()
    # NOTE(review): the snippet ends with "..." at this point in the source page, so
    # any statements after this call (presumably elapsed_time()) are not visible - confirm.

# --- next snippet on the page: vcf2tsv.py ---
Source:vcf2tsv.py  
import io
import argparse
import pandas as pd


# Class which contains utility functions to convert VCF to TSV
class VcfToTsv:
    """
    Read a vcf file and convert it to a tsv file.

    The fixed VCF columns are copied through, every INFO tag becomes its own
    column, and every FORMAT tag becomes one column per sample named
    ``<tag>-<sample>``.

    Uses:
        `Pandas`
    """

    def __init__(self, input_vcf: str, output_tsv: str):
        """Convert *input_vcf* and write the result to *output_tsv* immediately."""
        self.input_vcf = input_vcf
        self.output_tsv = output_tsv
        # Read VCF
        lines: list = self.read_vcf()
        # Write output to TSV file
        self.write_output(lines)

    # Read data in the input VCF file
    def read_vcf(self) -> list:
        """Return the lines of the input file, skipping ``##`` meta lines.

        Raises:
            OSError: if the file cannot be opened or read. The error is
                re-raised after the message is printed, so the caller does
                not carry on with ``None`` in place of the lines.
        """
        try:
            with open(self.input_vcf, "rt", encoding="utf-8") as f:
                return [line for line in f if not line.startswith("##")]
        except OSError:
            print("Error Reading VCF File")
            raise

    def write_output(self, lines: list) -> None:
        """Parse *lines* as tab-separated text and write the flattened table to ``self.output_tsv``."""
        data = pd.read_csv(
            io.StringIO("".join(lines)),
            dtype={
                "#CHROM": str,
                "POS": int,
                "ID": str,
                "REF": str,
                "ALT": str,
                "QUAL": str,
                "FILTER": str,
                "INFO": str,
            },
            sep="\t",
        ).rename(columns={"#CHROM": "CHROM"})
        main_vcf = data.filter(
            ["CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER"], axis=1
        )
        info_data_modified = self.get_info_data(data)
        format_data_modified = self.get_format_data(data)
        modified_tags = main_vcf.join(info_data_modified)
        modified_tags = modified_tags.join(format_data_modified)
        modified_tags.to_csv(self.output_tsv, sep="\t", encoding="utf-8", index=False)

    @staticmethod
    def _split_fields(cell) -> list:
        """Split a colon-separated FORMAT or sample cell; a missing cell gives ``[]``."""
        if pd.isna(cell):
            return []
        return str(cell).split(":")

    # Parse Info Data
    def get_info_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Expand the ``INFO`` column into one column per tag.

        ``KEY=VALUE`` entries give ``VALUE`` (split at the first ``=`` only),
        entries without ``=`` are flags and give ``"Yes"``, and tags missing
        from a row give ``""``. The result has exactly one row per input row,
        indexed ``0..n-1``, with columns in first-seen order.
        """
        tags: list = []
        seen: set = set()
        records: list = []
        for raw in data["INFO"]:
            parsed: dict = {}
            # A missing INFO cell is not a string and contributes no tags.
            if isinstance(raw, str):
                for item in raw.split(";"):
                    if not item:
                        continue  # Ignore empty entries such as a trailing ";"
                    key, has_value, value = item.partition("=")
                    parsed[key] = value if has_value else "Yes"
                    if key not in seen:
                        seen.add(key)
                        tags.append(key)
            # Every input row gets a record, even when it only holds flags, so
            # the result stays aligned with the other columns when joined.
            records.append(parsed)
        # Build the frame once instead of concatenating one row at a time.
        columns = {tag: [record.get(tag, "") for record in records] for tag in tags}
        return pd.DataFrame(columns, index=pd.RangeIndex(len(records)))

    # Parse FORMAT Data
    def get_format_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Expand every sample column into one column per FORMAT tag.

        Columns are named ``<tag>-<sample>`` and grouped by sample. A tag that
        a row's FORMAT does not list, or for which the sample supplies no
        field, gives ``""``. When there is no ``FORMAT`` column the result has
        no columns.
        """
        row_index = pd.RangeIndex(len(data))
        # Determine the sample columns which appear after the FORMAT column in the VCF file
        samples = self.get_sample_names(list(data.columns))
        if not samples:
            return pd.DataFrame(index=row_index)

        # FORMAT keys per row, and the unique tags in first-seen order
        format_keys = [self._split_fields(cell) for cell in data["FORMAT"]]
        unique_tags = list(dict.fromkeys(tag for keys in format_keys for tag in keys))

        columns: dict = {}
        for sample in samples:
            # zip stops at the shorter side, so a sample with fewer fields than
            # its FORMAT simply leaves the trailing tags blank.
            parsed_rows = [
                dict(zip(keys, self._split_fields(cell)))
                for keys, cell in zip(format_keys, data[sample])
            ]
            for tag in unique_tags:
                # Append the sample name to the FORMAT tag to form the tsv header
                columns[tag + "-" + sample] = [row.get(tag, "") for row in parsed_rows]
        return pd.DataFrame(columns, index=row_index)

    # Get Sample Names from VCF file
    def get_sample_names(self, header: list) -> list:
        """Return the column names that follow ``FORMAT`` in *header* (``[]`` if absent)."""
        if "FORMAT" not in header:
            return []
        return list(header[header.index("FORMAT") + 1:])


# Main function
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Convert vcf to tsv file")
    parser.add_argument(
        "-i", "--input_vcf", help="Input VCF file", required=True,
    )
    parser.add_argument(
        "-o", "--output_tsv", help="Output TSV file", required=True,
    )
    args = parser.parse_args()
    # NOTE(review): the snippet is cut off here ("...") in the source page; presumably
    # VcfToTsv(args.input_vcf, args.output_tsv) follows - confirm against the full file.

# --- next snippet on the page: test_create.py ---
Source:test_create.py  
import pytest
from sqlalchemy import String, Integer, Column, Table, MetaData
from sqlalchemy.exc import ProgrammingError
from db.columns.operations.select import get_column_attnum_from_name, get_columns_attnum_from_names
from db.constraints.base import UniqueConstraint
from db.constraints.operations.create import create_constraint, create_unique_constraint
from db.tables.operations.select import get_oid_from_table, reflect_table_from_oid
from db.tests.constraints import utils as test_utils
from db.metadata import get_empty_metadata


def _create_orders_table(engine, schema, table_name, column_names):
    """Create and return a table with an ``order_id`` primary key and one String column per name."""
    orders = Table(
        table_name,
        MetaData(bind=engine, schema=schema),
        Column('order_id', Integer, primary_key=True),
        *[Column(name, String) for name in column_names],
    )
    orders.create()
    return orders


def test_create_single_column_unique_constraint(engine_with_schema):
    """An unnamed unique constraint on one column gets the ``<table>_<column>_key`` name."""
    engine, schema = engine_with_schema
    table_name = "orders_1"
    unique_column_name = 'product_name'
    table = _create_orders_table(engine, schema, table_name, [unique_column_name])
    test_utils.assert_only_primary_key_present(table)

    table_oid = get_oid_from_table(table_name, schema, engine)
    attnum = get_column_attnum_from_name(
        table_oid, unique_column_name, engine, metadata=get_empty_metadata()
    )
    create_constraint(schema, engine, UniqueConstraint(None, table_oid, [attnum]))

    altered_table = reflect_table_from_oid(table_oid, engine, metadata=get_empty_metadata())
    test_utils.assert_primary_key_and_unique_present(altered_table)
    unique_constraint = test_utils.get_first_unique_constraint(altered_table)
    assert unique_constraint.name == f'{table_name}_{unique_column_name}_key'
    assert len(list(unique_constraint.columns)) == 1
    assert list(unique_constraint.columns)[0].name == unique_column_name


def test_create_multiple_column_unique_constraint(engine_with_schema):
    """An unnamed unique constraint on two columns is named after the first column and covers both."""
    engine, schema = engine_with_schema
    table_name = "orders_2"
    unique_column_names = ['product_name', 'customer_name']
    table = _create_orders_table(engine, schema, table_name, unique_column_names)
    test_utils.assert_only_primary_key_present(table)

    table_oid = get_oid_from_table(table_name, schema, engine)
    attnums = get_columns_attnum_from_names(
        table_oid, unique_column_names, engine, metadata=get_empty_metadata()
    )
    create_constraint(schema, engine, UniqueConstraint(None, table_oid, attnums))

    altered_table = reflect_table_from_oid(table_oid, engine, metadata=get_empty_metadata())
    test_utils.assert_primary_key_and_unique_present(altered_table)
    unique_constraint = test_utils.get_first_unique_constraint(altered_table)
    first_column_name = unique_column_names[0]
    assert unique_constraint.name == f'{table_name}_{first_column_name}_key'
    assert len(list(unique_constraint.columns)) == 2
    assert {column.name for column in unique_constraint.columns} == set(unique_column_names)


def test_create_unique_constraint_with_custom_name(engine_with_schema):
    """A unique constraint created with an explicit name keeps that name."""
    engine, schema = engine_with_schema
    table_name = "orders_4"
    unique_column_name = 'product_name'
    constraint_name = 'unique_product_name'
    _create_orders_table(engine, schema, table_name, [unique_column_name])

    table_oid = get_oid_from_table(table_name, schema, engine)
    attnum = get_column_attnum_from_name(
        table_oid, unique_column_name, engine, metadata=get_empty_metadata()
    )
    create_constraint(schema, engine, UniqueConstraint(constraint_name, table_oid, [attnum]))

    altered_table = reflect_table_from_oid(table_oid, engine, metadata=get_empty_metadata())
    test_utils.assert_primary_key_and_unique_present(altered_table)
    unique_constraint = test_utils.get_first_unique_constraint(altered_table)
    assert unique_constraint.name == constraint_name
    assert len(list(unique_constraint.columns)) == 1
    assert list(unique_constraint.columns)[0].name == unique_column_name


def test_create_unique_constraint_with_duplicate_name(engine_with_schema):
    engine, schema = engine_with_schema
    table_name = "orders_4"
    unique_column_names = ['product_name', 'customer_name']
    constraint_name = 'unique_product_name'
    _create_orders_table(engine, schema, table_name, unique_column_names)

    table_oid = get_oid_from_table(table_name, schema, engine)
    attnum = get_column_attnum_from_name(
        table_oid, unique_column_names[0], engine, metadata=get_empty_metadata()
    )
    create_constraint(schema, engine, UniqueConstraint(constraint_name, table_oid, [attnum]))

    altered_table = reflect_table_from_oid(table_oid, engine, metadata=get_empty_metadata())
    test_utils.assert_primary_key_and_unique_present(altered_table)
    with pytest.raises(ProgrammingError):
        # NOTE(review): the snippet is cut off here ("...") in the source page, so the
        # statement expected to raise is not visible - confirm against the full file.
        ...

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
