Best Python code snippet using avocado_python
test_cext_cursor.py
Source:test_cext_cursor.py  
# NOTE: this listing starts at line 48 of test_cext_cursor.py; the earlier
# lines (module header and imports, ending with a closing parenthesis on
# line 47) are not part of the snippet.  The listing numbers that were fused
# into the code have been removed and the original layout restored.

LOGGER = logging.getLogger(tests.LOGGER_NAME)


@unittest.skipIf(not HAVE_CMYSQL, "C Extension not available")
class CExtMySQLCursorTests(tests.CMySQLCursorTests):
    """Tests for CMySQLCursor, the cursor of the MySQL C extension."""

    def _get_cursor(self, cnx=None):
        """Return a CMySQLCursor for ``cnx``.

        When ``cnx`` is falsy a new CMySQLConnection is created from
        ``self.config``.
        """
        if not cnx:
            cnx = CMySQLConnection(**self.config)
        return CMySQLCursor(connection=cnx)

    def test___init__(self):
        """Constructor rejects a non-connection and stores the connection."""
        self.assertRaises(errors.InterfaceError, CMySQLCursor,
                          connection='ham')

        cur = self._get_cursor(self.cnx)
        # [2:-1] strips the '0X' prefix and the last character of the id
        # (presumably the trailing 'L' of a Python 2 long -- TODO confirm).
        self.assertTrue(hex(id(self.cnx)).upper()[2:-1]
                        in repr(cur._cnx).upper())

    def test_lastrowid(self):
        """lastrowid after single-row and multi-row INSERT statements."""
        cur = self._get_cursor(self.cnx)

        tbl = 'test_lastrowid'
        self.setup_table(self.cnx, tbl)

        cur.execute("INSERT INTO {0} (col1) VALUES (1)".format(tbl))
        self.assertEqual(1, cur.lastrowid)

        cur.execute("INSERT INTO {0} () VALUES ()".format(tbl))
        self.assertEqual(2, cur.lastrowid)

        # Two rows inserted at once: the expected lastrowid is 3, and the
        # next single insert is expected to report 5.
        cur.execute("INSERT INTO {0} () VALUES (),()".format(tbl))
        self.assertEqual(3, cur.lastrowid)

        cur.execute("INSERT INTO {0} () VALUES ()".format(tbl))
        self.assertEqual(5, cur.lastrowid)

    def test__fetch_warnings(self):
        """_fetch_warnings() needs a connection and returns the warnings."""
        self.cnx.get_warnings = True
        cur = self._get_cursor(self.cnx)

        # Without a connection the call must fail.
        cur._cnx = None
        self.assertRaises(errors.InterfaceError, cur._fetch_warnings)

        cur = self._get_cursor(self.cnx)
        cur.execute("SELECT 'a' + 'b'")
        cur.fetchall()
        exp = [
            ('Warning', 1292, "Truncated incorrect DOUBLE value: 'a'"),
            ('Warning', 1292, "Truncated incorrect DOUBLE value: 'b'")
        ]
        res = cur._fetch_warnings()
        self.assertTrue(tests.cmp_result(exp, res))
        self.assertEqual(len(exp), cur._warning_count)

    def test_execute(self):
        """execute(): return value, parameter errors, warnings and results."""
        self.cnx.get_warnings = True
        cur = self._get_cursor(self.cnx)

        self.assertIsNone(cur.execute(None))

        # Three placeholders but only two parameters.
        self.assertRaises(errors.ProgrammingError, cur.execute,
                          'SELECT %s,%s,%s', ('foo', 'bar',))

        cur.execute("SELECT 'a' + 'b'")
        cur.fetchall()
        exp = [
            ('Warning', 1292, "Truncated incorrect DOUBLE value: 'a'"),
            ('Warning', 1292, "Truncated incorrect DOUBLE value: 'b'")
        ]
        self.assertTrue(tests.cmp_result(exp, cur._warnings))
        self.cnx.get_warnings = False

        cur.execute("SELECT BINARY 'ham'")
        exp = [('ham',)]
        self.assertEqual(exp, cur.fetchall())
        cur.close()

        tbl = 'myconnpy_cursor'
        self.setup_table(self.cnx, tbl)

        cur = self._get_cursor(self.cnx)
        stmt_insert = "INSERT INTO {0} (col1,col2) VALUES (%s,%s)".format(tbl)
        res = cur.execute(stmt_insert, (1, 100))
        self.assertIsNone(res, "Return value of execute() is wrong.")

        stmt_select = "SELECT col1,col2 FROM {0} ORDER BY col1".format(tbl)
        cur.execute(stmt_select)
        self.assertEqual([(1, '100')],
                         cur.fetchall(), "Insert test failed")

        # Named (pyformat) parameters supplied as a dictionary.
        data = {'id': 2}
        stmt = "SELECT col1,col2 FROM {0} WHERE col1 <= %(id)s".format(tbl)
        cur.execute(stmt, data)
        self.assertEqual([(1, '100')], cur.fetchall())

        cur.close()

    def test_executemany__errors(self):
        """executemany(): invalid arguments and invalid statements."""
        self.cnx.get_warnings = True
        cur = self._get_cursor(self.cnx)

        self.assertIsNone(cur.executemany(None, []))

        cur = self._get_cursor(self.cnx)
        self.assertRaises(errors.ProgrammingError, cur.executemany,
                          'programming error with string', 'foo')
        self.assertRaises(errors.ProgrammingError, cur.executemany,
                          'programming error with 1 element list', ['foo'])

        self.assertIsNone(cur.executemany('empty params', []))
        self.assertIsNone(cur.executemany('params is None', None))

        self.assertRaises(errors.ProgrammingError, cur.executemany,
                          'foo', ['foo'])
        self.assertRaises(errors.ProgrammingError, cur.executemany,
                          'SELECT %s', [('foo',), 'foo'])

        self.assertRaises(errors.ProgrammingError,
                          cur.executemany,
                          "INSERT INTO t1 1 %s", [(1,), (2,)])

        cur.executemany("SELECT SHA1(%s)", [('foo',), ('bar',)])
        self.assertIsNone(cur.fetchone())

    def test_executemany(self):
        """executemany(): row counts for INSERT, SELECT and DELETE batches."""
        tbl = 'myconnpy_cursor'
        self.setup_table(self.cnx, tbl)

        stmt_insert = "INSERT INTO {0} (col1,col2) VALUES (%s,%s)".format(tbl)
        stmt_select = "SELECT col1,col2 FROM {0} ORDER BY col1".format(tbl)

        cur = self._get_cursor(self.cnx)

        res = cur.executemany(stmt_insert, [(1, 100), (2, 200), (3, 300)])
        self.assertEqual(3, cur.rowcount)

        res = cur.executemany("SELECT %s", [('f',), ('o',), ('o',)])
        self.assertEqual(3, cur.rowcount)

        data = [{'id': 2}, {'id': 3}]
        stmt = "SELECT * FROM {0} WHERE col1 <= %(id)s".format(tbl)
        cur.executemany(stmt, data)
        self.assertEqual(5, cur.rowcount)

        cur.execute(stmt_select)
        self.assertEqual([(1, '100'), (2, '200'), (3, '300')],
                         cur.fetchall(), "Multi insert test failed")

        data = [{'id': 2}, {'id': 3}]
        stmt = "DELETE FROM {0} WHERE col1 = %(id)s".format(tbl)
        cur.executemany(stmt, data)
        self.assertEqual(2, cur.rowcount)

        stmt = "TRUNCATE TABLE {0}".format(tbl)
        cur.execute(stmt)

        # INSERT statement interleaved with SQL comments.
        stmt = (
            "/*comment*/INSERT/*comment*/INTO/*comment*/{0}(col1,col2)VALUES"
            "/*comment*/(%s,%s/*comment*/)/*comment()*/ON DUPLICATE KEY UPDATE"
            " col1 = VALUES(col1)"
        ).format(tbl)

        cur.executemany(stmt, [(4, 100), (5, 200), (6, 300)])
        self.assertEqual(3, cur.rowcount)

        cur.execute(stmt_select)
        self.assertEqual([(4, '100'), (5, '200'), (6, '300')],
                         cur.fetchall(), "Multi insert test failed")

        stmt = "TRUNCATE TABLE {0}".format(tbl)
        cur.execute(stmt)

        # Comment-like text inside a string literal must be kept as data.
        stmt = (
            "INSERT INTO/*comment*/{0}(col1,col2)VALUES"
            "/*comment*/(%s,'/*100*/')/*comment()*/ON DUPLICATE KEY UPDATE "
            "col1 = VALUES(col1)"
        ).format(tbl)

        cur.executemany(stmt, [(4,), (5,)])
        self.assertEqual(2, cur.rowcount)

        cur.execute(stmt_select)
        self.assertEqual([(4, '/*100*/'), (5, '/*100*/')],
                         cur.fetchall(), "Multi insert test failed")
        cur.close()

    def _test_callproc_setup(self, cnx):
        """Create the stored procedures myconnpy_sp_1 .. myconnpy_sp_4.

        Existing procedures with these names are dropped first; the test
        fails when a CREATE statement raises errors.Error.
        """
        self._test_callproc_cleanup(cnx)
        stmt_create1 = (
            "CREATE PROCEDURE myconnpy_sp_1 "
            "(IN pFac1 INT, IN pFac2 INT, OUT pProd INT) "
            "BEGIN SET pProd := pFac1 * pFac2; END;")

        stmt_create2 = (
            "CREATE PROCEDURE myconnpy_sp_2 "
            "(IN pFac1 INT, IN pFac2 INT, OUT pProd INT) "
            "BEGIN SELECT 'abc'; SELECT 'def'; SET pProd := pFac1 * pFac2; "
            "END;")

        stmt_create3 = (
            "CREATE PROCEDURE myconnpy_sp_3"
            "(IN pStr1 VARCHAR(20), IN pStr2 VARCHAR(20), "
            "OUT pConCat VARCHAR(100)) "
            "BEGIN SET pConCat := CONCAT(pStr1, pStr2); END;")

        stmt_create4 = (
            "CREATE PROCEDURE myconnpy_sp_4"
            "(IN pStr1 VARCHAR(20), INOUT pStr2 VARCHAR(20), "
            "OUT pConCat VARCHAR(100)) "
            "BEGIN SET pConCat := CONCAT(pStr1, pStr2); END;")

        try:
            cur = cnx.cursor()
            cur.execute(stmt_create1)
            cur.execute(stmt_create2)
            cur.execute(stmt_create3)
            cur.execute(stmt_create4)
        except errors.Error as err:
            self.fail("Failed setting up test stored routine; {0}".format(err))
        cur.close()

    def _test_callproc_cleanup(self, cnx):
        """Drop the stored procedures used by test_callproc, if present."""
        sp_names = ('myconnpy_sp_1', 'myconnpy_sp_2', 'myconnpy_sp_3',
                    'myconnpy_sp_4')
        stmt_drop = "DROP PROCEDURE IF EXISTS {procname}"

        try:
            cur = cnx.cursor()
            for sp_name in sp_names:
                cur.execute(stmt_drop.format(procname=sp_name))
        except errors.Error as err:
            self.fail(
                "Failed cleaning up test stored routine; {0}".format(err))
        cur.close()

    def test_callproc(self):
        """callproc(): argument validation, OUT/INOUT values, result sets."""
        cur = self._get_cursor(self.cnx)
        self.check_method(cur, 'callproc')

        self.assertRaises(ValueError, cur.callproc, None)
        self.assertRaises(ValueError, cur.callproc, 'sp1', None)

        config = tests.get_mysql_config()
        self.cnx.get_warnings = True

        self._test_callproc_setup(self.cnx)
        cur = self.cnx.cursor()

        # Servers older than 5.1 are expected to return the arguments as
        # strings/bytes instead of integers.
        if tests.MYSQL_VERSION < (5, 1):
            exp = ('5', '4', b'20')
        else:
            exp = (5, 4, 20)
        result = cur.callproc('myconnpy_sp_1', (exp[0], exp[1], 0))
        self.assertEqual(exp, result)

        if tests.MYSQL_VERSION < (5, 1):
            exp = ('6', '5', b'30')
        else:
            exp = (6, 5, 30)
        result = cur.callproc('myconnpy_sp_2', (exp[0], exp[1], 0))
        self.assertIsInstance(cur._stored_results, list)
        self.assertEqual(exp, result)

        # myconnpy_sp_2 additionally produces two result sets.
        exp_results = [
            ('abc',),
            ('def',)
        ]
        for i, result in enumerate(cur.stored_results()):
            self.assertEqual(exp_results[i], result.fetchone())

        exp = ('ham', 'spam', 'hamspam')
        result = cur.callproc('myconnpy_sp_3', (exp[0], exp[1], 0))
        self.assertIsInstance(cur._stored_results, list)
        self.assertEqual(exp, result)

        # Arguments given as (value, type) tuples.
        exp = ('ham', 'spam', 'hamspam')
        result = cur.callproc('myconnpy_sp_4',
                              (exp[0], (exp[1], 'CHAR'), (0, 'CHAR')))
        self.assertIsInstance(cur._stored_results, list)
        self.assertEqual(exp, result)

        cur.close()
        self._test_callproc_cleanup(self.cnx)

    def test_fetchone(self):
        """fetchone(): None before execute and after the last row."""
        cur = self._get_cursor(self.cnx)
        self.assertIsNone(cur.fetchone())

        cur = self.cnx.cursor()
        cur.execute("SELECT BINARY 'ham'")
        exp = ('ham',)
        self.assertEqual(exp, cur.fetchone())
        self.assertIsNone(cur.fetchone())
        cur.close()

    def test_fetchmany(self):
        """MySQLCursor object fetchmany()-method"""
        cur = self._get_cursor(self.cnx)
        self.assertEqual([], cur.fetchmany())

        tbl = 'myconnpy_fetch'
        self.setup_table(self.cnx, tbl)
        stmt_insert = (
            "INSERT INTO {table} (col1,col2) "
            "VALUES (%s,%s)".format(table=tbl))
        stmt_select = (
            "SELECT col1,col2 FROM {table} "
            "ORDER BY col1 DESC".format(table=tbl))

        cur = self.cnx.cursor()
        nrrows = 10
        data = [(i, str(i * 100)) for i in range(1, nrrows+1)]
        cur.executemany(stmt_insert, data)
        cur.execute(stmt_select)

        exp = [(10, '1000'), (9, '900'), (8, '800'), (7, '700')]
        rows = cur.fetchmany(4)
        self.assertTrue(tests.cmp_result(exp, rows),
                        "Fetching first 4 rows test failed.")

        exp = [(6, '600'), (5, '500'), (4, '400')]
        rows = cur.fetchmany(3)
        self.assertTrue(tests.cmp_result(exp, rows),
                        "Fetching next 3 rows test failed.")

        exp = [(3, '300'), (2, '200'), (1, '100')]
        rows = cur.fetchmany(3)
        self.assertTrue(tests.cmp_result(exp, rows),
                        "Fetching next 3 rows test failed.")
        self.assertEqual([], cur.fetchmany())

        # Fetch more than we have.
        cur.execute(stmt_select)
        rows = cur.fetchmany(100)
        exp = [(10, '1000'), (9, '900'), (8, '800'), (7, '700'),
               (6, '600'), (5, '500'), (4, '400'),
               (3, '300'), (2, '200'), (1, '100')]
        self.assertTrue(tests.cmp_result(exp, rows),
                        "Fetching more rows than available test failed.")

        cur.close()

    def test_fetchall(self):
        """fetchall(): error before execute, empty result and full result."""
        cur = self._get_cursor(self.cnx)
        self.assertRaises(errors.InterfaceError, cur.fetchall)

        tbl = 'myconnpy_fetch'
        self.setup_table(self.cnx, tbl)
        stmt_insert = (
            "INSERT INTO {table} (col1,col2) "
            "VALUES (%s,%s)".format(table=tbl))
        stmt_select = (
            "SELECT col1,col2 FROM {table} "
            "ORDER BY col1 ASC".format(table=tbl))

        cur = self.cnx.cursor()
        cur.execute("SELECT * FROM {table}".format(table=tbl))
        self.assertEqual([], cur.fetchall(),
                         "fetchall() with empty result should return []")

        nrrows = 10
        data = [(i, str(i * 100)) for i in range(1, nrrows+1)]
        cur.executemany(stmt_insert, data)
        cur.execute(stmt_select)
        self.assertTrue(tests.cmp_result(data, cur.fetchall()),
                        "Fetching all rows failed.")
        self.assertIsNone(cur.fetchone())
        cur.close()

    def test_raise_on_warning(self):
        """With raise_on_warnings set, a warning surfaces as DatabaseError."""
        self.cnx.raise_on_warnings = True
        cur = self._get_cursor(self.cnx)
        # NOTE(review): this first execute sits outside the try block and
        # its result is never fetched -- confirm this is intentional.
        cur.execute("SELECT 'a' + 'b'")
        try:
            cur.execute("SELECT 'a' + 'b'")
            cur.fetchall()
        except errors.DatabaseError:
            pass
        else:
            self.fail("Did not get exception while raising warnings.")

    def test__str__(self):
        """__str__(): placeholder text, short and long statements."""
        cur = self._get_cursor(self.cnx)
        self.assertEqual("CMySQLCursor: (Nothing executed yet)",
                         cur.__str__())

        cur.execute("SELECT VERSION()")
        cur.fetchone()
        self.assertEqual("CMySQLCursor: SELECT VERSION()",
                         cur.__str__())

        # A long statement is expected to be cut to its first 40
        # characters followed by '..'.
        stmt = "SELECT VERSION(),USER(),CURRENT_TIME(),NOW(),SHA1('myconnpy')"
        cur.execute(stmt)
        cur.fetchone()
        self.assertEqual("CMySQLCursor: {0}..".format(stmt[:40]),
                         cur.__str__())
        cur.close()

    def test_column_names(self):
        """column_names holds aliases, or the expression text without one."""
        cur = self._get_cursor(self.cnx)
        stmt = "SELECT NOW() as now, 'The time' as label, 123 FROM dual"
        exp = ('now', 'label', '123')
        cur.execute(stmt)
        cur.fetchone()
        self.assertEqual(exp, cur.column_names)
        cur.close()

    def test_statement(self):
        """statement is the executed text, stripped and decoded."""
        cur = CMySQLCursor(self.cnx)
        exp = 'SELECT * FROM ham'
        cur._executed = exp
        self.assertEqual(exp, cur.statement)

        # Surrounding whitespace is expected to be removed.
        cur._executed = '  ' + exp + '    '
        self.assertEqual(exp, cur.statement)

        # Bytes are expected to come back as text.
        cur._executed = b'SELECT * FROM ham'
        self.assertEqual(exp, cur.statement)

    def test_with_rows(self):
        """with_rows is false on a new cursor, true once _description is set."""
        cur = CMySQLCursor(self.cnx)
        self.assertFalse(cur.with_rows)
        cur._description = ('ham', 'spam')
        self.assertTrue(cur.with_rows)

    def tests_nextset(self):
        """nextset(): stepping through the results of multiple statements."""
        cur = CMySQLCursor(self.cnx)
        stmt = "SELECT 'result', 1; SELECT 'result', 2; SELECT 'result', 3"
        cur.execute(stmt)
        self.assertEqual([('result', 1)], cur.fetchall())
        self.assertTrue(cur.nextset())
        self.assertEqual([('result', 2)], cur.fetchall())
        self.assertTrue(cur.nextset())
        self.assertEqual([('result', 3)], cur.fetchall())
        self.assertIsNone(cur.nextset())

        tbl = 'myconnpy_nextset'
        stmt = "SELECT 'result', 1; INSERT INTO {0} () VALUES (); " \
               "SELECT * FROM {0}".format(tbl)
        self.setup_table(self.cnx, tbl)

        cur.execute(stmt)
        self.assertEqual([('result', 1)], cur.fetchall())
        # NOTE(review): if nextset() does not raise here, the assertions in
        # the except branch are silently skipped.
        try:
            cur.nextset()
        except errors.Error as exc:
            self.assertEqual(errorcode.CR_NO_RESULT_SET, exc.errno)
            self.assertEqual(1, cur._affected_rows)
        self.assertTrue(cur.nextset())
        self.assertEqual([(1, None, 0)], cur.fetchall())
        self.assertIsNone(cur.nextset())

        cur.close()
        self.cnx.rollback()

    def tests_execute_multi(self):
        """execute(multi=True): iterate the result of each statement."""
        tbl = 'myconnpy_execute_multi'
        stmt = "SELECT 'result', 1; INSERT INTO {0} () VALUES (); " \
               "SELECT * FROM {0}".format(tbl)
        self.setup_table(self.cnx, tbl)

        multi_cur = CMySQLCursor(self.cnx)
        results = []
        exp = [
            (u"SELECT 'result', 1", [(u'result', 1)]),
            (u"INSERT INTO {0} () VALUES ()".format(tbl), 1, 1),
            (u"SELECT * FROM {0}".format(tbl), [(1, None, 0)]),
        ]
        for cur in multi_cur.execute(stmt, multi=True):
            if cur.with_rows:
                results.append((cur.statement, cur.fetchall()))
            else:
                results.append(
                    (cur.statement, cur._affected_rows, cur.lastrowid)
                )

        self.assertEqual(exp, results)

        cur.close()
        self.cnx.rollback()

        cur = self._get_cursor(self.cnx)
        cur.execute("DROP PROCEDURE IF EXISTS multi_results")
        procedure = (
            "CREATE PROCEDURE multi_results () "
            "BEGIN SELECT 1; SELECT 'ham'; END"
        )
        cur.execute(procedure)
        stmt = "CALL multi_results()"
        # On Python 3 the statement is passed as bytes.
        if not PY2:
            stmt = b"CALL multi_results()"
        exp_result = [[(1,)], [(u'ham',)]]
        results = []
        for result in cur.execute(stmt, multi=True):
            if result.with_rows:
                self.assertEqual(stmt, result._executed)
                results.append(result.fetchall())

        self.assertEqual(exp_result, results)
        cur.execute("DROP PROCEDURE multi_results")

        cur.close()


class CExtMySQLCursorBufferedTests(tests.CMySQLCursorTests):
    """Tests for CMySQLCursorBuffered."""

    def _get_cursor(self, cnx=None):
        """Return a CMySQLCursorBuffered for ``cnx``.

        When ``cnx`` is falsy a new CMySQLConnection is created from
        ``self.config``.  ``self.cnx.buffered`` is set to True in either
        case.
        """
        if not cnx:
            cnx = CMySQLConnection(**self.config)
        self.cnx.buffered = True
        return CMySQLCursorBuffered(connection=cnx)

    def test___init__(self):
        """Constructor rejects a non-connection and stores the connection."""
        self.assertRaises(errors.InterfaceError, CMySQLCursorBuffered,
                          connection='ham')

        cur = self._get_cursor(self.cnx)
        self.assertTrue(hex(id(self.cnx)).upper()[2:-1]
                        in repr(cur._cnx).upper())

    def test_execute(self):
        """execute() returns None; a simple SELECT can be fetched."""
        self.cnx.get_warnings = True
        cur = self._get_cursor(self.cnx)

        self.assertIsNone(cur.execute(None, None))

        self.assertIsInstance(cur, CMySQLCursorBuffered)

        cur.execute("SELECT 1")
        self.assertEqual((1,), cur.fetchone())

    def test_raise_on_warning(self):
        """With raise_on_warnings set, execute() itself raises."""
        self.cnx.raise_on_warnings = True
        cur = self._get_cursor(self.cnx)
        self.assertRaises(errors.DatabaseError,
                          cur.execute, "SELECT 'a' + 'b'")

    def test_with_rows(self):
        """with_rows is true after executing a SELECT."""
        cur = self._get_cursor(self.cnx)
        cur.execute("SELECT 1")
        self.assertTrue(cur.with_rows)


class CMySQLCursorRawTests(tests.CMySQLCursorTests):
    """Tests for CMySQLCursorRaw."""

    def _get_cursor(self, cnx=None):
        """Return a CMySQLCursorRaw for ``cnx``.

        When ``cnx`` is falsy a new CMySQLConnection is created from
        ``self.config``.
        """
        if not cnx:
            cnx = CMySQLConnection(**self.config)
        return CMySQLCursorRaw(connection=cnx)

    def test_fetchone(self):
        cur = self._get_cursor(self.cnx)
        self.assertIsNone(cur.fetchone())

        cur.execute("SELECT 1, 'string', MAKEDATE(2010,365), 2.5")
        exp = (b'1', b'string', b'2010-12-31', b'2.5')
        # ... the listing is truncated here (after line 492 of the original
        # file); the remainder of this test is not part of the snippet.


# The text below is not part of test_cext_cursor.py; it followed the listing
# on the same line and is kept here as a comment:
#
# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides
# to help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
