Best Python code snippet using dbt-osmosis_python
parse_sql_tests.py
Source:parse_sql_tests.py  
import sys
from os import path

# Make "<parent of this file's directory>/code" importable; this must run
# before ``parse_sql`` is imported below.
sys.path.append(path.dirname(path.dirname(path.abspath(__file__))) + '/code')

from unittest import TestCase, main
from parse_sql import WithStatementParser


class ParseSqlTests(TestCase):
    """Unit tests for ``WithStatementParser``.

    The fixture paths are relative (``./test_data/...``), so the tests
    resolve them against the current working directory.
    """

    def test_helper_cleaning_methods(self):
        """Check the whitespace, comment, and cross-join helper methods."""
        sql_parser = WithStatementParser('./test_data/example_1.sql')

        input_text = ' extra\n space  everywhere '
        output_text = 'extra space everywhere'
        self.assertEqual(sql_parser.remove_excess_whitespace(input_text), output_text)

        input_text = '-- comment\nselect * from table;'
        output_text = '\nselect * from table;'
        self.assertEqual(sql_parser.remove_comments(input_text), output_text)

        input_text = 'select * -- inline comment\nfrom table;'
        output_text = 'select * \nfrom table;'
        self.assertEqual(sql_parser.remove_comments(input_text), output_text)

        # (input, expected) pairs for comma-separated tables -> cross joins.
        cross_join_cases = [
            ('select * from table1, table2;',
             'select * from table1 cross join table2;'),
            ('select * from table1 t1, table2 t2;',
             'select * from table1 t1 cross join table2 t2;'),
            ('select * from table1 as t1, table2 as t2;',
             'select * from table1 as t1 cross join table2 as t2;'),
            ('select * from table1, table2, table3;',
             'select * from table1 cross join table2 cross join table3;'),
        ]
        for input_text, output_text in cross_join_cases:
            with self.subTest(input_text=input_text):
                self.assertEqual(sql_parser.cross_joins(input_text), output_text)

        output_text = 'my_table_1 as (\nselect *\nfrom table_1\n)\nselect *\nfrom my_table_1\n;'
        self.assertEqual(sql_parser.clean_original_query(), output_text)

    def test_parenthesis_tracking(self):
        """A matched pair of parentheses should flag the end of a with statement."""
        sql_parser = WithStatementParser('./test_data/example_1.sql')
        self.assertFalse(sql_parser.end_with_statement)
        sql_parser.parenthesis_tracking('(')
        self.assertEqual(sql_parser.num_open_parens, 1)
        self.assertFalse(sql_parser.end_with_statement)
        sql_parser.parenthesis_tracking(')')
        self.assertEqual(sql_parser.num_close_parens, 1)
        self.assertTrue(sql_parser.end_with_statement)

    def test_extract_with_statements(self):
        """Check the names, build order, and bodies extracted from example_2.sql."""
        sql_parser = WithStatementParser('./test_data/example_2.sql')
        sql_parser.extract_with_statements()
        self.assertEqual(sql_parser.overall_name, 'my_table_1_my_table_2')
        self.assertEqual(
            sql_parser.build_order,
            ['my_table_1', 'my_table_2', 'my_table_1_my_table_2'])
        self.assertEqual(
            sorted(sql_parser.with_statements.keys()),
            ['my_table_1', 'my_table_1_my_table_2', 'my_table_2'])
        self.assertEqual(sql_parser.with_statements['my_table_1'], 'select *\nfrom table_1')
        self.assertEqual(sql_parser.with_statements['my_table_2'], 'select *\nfrom table_2')
        self.assertEqual(
            sql_parser.with_statements['my_table_1_my_table_2'],
            '\nselect *\nfrom my_table_1 t1\njoin my_table_2 t2\non t1.id = t2.id\n;')

    def test_dependency(self):
        """Check dependency extraction and cycle detection."""
        sql_parser = WithStatementParser('./test_data/example_3.sql')
        sql_parser.with_statements = {
            'my_table_1': 'select *\nfrom table_1',
            'my_table_2': 'select *\nfrom my_table_1 as m',
            'my_table_1_my_table_2': '\nselect *\nfrom my_table_1 t1\njoin my_table_2 t2\non t1.id = t2.id\n;',
        }
        sql_parser.dependency_alias()
        self.assertEqual(sql_parser.dependencies['my_table_1'], [])
        self.assertEqual(sql_parser.dependencies['my_table_2'], ['my_table_1'])
        self.assertEqual(
            sql_parser.dependencies['my_table_1_my_table_2'],
            ['my_table_1', 'my_table_2'])

        # no cycle in dependence
        sql_parser.dependencies = {
            'my_table_1': [],
            'my_table_2': ['my_table_1'],
        }
        self.assertFalse(sql_parser.dependency_graph_contains_cycles())

        # 2 node cycle in dependence
        sql_parser.dependencies = {
            'my_table_1': ['my_table_2'],
            'my_table_2': ['my_table_1'],
        }
        self.assertTrue(sql_parser.dependency_graph_contains_cycles())

        # 3 node cycle in dependence
        sql_parser.dependencies = {
            'my_table_1': ['my_table_2'],
            'my_table_2': ['my_table_3'],
            'my_table_3': ['my_table_1'],
        }
        self.assertTrue(sql_parser.dependency_graph_contains_cycles())

    def test_alias(self):
        """Check alias extraction and the alias lookup/generation helpers."""
        sql_parser = WithStatementParser('./test_data/example_3.sql')
        sql_parser.with_statements = {
            'my_table_1': 'select *\nfrom table_1',
            'my_table_2': 'select *\nfrom my_table_1 as m',
            'my_table_1_my_table_2': '\nselect *\nfrom my_table_1 t1\njoin my_table_2 t2\non t1.id = t2.id\n;',
        }
        sql_parser.dependency_alias()
        self.assertEqual(sorted(sql_parser.all_aliases), ['m', 't1', 't2'])
        self.assertEqual(sql_parser.aliases['my_table_1'], {})
        self.assertEqual(sql_parser.aliases['my_table_2'], {'my_table_1': 'm'})
        self.assertEqual(sql_parser.aliases['my_table_1_my_table_2']['my_table_1'], 't1')
        self.assertEqual(sql_parser.aliases['my_table_1_my_table_2']['my_table_2'], 't2')

        # return empty string when alias exists
        # The inner value is a dict (table -> alias), matching the shape
        # asserted above; the earlier set literal {'my_table_1', 't1'} was
        # inconsistent with that shape.
        sql_parser.aliases = {'my_table_2': {'my_table_1': 't1'}}
        self.assertEqual(sql_parser.get_alias('my_table_2', 'my_table_1'), '')

        # return next available alias when alias doesn't exist
        sql_parser.alias_index = 1
        sql_parser.all_aliases = ['t1']
        self.assertEqual(sql_parser.get_alias('my_table_2', 'my_table_3'), ' t2')

        # return next available alias
        sql_parser.alias_index = 1
        sql_parser.all_aliases = ['t1', 't2']
        self.assertEqual(sql_parser.next_alias(), ' t3')

    def test_create_nested_with_statements(self):
        """Check the nested query built from example_3.sql."""
        sql_parser = WithStatementParser('./test_data/example_3.sql')
        sql_parser.extract_with_statements()
        sql_parser.create_nested_with_statements()
        nested_query = '''
select *
from (
  select *
  from table_1
  ) t1
join (
  select *
  from table_2 t2
  join (
    select *
    from table_1
    ) t1
  on t1.id = t2.id
  ) t2
on t1.id = t2.id
;
'''.strip()
        self.assertEqual(sql_parser.get_nested_query(), nested_query)


if __name__ == '__main__':
    ...  # body is elided ("...") in the source excerpt

# app.py
Source:app.py  
import os
import json
import flask
from flask_restful import reqparse
from flask import Flask, request, render_template
from flask_restful import Resource, Api
import pandas as pd
from fetch_data import fetch_data, fetch_data_custom
from dataviz.data_viz import generate_charts
from config import config

app = Flask(__name__)
api = Api(app)

# Query-string arguments shared by the table endpoint and the custom-SQL endpoint.
sql_parser = reqparse.RequestParser()
sql_parser.add_argument('limit', type=int, help="Limit the returned rows of SQL Query")
sql_parser.add_argument('filter', type=str, help="filter the returned rows of SQL Query")
sql_parser.add_argument('per_page', type=int, help="Per Page limit")


def _project_sql_entries():
    """Return the names inside ``config.project_sql`` that are not directories.

    Each name is joined to ``config.project_sql`` before the directory test;
    testing the bare name would resolve it against the current working
    directory instead.
    """
    root = config.project_sql
    return [name for name in os.listdir(root)
            if not os.path.isdir(os.path.join(root, name))]


def get_paginated_list(tablename, sql_filter, sql_limit, url, start, per_page=10):
    """Fetch rows for *tablename* and return one page of them as a dict.

    Args:
        tablename: Passed through to ``fetch_data``.
        sql_filter: SQL ``where`` fragment; concatenated with *sql_limit*.
        sql_limit: SQL ``limit`` fragment (may be empty).
        url: Base URL used to build the ``previous``/``next`` links.
        start: 1-based index of the first row of the page (int or numeric str).
        per_page: Number of rows per page (int or numeric str).

    Returns:
        A dict with keys ``start``, ``per_page``, ``count``, ``previous``,
        ``next`` and ``results`` (a list of row records).

    Aborts with 400 when *start*/*per_page* are not integers, and with 404
    when *start* is below 1 or beyond the row count, or *per_page* is negative.
    """
    try:
        start = int(start)
        per_page = int(per_page)
    except (TypeError, ValueError):
        flask.abort(400)

    # presumably a pandas DataFrame (it is sliced and ``to_json`` is called) — verify
    results = fetch_data(tablename, sql_filter + sql_limit)
    count = len(results)
    if start < 1 or count < start or per_page < 0:
        flask.abort(404)

    page = {'start': start, 'per_page': per_page, 'count': count}

    if start == 1:
        page['previous'] = ''
    else:
        prev_start = max(1, start - per_page)
        # NOTE(review): the previous link's per_page is ``start - 1`` rather
        # than ``per_page``; kept as-is, confirm this is intended.
        prev_per_page = start - 1
        page['previous'] = url + '?start=%d&per_page=%d' % (prev_start, prev_per_page)

    if start + per_page > count:
        page['next'] = ''
    else:
        page['next'] = url + '?start=%d&per_page=%d' % (start + per_page, per_page)

    window = results[(start - 1):(start - 1 + per_page)]
    page['results'] = json.loads(window.to_json(orient="records"))
    return page


@app.route('/')
@app.route("/home")
def home_page():
    """Render the home page template."""
    return render_template('home.html')


@app.route("/list_api")
def list_api_page():
    """Render the API list using the non-directory entries of ``config.project_sql``."""
    return render_template(
        'list_api.html',
        api_list_items=[fname.split('.')[0] for fname in _project_sql_entries()])


@app.route("/fetch_apps/<string:tablename>", methods=['GET'])
def fetchdata_apps(tablename):
    """Return one JSON page of rows for *tablename*.

    Reads ``filter``, ``limit`` and ``per_page`` via ``sql_parser`` and
    ``start``/``per_page`` from the query string.
    """
    sql_arg = sql_parser.parse_args()

    # SECURITY NOTE(review): ``filter`` comes straight from the request and is
    # concatenated into the SQL text. It cannot be parameterized here without
    # changing ``fetch_data``; treat this endpoint as SQL-injectable.
    if sql_arg['filter'] is not None:
        sql_filter = ' where ' + sql_arg['filter'] + ' '
    else:
        sql_filter = ' where 1 = 1'

    if sql_arg['limit'] and sql_arg['limit'] > 0:
        sql_limit = ' limit ' + str(sql_arg['limit'])
    else:
        sql_limit = ''
        sql_arg['per_page'] = 10
    if sql_arg['per_page'] is None:
        sql_arg['per_page'] = 10

    return flask.jsonify(get_paginated_list(
        tablename, sql_filter, sql_limit,
        request.url_root + 'fetch_apps/' + tablename,
        start=request.args.get('start', 1),
        per_page=request.args.get('per_page', sql_arg['per_page'])))


class FetchApp(Resource):
    """Resource listing the available API names."""

    def get(self):
        """Return the base names of the non-directory entries of ``config.project_sql``."""
        api_list = [fname.split('.')[0] for fname in _project_sql_entries()]
        return flask.jsonify(api_list)


class GetViz(Resource):
    """Resource that generates charts from the project's ``.sql`` files."""

    def get(self):
        """Run ``generate_charts`` and send the result as a file when it is a pdf/zip."""
        sql_file_list = [fname for fname in _project_sql_entries()
                         if fname.endswith('.sql')]
        viz_file = generate_charts(sql_file_list)
        # splitext looks at the final extension only, so paths with extra
        # dots (e.g. "./out/report.pdf") are still recognised.
        if os.path.splitext(viz_file)[1] in ('.pdf', '.zip'):
            return flask.send_file(viz_file)
        return viz_file


class CustomSql(Resource):
    """Resource that runs SQL supplied in the request body."""

    def post(self):
        """Run the posted JSON payload through ``fetch_data_custom`` and return the rows.

        SECURITY NOTE(review): the request body is handed to
        ``fetch_data_custom`` as-is; presumably it is executed as SQL —
        confirm and restrict access accordingly.
        """
        data = request.get_json()
        sql_arg = sql_parser.parse_args()
        sql_filter = ' where 1 = 1'
        if sql_arg['limit'] and sql_arg['limit'] > 0:
            sql_limit = ' limit ' + str(sql_arg['limit'])
        else:
            sql_limit = ''
        df = fetch_data_custom(data, sql_filter + sql_limit)
        return flask.jsonify({'results': json.loads(df.to_json(orient='records'))})


api.add_resource(FetchApp, '/fetch_apps')
api.add_resource(CustomSql, '/customsql')
api.add_resource(GetViz, '/getviz')

if __name__ == '__main__':
    port = 5000
    ...  # remainder is elided ("...") in the source excerpt

# check_sql.py
Source:check_sql.py  
# RE library to match correct syntactic structures of SQL statements
import re


class SQL_Parser:
    """Regex-based syntax checker for a small subset of SQL statements.

    Every ``parse_*`` method uses ``re.match``, so a pattern is anchored at
    the start of the statement only: a statement passes when a valid prefix
    matches. Patterns expect the lower-cased, whitespace-collapsed text
    produced by ``clean``.
    """

    # Statement keywords accepted as the first token.
    KEYWORDS = ("select", "create", "drop", "insert", "update", "delete")

    # Raw strings keep the regex escapes (\s, \w, \d, \(, ...) out of
    # Python's string-escape processing.
    _SELECT_RE = re.compile(
        r'select\s+(distinct|all)?\s*(\*|((\w+)\s*,\s*)*\s*(\w+))\s+from\s+(\w+)\s*(where\s+(((\w+)\s+between\s+(\w+)\s+and\s+(\w+))|((\w+)\s*(<|>|<=|>=|=|<>)\s*(\w+)(\s*(and|or)\s+(\w+)\s*(<|>|<=|>=|=|<>)\s*(\w+))*)))?\s*(having\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*(<|>|<=|>=|=|<>)\s*([a-zA-Z0-9][a-zA-Z0-9]*) )?\s*(order\s+by\s+((\w+)\s*(asc|desc)?\s*,\s*)*\s*(\w+)\s+(asc|desc)?)?\s*(limit\s+(\d+))?\s*;?'
    )
    _CREATE_RE = re.compile(
        r"create table ([a-zA-Z_][a-zA-Z0-9_]*)\s*\(\s*(([a-zA-Z_][a-zA-Z0-9_]*)\s+(int|varchar|text)\s*(,)\s*)*(([a-zA-Z_][a-zA-Z0-9_]*)\s+(int|varchar|text))\)\s*;"
    )
    _DROP_RE = re.compile(
        r"drop table (([a-zA-Z_][a-zA-Z0-9_]*),)*([a-zA-Z_][a-zA-Z0-9_]*)(;)?"
    )
    _INSERT_RE = re.compile(
        r'insert\s+into\s+(\w+)\s*(\(\s*((([a-zA-Z0-9][a-zA-Z0-9_]*)\s*,\s*)*\s*([a-zA-Z0-9][a-zA-Z0-9_]*))\s*\))?\s+values\s*(\(\s*((((".*")|([a-zA-Z0-9][a-zA-Z0-9]*))\s*,\s*)*\s*([a-zA-Z0-9][a-zA-Z0-9]*))\s*\));?'
    )
    # "is [not] null" previously demanded more whitespace than ``clean``
    # leaves in place, so that branch could never match.
    # NOTE(review): the where clause is mandatory in this pattern — confirm
    # whether "update t set a = 1;" should be accepted.
    _UPDATE_RE = re.compile(
        r'update\s+([a-zA-Z_][a-zA-Z0-9_]*)\s+set\s+(([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*(".*"|[0-9][0-9]*)\s*,\s*)*\s*(([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*(".*"|[0-9][0-9]*))\s*(where\s+(([a-zA-Z_][a-zA-Z0-9_]*)\s*((between\s+([0-9][0-9]*)\s+and\s+([0-9][0-9]*))|(is\s+(not\s+)?null)|(<|>|<=|>=|=|<>)\s*(".*"|[0-9][0-9]*))))\s*;?'
    )
    # The limit clause is optional, as it is in the select pattern;
    # previously it was required, which rejected "delete from t;".
    _DELETE_RE = re.compile(
        r'delete\s+from\s+(\w+)\s*(where\s+(((\w+)\s+between\s+(\w+)\s+and\s+(\w+))|((\w+)\s*(<|>|<=|>=|=|<>)\s*(\w+)(\s*(and|or)\s+(\w+)\s*(<|>|<=|>=|=|<>)\s*(\w+))*)))?\s*(order\s+by\s+((\w+)\s*(asc|desc)?\s*,\s*)*\s*((\w+)\s*(asc|desc))?)?\s*(limit\s+(\d+))?\s*;?'
    )

    def __init__(self, s):
        """Store the raw SQL statement *s* for later checking."""
        self.stmt = s

    def parse_select(self, s: str) -> bool:
        """Return True when *s* starts with a select statement the pattern accepts."""
        return bool(self._SELECT_RE.match(s))

    def parse_create(self, s: str) -> bool:
        """Return True when *s* starts with an accepted ``create table`` statement."""
        return bool(self._CREATE_RE.match(s))

    def parse_drop(self, s: str) -> bool:
        """Return True when *s* starts with an accepted ``drop table`` statement."""
        return bool(self._DROP_RE.match(s))

    def parse_insert(self, s: str) -> bool:
        """Return True when *s* starts with an accepted ``insert into`` statement."""
        return bool(self._INSERT_RE.match(s))

    def parse_update(self, s: str) -> bool:
        """Return True when *s* starts with an accepted ``update`` statement."""
        return bool(self._UPDATE_RE.match(s))

    def parse_delete(self, s: str) -> bool:
        """Return True when *s* starts with an accepted ``delete from`` statement."""
        return bool(self._DELETE_RE.match(s))

    def check_semicolon(self, s: str) -> bool:
        """Return True when *s* ends with ``;`` (False for an empty string)."""
        return s.endswith(';')

    def tokenize(self, s: str) -> list:
        """Split *s* on single spaces."""
        return s.split(" ")

    def check_keywords(self, tokens: list) -> bool:
        """Return True when the first token is one of ``KEYWORDS``."""
        return bool(tokens) and tokens[0] in self.KEYWORDS

    def clean(self, s: str) -> str:
        """Lower-case *s* and collapse every run of whitespace to one space."""
        # converting the sql statement to lower case for ease of checking
        stmt = s.strip("\n").lower()
        # converting all multiple white spaces into single space in the statement
        return ' '.join(stmt.split())

    def parse_query(self) -> bool:
        """Clean the stored statement and check it against its statement type.

        Returns False when the statement does not end with ``;`` or does not
        start with a supported keyword; otherwise the result of the matching
        ``parse_*`` method.
        """
        s = self.clean(self.stmt)
        # tokenizing the sql query
        tokens = self.tokenize(s)
        if not self.check_semicolon(s) or not self.check_keywords(tokens):
            return False
        handlers = {
            "select": self.parse_select,
            "update": self.parse_update,
            "insert": self.parse_insert,
            "delete": self.parse_delete,
            "create": self.parse_create,
            "drop": self.parse_drop,
        }
        return handlers[tokens[0]](s)


def main():
    print("\n************* Welcome to SQL Synctactic Parser made by Yukti Khurana ****************\n\n")
    # reading the sql statement
    stmt = "SELECT colname FROM table1, table2 WHERE condition_list ORDER BY colname ;"
    sql_parser = SQL_Parser(stmt)
    ...  # remainder of main() is elided ("...") in the source excerpt

# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
