Best Python code snippet using autotest_python
test_sql_common.py
Source:test_sql_common.py  
...51                for key in payload.keys():52                    assert parameters[key] == payload[key]53        finally:54            if hasattr(connection, "_close_connection"):55                connection._close_connection()56    def test_createtable(self):57        connection = self.baseClass(None, self.app_config)58        try:59            # find all the columns types by column name we expect60            exected_col_types = {}61            for field in all_fields:62                field_name = field['name']63                field_type = field['input_type']64                col_type = connection.dialect.get_column_type(field_type)65                exected_col_types[field_name.lower()] = col_type.lower()66            connection._create_or_update_table(self.table_name, all_fields)67            # confirm that the table was created68            cursor = connection._start_transaction()69            try:70                get_cols_result = connection._execute_sql(71                    cursor,72                    self.inspect_cols_query)73                table_cols = {}74                for row in get_cols_result:75                    col_name = row[0]76                    col_type = row[1]77                    table_cols[col_name.lower()] = col_type.lower()78                for col in exected_col_types.keys():79                    if col in ('id', 'inc_id'):80                        assert table_cols.get(col, '').startswith('int') or table_cols.get(col, '').startswith("number")81                    else:82                        db_col = table_cols.get(col, '')83                        col_type = db_col.split("(")[0]84                        col_type_simple = col_type.split(" ")[0]85                        assert exected_col_types[col].startswith(col_type_simple)86            finally:87                cursor.close()88        finally:89            if hasattr(connection, "_close_connection"):90                connection._close_connection()91    def 
test_insert_row(self, result_payload):92        global flat_payload93        connection = self.baseClass(None, self.app_config)94        try:95            payload = flat_payload.copy()96            # convert data for the fields97            for field in all_fields:98                item_name = field['name']99                item_type = field['input_type']100                item_value = flat_payload[item_name]101                converted_value = TypeInfo.translate_value(None, field, item_value)102                payload[item_name] = converted_value103            # add data to table104            cursor = connection._start_transaction()105            if self.setup_stmt:106                connection._execute_sql(107                    cursor,108                    self.setup_stmt)109            cmd = connection.dialect.get_upsert(TABLE_NAME, self.all_field_names, self.all_field_types)110            params = connection.dialect.get_parameters(self.all_field_names, payload)111            insert_result = connection._execute_sql(112                cursor,113                cmd,114                params115            )116            connection._commit_transaction(cursor)117            if hasattr(insert_result, 'nextset'):118                while insert_result.nextset():119                    row_count = insert_result.rowcount120                    assert row_count == 1121            # get the row and confirm the values122            select_stmt = "select * from {} where id = {}".format(TABLE_NAME, payload["id"])123            select_result = connection._execute_sql(124                cursor,125                select_stmt)126            rows = cursor.fetchall()127            for row in rows:128                for ndx in range(len(select_result.description)):129                    col_name = select_result.description[ndx][0].lower()130                    assert row[ndx] == result_payload[col_name]131        finally:132            cursor.close()133            if 
hasattr(connection, "_close_connection"):134                connection._close_connection()135    def test_update_row(self, MAX_TEXT_SIZE, result_payload):136        global flat_payload137        connection = self.baseClass(None, self.app_config)138        try:139            payload = flat_payload.copy()140            # convert data for the fields141            for field in all_fields:142                item_name = field['name']143                item_type = field['input_type']144                item_value = flat_payload[item_name]145                converted_value = TypeInfo.translate_value(None, field, item_value)146                payload[item_name] = converted_value147            payload['test_text'] = u"a" * MAX_TEXT_SIZE148            payload_result = result_payload.copy()149            payload_result['test_text'] = payload['test_text']150            # add data to table151            cursor = connection._start_transaction()152            if self.setup_stmt:153                connection._execute_sql(154                    cursor,155                    self.setup_stmt)156            update_result = connection._execute_sql(157                cursor,158                connection.dialect.get_upsert(TABLE_NAME, self.all_field_names, self.all_field_types),159                connection.dialect.get_parameters(self.all_field_names, payload))160            connection._commit_transaction(cursor)161            #assert update_result.rowcount == 1162            # get the row and confirm the values163            select_stmt = "select * from {} where id = {}".format(TABLE_NAME, payload["id"])164            select_result = connection._execute_sql(165                cursor,166                select_stmt)167            rows = cursor.fetchall()168            for row in rows:169                for ndx in range(len(select_result.description)):170                    col_name = select_result.description[ndx][0].lower()171                    assert row[ndx] == 
payload_result[col_name]172        finally:173            cursor.close()174            if hasattr(connection, "_close_connection"):175                connection._close_connection()176    def test_delete_row(self):177        global flat_payload178        connection = self.baseClass(None, self.app_config)179        cursor = connection._start_transaction()180        try:181            # delete the row182            delete_result = connection._execute_sql(183                cursor,184                connection.dialect.get_delete(TABLE_NAME),185                [flat_payload['id']])186            connection._commit_transaction(cursor)187            assert delete_result.rowcount == 1188        finally:189            cursor.close()190            if hasattr(connection, "_close_connection"):191                connection._close_connection()192    def test_altertable(self, col_type='text'):193        global flat_payload194        ALTER_COL = "alter_col"195        connection = self.baseClass(None, self.app_config)196        cursor = connection._start_transaction()197        try:198            alter_result = connection._execute_sql(199                cursor,200                connection.dialect.get_add_column_to_table(TABLE_NAME, ALTER_COL, col_type))201            connection._commit_transaction(cursor)202            get_cols_result = connection._execute_sql(203                cursor,204                self.inspect_cols_query)205            rows = cursor.fetchall()206            assert len(rows) == 8    # number of columns207        finally:208            cursor.close()209            if hasattr(connection, "_close_connection"):210                connection._close_connection()211    def test_droptable(self):212        connection = self.baseClass(None, self.app_config)213        cursor = connection._start_transaction()214        try:215            alter_result = connection._execute_sql(216                cursor,217                "Drop table {}".format(TABLE_NAME))218            
connection._commit_transaction(cursor)219        finally:220            cursor.close()221            if hasattr(connection, "_close_connection"):...database_manager.py
Source:database_manager.py  
...9        self.verify_table_exists('users')10        self.connection = None11    def _open_connection(self):12        self.connection = sqlite3.connect(self.config['db']['filename'])13    def _close_connection(self):14        self.connection.close()15    def create_users_table(self):16        self._open_connection()17        cur = self.connection.cursor()18        cur.execute(''' CREATE TABLE `users` (`user_id` INTEGER PRIMARY KEY, `positive_relationship` INTEGER, `negative_relationship` INTEGER) ''')19        self.connection.commit()20        self._close_connection()21    def insert_user(self, user_id, positive_rep: int = 0, negative_rep: int = 0):22        self._open_connection()23        cur = self.connection.cursor()24        cur.execute(''' INSERT INTO `users` (`user_id`, `positive_relationship`, `negative_relationship`) VALUES (?, ?, ?) ''', (user_id, positive_rep, negative_rep))25        print('Added entry {0}'.format(user_id))26        self.connection.commit()27        self._close_connection()28    def update_user(self, user_id: int, positive_rep: int, negative_rep: int):29        self._open_connection()30        cur = self.connection.cursor()31        cur.execute(''' UPDATE `users` SET `positive_relationship` = ?, `negative_relationship` = ? WHERE `user_id` = ? ''', (positive_rep, negative_rep, user_id))32        print('Updated entry {0}'.format(user_id))33        self.connection.commit()34        self._close_connection()35    def verify_user_exists(self, user_id: int):36        self._open_connection()37        cur = self.connection.cursor()38        cur.execute(''' SELECT * FROM `users` WHERE `user_id` = ? 
''', (user_id,))39        result = cur.fetchone()40        self._close_connection()41        return result42    def verify_table_exists(self, table_name: str):43        self._open_connection()44        cur = self.connection.cursor()45        cur.execute(''' SELECT count(`name`) as 'exists' FROM `sqlite_master` WHERE `type` = 'table' AND `name` = 'users' ''')46        if cur.fetchone()[0] == 1:47            return48        self.connection.commit()49        self._close_connection()...sqlite_database.py
Source:sqlite_database.py  
...13        try:14            cursor.close()15        except sqlite3.Error as exception:16            pass17    def _close_connection(self, connection):18        try:19            connection.close()20        except sqlite3.Error as exception:21            pass22    def execute_update(self, statement, parameters):23        try:24            connection = self._create_connection()25            cursor = connection.cursor()26            cursor.execute(statement, parameters)27            connection.commit()28            success = True29        except sqlite3.Error as exception:30            print(exception)31            success = False32        finally:33            self._close_cursor(cursor)34            self._close_connection(connection)35        return success36    def execute_query(self, statement, parameters):37        try:38            connection = self._create_connection()39            cursor = connection.cursor()40            cursor.execute(statement, parameters)41            result = cursor.fetchall()42        except sqlite3.Error as exception:43            print(exception)44            result = None45        finally:46            self._close_cursor(cursor)47            self._close_connection(connection)48        return result49    50    def close_cursor_manually(self, cursor):51        self._close_cursor(cursor)52        53    def close_connection_manually(self, connection):54        self._close_connection(connection)55    56    def fetch_all_data(self, statement, parameters):57        try:58            connection = self._create_connection()59            cursor = connection.cursor()60            cursor.execute(statement, parameters)61            result = cursor.fetchall()62        except sqlite3.Error as exception:63            self._close_cursor(cursor)64            self._close_connection(connection)65            result = None66            cursor = None67            connection = None...Learn to execute automation testing from scratch with LambdaTest Learning 
Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
