How to use the unique_column_names method in pandera

Best Python code snippet using pandera_python

Dynamic SQL Creation.py

Source:Dynamic SQL Creation.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
"""
purpose: Get the results from an API call and generate SQL commands to store the data
author:  Alberto Rodriguez
date:    2019-06-01
"""
import os
import sys
import pyodbc
import requests
import pandas as pd
from datetime import datetime

# Global variables
startTime = datetime.now()
# NOTE(review): '+' separators in the original look like a scrape artifact of '-';
# kept as-is to preserve the produced file names — confirm the intended format.
date = datetime.now().strftime('%Y+%m+%d')
os.chdir(os.path.dirname(os.path.abspath(__file__)))  # Work relative to the script's own directory
fileName = 'testdb'


def print_msg(msg):
    """Print *msg* prefixed with a YYYY-MM-DD HH:MM:SS timestamp."""
    # TODO: Save the log message into a text file; add the run time into the file name
    print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} {msg}')


def print_exception(exception):
    """Log *exception* and abort the script with a non-zero exit code."""
    print_msg(exception)
    sys.exit(1)  # sys.exit instead of the interactive-only builtin exit()


def get_api():
    """Fetch posts from the JSONPlaceholder API, save them to CSV and load them into SQL Server."""
    # url = 'https://jsonplaceholder.typicode.com/photos'
    # url = 'https://jsonplaceholder.typicode.com/comments'
    url = 'https://jsonplaceholder.typicode.com/posts'
    hdr = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/74.0.3729.169 Safari/537.36',
        'Content-Type': 'application/json',
    }
    params = {
        'userId': '1'
    }
    with requests.Session() as s:
        print_msg(f'Attempting to access {url}')
        # Make the API GET request and store the JSON response
        try:
            r = s.get(url, headers=hdr, params=params)
            print_msg('Success')
            api_json_response = r.json()  # If the response is nested, handle it here using api_json_response['name']
        except Exception as e:
            print_exception(e)
    # The response is already a list of flat dicts; no manual enumerate/append loop needed
    formatted_response = list(api_json_response)
    try:
        dFrame = pd.DataFrame(formatted_response)  # Convert the list to a pandas data frame
        dFrame.to_csv(f'{date}_{fileName}.csv', index=False)  # Saves the file locally
        print_msg('Saved file locally.')
    except Exception as e:
        print_exception(e)
    # NOTE(review): table name 'test_db' differs from fileName 'testdb' — confirm which is intended
    load_data_to_db(formatted_response, 'test_db')


def load_data_to_db(data, tableName):
    """Create (or replace) table *tableName* in the sandbox database and insert *data* into it.

    data: list of flat dicts (one per row) as returned by the API; every value is
    stored as VARCHAR(max).
    """
    # Necessary parameters to make the connection
    con = pyodbc.connect(
        Trusted_Connection='yes',
        driver='{SQL Server}',
        server=r'DESKTOP-JQ3I8KK\SQLEXPRESS',
        autocommit=True)
    cursor = con.cursor()  # Cursor object to execute SQL commands
    xdbName = 'Database1'
    xtblName = f'{xdbName}.dbo.{tableName}'

    def create_table():
        # Build one "[col name] VARCHAR(max)" definition per unique key across all rows
        unique_column_names = []
        for row in data:
            for key in row.keys():
                col_name = f'[{key}] VARCHAR(max)'
                if col_name not in unique_column_names:  # Avoids duplication of the column names
                    unique_column_names.append(col_name)
        print_msg(f'Attempting to create {xtblName}.')
        # Put together the SQL command to create the table dynamically.
        # NOTE(review): identifiers cannot be bound as parameters; key names are
        # bracket-quoted above, table/db names are script constants.
        try:
            sql = f'DROP TABLE IF EXISTS {xtblName}; ' \
                  f'CREATE TABLE {xtblName}({", ".join(unique_column_names)})'
            cursor.execute(sql)
            print_msg(f'{sql}.')
            print_msg(f'Successfully created {xtblName}.')
        except Exception as e:
            print_exception(e)

    def insert_query():
        # Insert each row with qmark parameter binding: the driver handles quoting,
        # so the old manual single-quote escaping (injection-prone) is unnecessary.
        rows_inserted = 0  # Counter defined outside the loop so it is valid even for empty data
        try:
            for row in data:
                columns = ', '.join(row.keys())
                placeholders = ', '.join('?' for _ in row)
                sql = f'INSERT INTO {xtblName} ({columns}) VALUES ({placeholders})'
                cursor.execute(sql, [str(information) for information in row.values()])
                rows_inserted += 1
                print_msg(f'Successfully updated {xtblName} {sql}.')
        except Exception as e:
            print_exception(e)
        print_msg(f'Inserted {rows_inserted} rows into {xtblName}\n')

    # Create the table and load the data into it, always releasing the connection
    try:
        create_table()
        insert_query()
    except Exception as e:
        print_exception(e)
    finally:
        cursor.close()
        con.close()  # Previously the connection was never closed


def elapsed_time():
    """Log the wall-clock runtime since the script started, using the largest useful units."""
    seconds = int((datetime.now() - startTime).total_seconds())
    days, seconds = divmod(seconds, 86400)
    hours, seconds = divmod(seconds, 3600)
    minutes, seconds = divmod(seconds, 60)
    if days > 0:
        print_msg('Total runtime for this script was %dd %dh %dm %ds' % (days, hours, minutes, seconds))
    elif hours > 0:
        print_msg('Total runtime for this script was %dh %dm %ds' % (hours, minutes, seconds))
    elif minutes > 0:
        print_msg('Total runtime for this script was %dm %ds' % (minutes, seconds))
    else:
        print_msg('Total runtime for this script was %ds' % (seconds,))


# **********************************************************************************************************************
if __name__ == '__main__':
    get_api()
    # NOTE(review): the scraped source is truncated after this call; elapsed_time()
    # is presumably invoked here — confirm against the original repository.

Full Screen

Full Screen

vcf2tsv.py

Source:vcf2tsv.py Github

copy

Full Screen

import io
import argparse
import pandas as pd


# Class which contains utility functions to convert VCF to TSV
class VcfToTsv:
    """
    Read a vcf file and convert it to a tsv file.
    Uses:
        `Pandas`
    """

    def __init__(self, input_vcf: str, output_tsv: str):
        self.input_vcf = input_vcf
        self.output_tsv = output_tsv
        # Read VCF
        lines: list = self.read_vcf()
        # Write output to TSV file
        self.write_output(lines)

    def read_vcf(self) -> list:
        """Return the VCF lines with the ## meta-information header stripped.

        Raises RuntimeError if the file cannot be read; the original code printed
        a message and returned None, which later crashed in write_output with a
        confusing TypeError.
        """
        try:
            with open(self.input_vcf, "rt") as f:
                lines: list = [l for l in f if not l.startswith("##")]
            return lines
        except Exception as err:
            raise RuntimeError("Error Reading VCF File") from err

    def write_output(self, lines: list) -> None:
        """Parse the retained VCF lines and write the flattened TSV file."""
        data = pd.read_csv(
            io.StringIO("".join(lines)),
            dtype={
                "#CHROM": str,
                "POS": int,
                "ID": str,
                "REF": str,
                "ALT": str,
                "QUAL": str,
                "FILTER": str,
                "INFO": str,
            },
            sep="\t",
        ).rename(columns={"#CHROM": "CHROM"})
        main_vcf = data.filter(
            ["CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER"], axis=1
        )
        info_data_modified = self.get_info_data(data)
        format_data_modified = self.get_format_data(data)
        modified_tags = main_vcf.join(info_data_modified)
        modified_tags = modified_tags.join(format_data_modified)
        modified_tags.to_csv(self.output_tsv, sep="\t", encoding="utf-8", index=False)

    # Parse INFO Data
    def get_info_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Explode the semicolon-separated INFO column into one column per tag.

        Flag tags (no '=') are reported as "Yes"; missing tags become "".
        (Fixed annotations: parameter/return were annotated as the `pd` module.)
        """
        # Get original info data
        info = data["INFO"].str.split(";", expand=True)
        # Collect the unique tag names appearing in any INFO entry
        tags: list = []
        for i in info.columns:
            tags.extend(info[i].str.split("=", expand=True)[0].tolist())
        # Keep only real (string) tag names: rows with fewer tags yield NaN, which
        # the original None-filter let through; sort for deterministic column order.
        tag_names = sorted({tag for tag in tags if isinstance(tag, str)})
        info_data = pd.DataFrame(columns=tag_names)
        # Parse info data from the VCF and append data to main dataframe - info_data
        for index, row in info.iterrows():
            # get data for each INFO row and convert that to a dataframe
            entry = row.str.split("=", expand=True).transpose()
            new_header = entry.iloc[0]
            entry = entry[1:]
            entry.columns = new_header
            # Drop empty columns; flag-style tags (no value) are marked "Yes"
            entry = entry.loc[:, entry.columns.notnull()].fillna("Yes")
            # Concatenate with the main info dataframe
            info_data = pd.concat([info_data, entry])
        # Reset index and replace NaN with empty string
        info_data = info_data.reset_index(drop=True)
        info_data = info_data.fillna("")
        return info_data

    # Parse FORMAT Data
    def get_format_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Explode the colon-separated FORMAT/sample columns into '<TAG>-<sample>' columns.

        (Fixed annotations: parameter/return were annotated as the `pd` module.)
        """
        # Get the VCF #CHROM line
        header = list(data.head(0))
        # Determine the sample columns which appear after the FORMAT column in the VCF file
        samples = self.get_sample_names(header)
        # Get the tags in format column
        format_tags = data["FORMAT"].str.split(":", expand=True)
        # Construct an overall dataframe from the unique tags in format column
        unique_column_names = pd.unique(format_tags.stack())
        format_tags_unique = pd.DataFrame(columns=unique_column_names)
        for sample in samples:
            format_tags_sample = pd.DataFrame(columns=unique_column_names)
            # Append the sample name to FORMAT tags which is the header in the tsv file
            format_names = [sub + "-" + sample for sub in unique_column_names]
            # Fetch the format data for the sample
            sample_data = data[sample].str.split(":", expand=True)
            for i in sample_data.index:
                # Construct a dataframe for each row in the sample data
                head = format_tags.loc[[i]].iloc[0]
                value = sample_data.loc[[i]]
                value.columns = head
                # Skip columns which have 'None' and reset index
                value = value.loc[:, value.columns.notnull()]
                value = value.reset_index(drop=True)
                # Concatenate the sample data to overall sample dataframe
                format_tags_sample = pd.concat([format_tags_sample, value]).fillna("")
            # Concatenate the complete sample data to the
            # main dataframe containing unique tags and reset index
            format_tags_sample.columns = format_names
            format_tags_sample = format_tags_sample.reset_index(drop=True)
            if format_tags_unique.empty:
                format_tags_unique = format_tags_sample
            else:
                format_tags_unique = format_tags_unique.join(format_tags_sample)
        return format_tags_unique

    # Get Sample Names from VCF file
    def get_sample_names(self, header: list) -> list:
        """Return the column names that follow FORMAT in *header* (the sample names)."""
        format_pos = header.index("FORMAT")
        return list(header[format_pos + 1:])


# Main function
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Convert vcf to tsv file")
    parser.add_argument(
        "-i", "--input_vcf", help="Input VCF file", required=True,
    )
    parser.add_argument(
        "-o", "--output_tsv", help="Output TSV file", required=True,
    )
    args = parser.parse_args()
    # NOTE(review): the scraped source is truncated here; the conversion is
    # presumably triggered like this — confirm against the original repository.
    VcfToTsv(args.input_vcf, args.output_tsv)

Full Screen

Full Screen

test_create.py

Source:test_create.py Github

copy

Full Screen

import pytest
from sqlalchemy import String, Integer, Column, Table, MetaData
from sqlalchemy.exc import ProgrammingError
from db.columns.operations.select import get_column_attnum_from_name, get_columns_attnum_from_names
from db.constraints.base import UniqueConstraint
from db.constraints.operations.create import create_constraint, create_unique_constraint
from db.tables.operations.select import get_oid_from_table, reflect_table_from_oid
from db.tests.constraints import utils as test_utils
from db.metadata import get_empty_metadata


def test_create_single_column_unique_constraint(engine_with_schema):
    """A unique constraint on one column gets the default '<table>_<column>_key' name."""
    engine, schema = engine_with_schema
    table_name = "orders_1"
    unique_column_name = 'product_name'
    table = Table(
        table_name,
        MetaData(bind=engine, schema=schema),
        Column('order_id', Integer, primary_key=True),
        Column(unique_column_name, String),
    )
    table.create()
    test_utils.assert_only_primary_key_present(table)
    table_oid = get_oid_from_table(table_name, schema, engine)
    unique_column_attnum = get_column_attnum_from_name(table_oid, unique_column_name, engine, metadata=get_empty_metadata())
    # name=None lets the backend derive the default constraint name
    create_constraint(schema, engine, UniqueConstraint(None, table_oid, [unique_column_attnum]))
    altered_table = reflect_table_from_oid(table_oid, engine, metadata=get_empty_metadata())
    test_utils.assert_primary_key_and_unique_present(altered_table)
    unique_constraint = test_utils.get_first_unique_constraint(altered_table)
    assert unique_constraint.name == f'{table_name}_{unique_column_name}_key'
    assert len(list(unique_constraint.columns)) == 1
    assert list(unique_constraint.columns)[0].name == unique_column_name


def test_create_multiple_column_unique_constraint(engine_with_schema):
    """A multi-column unique constraint covers exactly the requested columns."""
    engine, schema = engine_with_schema
    table_name = "orders_2"
    unique_column_names = ['product_name', 'customer_name']
    table = Table(
        table_name,
        MetaData(bind=engine, schema=schema),
        Column('order_id', Integer, primary_key=True),
        Column(unique_column_names[0], String),
        Column(unique_column_names[1], String),
    )
    table.create()
    test_utils.assert_only_primary_key_present(table)
    table_oid = get_oid_from_table(table_name, schema, engine)
    unique_column_attnums = get_columns_attnum_from_names(table_oid, unique_column_names, engine, metadata=get_empty_metadata())
    create_constraint(schema, engine, UniqueConstraint(None, table_oid, unique_column_attnums))
    altered_table = reflect_table_from_oid(table_oid, engine, metadata=get_empty_metadata())
    test_utils.assert_primary_key_and_unique_present(altered_table)
    unique_constraint = test_utils.get_first_unique_constraint(altered_table)
    unique_column_name_1 = unique_column_names[0]
    # Default name is derived from the first constrained column only
    assert unique_constraint.name == f'{table_name}_{unique_column_name_1}_key'
    assert len(list(unique_constraint.columns)) == 2
    assert set([column.name for column in unique_constraint.columns]) == set(unique_column_names)


def test_create_unique_constraint_with_custom_name(engine_with_schema):
    """An explicitly supplied constraint name overrides the backend default."""
    engine, schema = engine_with_schema
    table_name = "orders_4"
    unique_column_name = 'product_name'
    constraint_name = 'unique_product_name'
    table = Table(
        table_name,
        MetaData(bind=engine, schema=schema),
        Column('order_id', Integer, primary_key=True),
        Column(unique_column_name, String),
    )
    table.create()
    table_oid = get_oid_from_table(table_name, schema, engine)
    unique_column_attnum = get_column_attnum_from_name(table_oid, unique_column_name, engine, metadata=get_empty_metadata())
    create_constraint(schema, engine, UniqueConstraint(constraint_name, table_oid, [unique_column_attnum]))
    altered_table = reflect_table_from_oid(table_oid, engine, metadata=get_empty_metadata())
    test_utils.assert_primary_key_and_unique_present(altered_table)
    unique_constraint = test_utils.get_first_unique_constraint(altered_table)
    assert unique_constraint.name == constraint_name
    assert len(list(unique_constraint.columns)) == 1
    assert list(unique_constraint.columns)[0].name == unique_column_name


def test_create_unique_constraint_with_duplicate_name(engine_with_schema):
    """Creating a second constraint with an already-used name must fail."""
    # NOTE(review): reuses table name "orders_4" from the previous test — safe only
    # if engine_with_schema provides a fresh schema per test; confirm the fixture.
    engine, schema = engine_with_schema
    table_name = "orders_4"
    unique_column_names = ['product_name', 'customer_name']
    constraint_name = 'unique_product_name'
    table = Table(
        table_name,
        MetaData(bind=engine, schema=schema),
        Column('order_id', Integer, primary_key=True),
        Column(unique_column_names[0], String),
        Column(unique_column_names[1], String),
    )
    table.create()
    table_oid = get_oid_from_table(table_name, schema, engine)
    unique_column_attnum = get_column_attnum_from_name(table_oid, unique_column_names[0], engine, metadata=get_empty_metadata())
    create_constraint(schema, engine, UniqueConstraint(constraint_name, table_oid, [unique_column_attnum]))
    altered_table = reflect_table_from_oid(table_oid, engine, metadata=get_empty_metadata())
    test_utils.assert_primary_key_and_unique_present(altered_table)
    with pytest.raises(ProgrammingError):
        # NOTE(review): the scraped source is truncated inside this block; the
        # body below is reconstructed — a second constraint with the same name
        # on another column should raise. Confirm against the original repository.
        second_attnum = get_column_attnum_from_name(table_oid, unique_column_names[1], engine, metadata=get_empty_metadata())
        create_constraint(schema, engine, UniqueConstraint(constraint_name, table_oid, [second_attnum]))

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run pandera automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful