Best Python code snippet using green
test_cluster.py
Source:test_cluster.py  
"""
WSQL
====
An asynchronous DB API v2.0 compatible interface to MySQL
---------------------------------------------------------
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""

__author__ = "@bg"

try:
    from _case import DatabaseTestCase
    import _wsql_context
except ImportError:  # pragma: no cover
    from ._case import DatabaseTestCase
    from . import _wsql_context

from unittest import TestCase

from wsql import exceptions
from wsql.cluster import ConnectionPool, Upstream, transaction, retryable, Cluster, connect
from wsql.cluster.upstream import ServerInfo, Connection
from wsql.cluster import _parser


class DummyLogger:
    """Logger stand-in whose ``error`` method discards every message."""

    @staticmethod
    def error(msg, *args, **kwargs):
        pass


class TestServerInfo(TestCase):
    """Tests for the string representation of ``ServerInfo``."""

    def test_to_str(self):
        """``str()`` yields ``host:port`` for TCP servers and the path for socket servers."""
        srv_info = ServerInfo(host='localhost', port=3306)
        self.assertEqual('localhost:3306', str(srv_info))
        srv_info = ServerInfo(socket_name='/var/tmp/socket.sock')
        self.assertEqual('/var/tmp/socket.sock', str(srv_info))


class TestCluster(DatabaseTestCase):
    """
    Test cases shared by the synchronous and asynchronous flavours.

    Subclasses supply ``get_context`` together with the query factories
    ``get_insert_query``, ``wrap_query`` and ``make_exception_query``,
    which raise ``NotImplementedError`` here.
    """

    def make_upstream(self, servers):
        """create an upstream over the specified servers, bound to the context loop"""
        return Upstream(servers, DummyLogger, loop=self._context.loop, **self._context.connect_kwargs)

    def make_pool(self, upstream, timeout):
        """create a connection pool on top of the upstream with the specified timeout"""
        return ConnectionPool(upstream, timeout=timeout, loop=self._context.loop)

    def get_insert_query(self, table, error=None):  # pragma: no cover
        """
        get the insert query
        :param table: the temporary table name
        :param error: error, that should be raised if specified
        :return: the query, that run query logic
        """
        raise NotImplementedError

    def wrap_query(self, query):  # pragma: no cover
        """
        make a new query around specified
        :param query: the query to wrap
        :return: wrapped query
        """
        raise NotImplementedError

    def make_exception_query(self, retry_count):  # pragma: no cover
        """create query with retry_count"""
        raise NotImplementedError

    def count_calls(self, func):
        """
        wrap func so that the number of invocations is tracked
        :param func: the callable to wrap
        :return: the wrapper, exposing the counter as ``call_count`` (starts at 0)
        """
        def wrapper(*args, **kwargs):
            wrapper.call_count += 1
            return func(*args, **kwargs)

        wrapper.call_count = 0
        return wrapper

    def test_next(self):
        """test next method of Upstream"""
        kwargs2 = self._context.connect_kwargs.copy()
        kwargs2['port'] = 1
        upstream = self.make_upstream([kwargs2, self._context.connect_kwargs])
        connection = self._context.wait(next(upstream))
        connection2 = self._context.wait(next(upstream))
        self.assertIs(self._context.connect_kwargs['host'], connection.meta.kwargs['host'])
        self.assertIs(self._context.connect_kwargs['host'], connection2.meta.kwargs['host'])
        # A bare generator expression is always truthy, so the assertion
        # has to be reduced with any() to actually inspect the penalties.
        self.assertTrue(any(x.penalty > 0 for x in upstream._servers))

    def test_no_connections(self):
        """test case when there is no online servers more"""
        kwargs2 = self._context.connect_kwargs.copy()
        kwargs2['port'] = 1
        upstream = self.make_upstream([kwargs2])
        with self.assertRaises(RuntimeError):
            self._context.wait(next(upstream))

    def test_invalidate(self):
        """test invalidate method of Upstream"""
        upstream = self.make_upstream([self._context.connect_kwargs])
        connection = self._context.wait(next(upstream))
        # invalidate() is exercised with a connection ...
        upstream.invalidate(connection)
        self.assertGreater(connection.meta.penalty, 0)
        connection.meta.penalty = 0
        # ... with the connection's meta object ...
        upstream.invalidate(connection.meta)
        self.assertGreater(connection.meta.penalty, 0)
        self.assertGreater(upstream._servers[0].penalty, 0)
        # ... and with an unsupported argument.
        self.assertRaises(ValueError, upstream.invalidate, 1)

    def test_acquire(self):
        """test _acquire method of ConnectionPoolAsync"""
        connections = []
        pool_size = 3
        timeout = 0.1
        pool = self.make_pool(self.make_upstream([{'count': pool_size}]), timeout)
        for i in range(pool_size):
            connection = self._context.wait(pool._acquire())
            self.assertIsNotNone(connection)
            connections.append(connection)
            self.assertEqual(pool_size - i - 1, pool._reserve)
        self.assertEqual(pool_size, len(connections))

    def test_release(self):
        """test _release method of ConnectionPoolAsync"""
        pool_size = 3
        timeout = 0.1
        pool = self.make_pool(self.make_upstream([{"count": pool_size}]), timeout)
        connection = self._context.wait(pool._acquire())
        self.assertIsNotNone(connection)
        self.assertEqual(0, pool._queue.qsize())
        pool._release(connection)
        self.assertEqual(1, pool._queue.qsize())

    def test_release_closed_connection(self):
        """test _release method of ConnectionPoolAsync in case if connection closed"""
        pool_size = 3
        timeout = 0.1
        pool = self.make_pool(self.make_upstream([{"count": pool_size}]), timeout)
        connection = self._context.wait(pool._acquire())
        self.assertIsNotNone(connection)
        self.assertEqual(0, pool._queue.qsize())
        connection.connection().close()
        pool._release(connection)
        self.assertEqual(0, pool._queue.qsize())
        self.assertEqual(pool_size, pool._reserve)

    def test_acquire_if_no_free(self):
        """test _acquire method of ConnectionPoolAsync if there is no free connections"""
        connections = []
        pool_size = 3
        timeout = 0.1
        pool = self.make_pool(self.make_upstream([{"count": pool_size}]), timeout)
        for i in range(pool_size):
            connection = self._context.wait(pool._acquire())
            self.assertIsNotNone(connection)
            connections.append(connection)
            self.assertEqual(pool_size - i - 1, pool._reserve)
        self.assertEqual(0, pool._reserve)
        self.assertTrue(pool._queue.empty())
        self.assertRaises(pool.TimeoutError, self._context.call_and_wait, pool._acquire)

    def test_connection_pool_execute(self):
        """test execute method of ConnectionPool"""
        pool_size = 3
        timeout = 0.1
        pool = self.make_pool(self.make_upstream([{"count": pool_size}]), timeout)
        # The query returns its argument, so a non-None result shows that
        # execute() handed something to the query.
        connection = self._context.wait(pool.execute(self._context.decorator(lambda x: x)))
        self.assertIsNotNone(connection)

    def test_transaction_rollback(self):
        """test transaction rollback"""
        table = self._create_table(('name VARCHAR(10)',), None)
        connection_holder = Connection(self._context.make_connection())
        query = self.get_insert_query(table, Exception('rollback expected!'))
        self.assertRaisesRegex(
            Exception, 'rollback expected!',
            self._context.call_and_wait, connection_holder.execute, transaction(query))
        cursor = self._context.cursor()
        self._context.wait(cursor.execute("SELECT * FROM %s" % table))
        self.assertEqual([], cursor.fetchall())

    def test_commit(self):
        """test transaction commit"""
        table = self._create_table(('name VARCHAR(10)',), None)
        connection = Connection(self._context.make_connection())
        query = self.get_insert_query(table)
        self.assertIs(self._context.wait(connection.execute(transaction(query))), True)
        # An explicit rollback afterwards must not discard the inserted row.
        self._context.wait(connection.rollback())
        cursor = self._context.cursor()
        self._context.wait(cursor.execute("SELECT * FROM %s" % table))
        self.assertEqual([('Evelina',)], cursor.fetchall())

    def test_recursive_transaction(self):
        """test recursive transaction"""
        table = self._create_table(('name VARCHAR(10)',), None)
        connection_holder = Connection(self._context.make_connection())
        query = transaction(self.get_insert_query(table))
        # count_calls() already initialises call_count to 0.
        connection_holder.commit = self.count_calls(connection_holder.commit)
        self._context.wait(connection_holder.execute(transaction(query)))
        self.assertEqual(1, connection_holder.commit.call_count)

    def test_retries(self):
        """test retries"""
        retry_count = 3
        delay = 0.1
        connection = retryable(Connection(self._context.make_connection()), retry_count, delay)
        query = self.make_exception_query(retry_count - 1)
        self._context.wait(connection.execute(query))
        self.assertEqual(retry_count - 1, query.call_count)
        query = self.make_exception_query(retry_count + 1)
        self.assertRaises(Exception, self._context.call_and_wait, connection.execute, query)

    def test_cluster(self):
        """ test commutator """
        read_connection = Connection(self._context.make_connection())
        write_connection = Connection(self._context.make_connection())
        read_connection.readonly = True
        write_connection.readonly = False
        read_query = self.wrap_query(lambda x: self.assertTrue(x.readonly))
        write_query = transaction(self.wrap_query(lambda x: self.assertFalse(x.readonly)))
        cluster = Cluster(master=write_connection, slave=read_connection)
        self._context.wait(cluster.execute(read_query))
        self._context.wait(cluster.execute(write_query))
        cluster = Cluster(master=None, slave=read_connection)
        self.assertRaisesRegex(
            Exception, "the operation is not permitted on read-only cluster",
            cluster.execute, write_query)

    def test_connect(self):
        """test connect to the cluster"""
        connection_args = {"master": "localhost:3306#2,localhost#4", "slave": "localhost:3306#2", "database": "test"}
        connection = connect(connection_args, loop=self._context.loop)
        self.assertIsInstance(connection, Cluster)
        self.assertEqual(6, len(connection._cluster[1]._connection._upstream))
        self.assertEqual(2, len(connection._cluster[0]._connection._upstream))
        connection_args = {"slave": "localhost:3306#2", "database": "test"}
        self.assertIsInstance(connect(connection_args, loop=self._context.loop), Cluster)
        connection_args = {"master": "localhost:3306#2,localhost#4", "database": "test"}
        self.assertNotIsInstance(connect(connection_args, loop=self._context.loop), Cluster)
        connection_args = {}
        self.assertNotIsInstance(connect(connection_args, loop=self._context.loop), Cluster)
        connection_args = "master=localhost:3306#2,localhost#4;slave=localhost:3306#2;database=test;"
        connection = connect(connection_args, loop=self._context.loop)
        self.assertIsInstance(connection, Cluster)
        self.assertEqual(6, len(connection._cluster[1]._connection._upstream))
        self.assertEqual(2, len(connection._cluster[0]._connection._upstream))
        # test real connect
        connection_args = {"master": "%(host)s" % _wsql_context.Configuration.connect_kwargs}
        connection_args.update(_wsql_context.Configuration.connect_kwargs)
        connection = connect(connection_args, loop=self._context.loop)
        # NOTE(review): unlike the other tests the result is not passed to
        # self._context.wait(); with the async context the query may never
        # actually run -- confirm whether that is intended.
        connection.execute(self.wrap_query(lambda x: None))


class TestClusterSync(TestCluster):
    """Runs the shared cluster tests with ``_wsql_context.Configuration``."""

    @classmethod
    def get_context(cls):
        return _wsql_context.Context(_wsql_context.Configuration())

    def get_insert_query(self, table, error=None):
        """
        get the insert query
        :param table: the temporary table name
        :param error: error, that should be raised after the insert if specified
        :return: the query, that inserts one row and returns True
        """
        def query(connection):
            cursor = self._context.wrap(connection.cursor())
            try:
                cursor.execute('INSERT INTO `%s` VALUES(\'%s\');' % (table, 'Evelina'))
            finally:
                cursor.close()
            if error:
                raise error
            return True

        return query

    def wrap_query(self, query):
        """
        make a new query around specified
        :param query: the query to wrap
        :return: wrapped query
        """
        return query

    def make_exception_query(self, retry_count):
        """
        create a query that raises a CR_SERVER_LOST error until it was called retry_count times
        :param retry_count: the call number starting from which the query succeeds
        :return: the query, exposing the number of calls as ``call_count``
        """
        def query(_):
            query.call_count += 1
            if query.call_count < retry_count:
                exc = self._context.errors.Error(self._context.constants.CR_SERVER_LOST, "connection lost")
                exc.code = self._context.constants.CR_SERVER_LOST
                raise exc
            return None

        query.call_count = 0
        return query


class TestClusterAsync(TestCluster):
    """Runs the shared cluster tests with ``_wsql_context.ConfigurationAsync``."""

    @classmethod
    def get_context(cls):
        return _wsql_context.Context(_wsql_context.ConfigurationAsync())

    def get_insert_query(self, table, error=None):
        """
        get the insert query
        :param table: the temporary table name
        :param error: error, that should be raised after the insert if specified
        :return: the decorated query, that inserts one row and returns True
        """
        @self._context.decorator
        def query(connection):
            cursor = connection.cursor()
            try:
                yield from cursor.execute('INSERT INTO %s VALUES (\'%s\')' % (table, 'Evelina'))
            finally:
                yield from cursor.close()
            if error:
                raise error
            return True

        return query

    def wrap_query(self, query):
        """
        make a new query around specified
        :param query: the query to wrap
        :return: wrapped query
        """
        @self._context.decorator
        def wrapper(connection):
            if self._context.iscoroutine(query):  # pragma: no cover
                return (yield from query(connection))
            return query(connection)

        return wrapper

    def make_exception_query(self, retry_count):
        """
        create a query that raises a CR_SERVER_LOST error until it was called retry_count times
        :param retry_count: the call number starting from which the query succeeds
        :return: the decorated query, exposing the number of calls as ``call_count``
        """
        @self._context.decorator
        def query(_):
            query.call_count += 1
            if query.call_count < retry_count:
                exc = self._context.errors.Error(self._context.constants.CR_SERVER_LOST, "connection lost")
                exc.code = self._context.constants.CR_SERVER_LOST
                raise exc
            return None

        query.call_count = 0
        return query


# Drop the base class from the module namespace, leaving only the concrete
# subclasses (presumably so that the test loader does not collect the base).
del TestCluster


class TestException(TestCase):
    """
    test user exceptions
    """

    class TestError(exceptions.UserError):
        pass

    def test_handle_error(self):
        """test handle_error method"""
        self.assertRaisesRegex(
            self.TestError, "(1, 'this is test error')",
            exceptions.handle_error, self, exceptions.UserError(1, "TestError; this is test error"))
        self.assertRaisesRegex(
            exceptions.UserError, "(1, 'Test2Error; this is test error')",
            exceptions.handle_error, self, exceptions.UserError(1, "Test2Error; this is test error"))
        self.assertRaisesRegex(
            exceptions.UserError, "(1, 'this is test error')",
            exceptions.handle_error, self, exceptions.UserError(1, "this is test error"))
        self.assertRaisesRegex(
            ValueError, "this is test error",
            exceptions.handle_error, self, ValueError("this is test error"))


class TestParsers(TestCase):
    """Tests for the connection-string and URI parsers in ``_parser``."""

    def test_connection_parser(self):
        """test parser of connection string"""
        args = _parser.parse_connection_string("database=test;master=localhost:3306#2,localhost#4")
        self.assertEqual({"database": "test", "master": "localhost:3306#2,localhost#4"}, args)
        args2 = _parser.parse_connection_string(args)
        self.assertEqual(args, args2)
        self.assertRaises(
            ValueError, _parser.parse_connection_string,
            ["database=test;master=localhost:3306#2,localhost#4"])

    def test_uri_parser(self):
        """test parse uri"""
        result = _parser.uri_parser(lambda x: list(x))("mysql://localhost:3306#2,127.0.0.1#4")
        self.assertEqual(
            [
                {"scheme": "mysql", "host": "localhost", "port": "3306", "count": "2"},
                {"scheme": None, "port": None, "host": "127.0.0.1", "count": "4"}
            ],
            result
        )
        result = _parser.uri_parser(lambda x: list(x))("a-b_c123.13.domen.com:3306#1")
        self.assertEqual(
            [{"scheme": None, "host": "a-b_c123.13.domen.com", "port": "3306", "count": "1"}],
            result
        )
        # NOTE: the listing this snippet was taken from is truncated at this point.
Source:summarize.py  
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Summarize grades.
usage:
    pygrade summarize [--grades <file>] [--test-names <names>] [--student-names <names>]
Options
    -h, --help
    -g, --grades <file>              JSON grades output by the grade command [default: grades.json]
    -t, --test-names <names>         Comma-separated list of test names to summarize.
    -s, --student-names <names>      Comma-separated list of student github ids to summarize.
"""
from collections import Counter, defaultdict
import json
import re

# Matches "<Something>Error: message" up to the end of a line.
_ERROR_RE = re.compile(r'([A-Za-z]+Error:.+)$', re.MULTILINE)


def _split_names(names):
    """Return the set of non-empty, stripped items of a comma-separated string."""
    return {part.strip() for part in names.split(',') if part.strip()}


def _format_grade(grade):
    """Render a numeric grade without losing a fractional part (95.0 -> '95', 92.5 -> '92.5')."""
    return '%d' % grade if float(grade).is_integer() else '%s' % grade


def print_grade_distribution(grades):
    """Print how many students received each grade, highest grade first.

    :param grades: iterable of grade records; each record is read through
        ``g['grade']`` and ``g['student']['github_id']``.

    Student ids are listed only for grades shared by fewer than 10 students.
    """
    print('\n\n----------------------------\ngrade distribution:\ngrade\tcount\tstudents')
    counts = Counter()
    grade2students = defaultdict(list)
    for g in grades:
        # Use the same (float) key for counting and grouping, so that a grade
        # stored as a string still finds its students.
        grade = float(g['grade'])
        counts[grade] += 1
        grade2students[grade].append(g['student']['github_id'])
    for grade, count in sorted(counts.items(), reverse=True):
        if count < 10:
            print('%s\t%d\t%s' % (_format_grade(grade), count, ' '.join(grade2students[grade])))
        else:
            print('%s\t%d' % (_format_grade(grade), count))


def clean_summary(s):
    """Return the part of a deduction summary before the first ':', stripped.

    :param s: the summary text; ``None`` and ``''`` both yield ``''``.
    """
    if not s:
        return ''
    return s.partition(':')[0].strip()


def print_test_distribution(grades):
    """Print how many students failed each test, most frequently failed first.

    :param grades: iterable of grade records; each record is read through
        ``g['deductions']`` (items with a ``'summary'`` key) and
        ``g['student']['github_id']``.

    Student ids are listed only for tests failed by fewer than 10 students.
    """
    print('\n\n----------------------------\ntest failures distribution:\n%20s\tcount\tstudents' % 'test')
    counts = Counter()
    test2students = defaultdict(list)
    for g in grades:
        for d in g['deductions']:
            test = clean_summary(d['summary'])
            counts[test] += 1
            test2students[test].append(g['student']['github_id'])
    for test, count in sorted(counts.items(), key=lambda x: x[1], reverse=True):
        if count < 10:
            print('%20s\t%d\t%s' % (test, count, ' '.join(test2students[test])))
        else:
            print('%20s\t%d' % (test, count))


def extract_error(trace):
    """Return the last ``<Name>Error: message`` line found in a trace.

    :param trace: the trace text to search.
    :return: the matched text, or ``None`` if the trace contains no such line.
    """
    matches = _ERROR_RE.findall(trace)
    return matches[-1] if matches else None


def summarize_errors(grades, test_names):
    """Print, for each requested test, the distinct error texts and who hit them.

    :param grades: iterable of grade records (see ``print_test_distribution``);
        each deduction additionally provides a ``'trace'`` string.
    :param test_names: comma-separated test names to report on.

    The error text is the trace up to (not including) the first ``'source:'``
    marker. Errors are printed most frequent first.
    """
    test_names = _split_names(test_names)
    test2error_counts = defaultdict(Counter)
    error2students = defaultdict(list)
    for g in grades:
        for d in g['deductions']:
            test_name = clean_summary(d['summary'])
            if test_name not in test_names:
                continue
            text = d['trace'].partition('source:')[0].strip()
            test2error_counts[test_name][text] += 1
            error2students[(test_name, text)].append(g['student']['github_id'])
    for test, error_counts in sorted(test2error_counts.items()):
        print('\n\n----------------------------\nSummary of errors for test %s\n' % test)
        for error, count in sorted(error_counts.items(), key=lambda x: -x[1]):
            print('count=%d' % count)
            print('students=%s' % ' '.join(error2students[(test, error)]))
            print(error)
            print('\n')


def summarize_students(grades, student_names):
    """Print every deduction (points, test name and trace) of the requested students.

    :param grades: iterable of grade records; deductions are read through the
        ``'points'``, ``'summary'`` and ``'trace'`` keys.
    :param student_names: comma-separated github ids to report on.
    """
    student_names = _split_names(student_names)
    for g in grades:
        github_id = g['student']['github_id']
        if github_id not in student_names:
            continue
        print('\n\n----------------------------\nSummary of errors for student %s\n' % github_id)
        for d in g['deductions']:
            print('\n%d points deducted for %s' % (d['points'], clean_summary(d['summary'])))
            print(d['trace'])


def main():
    """Command-line entry point: parse the arguments and print the requested summaries."""
    # Imported here so that the reporting helpers above stay importable
    # when docopt is not installed.
    from docopt import docopt

    args = docopt(__doc__)
    # Fall back to the documented default if docopt supplies no value.
    grades_path = args['--grades'] or 'grades.json'
    # One JSON document per line; blank lines are skipped and the file is
    # closed deterministically.
    with open(grades_path) as grades_file:
        grades = [json.loads(line) for line in grades_file if line.strip()]
    print_grade_distribution(grades)
    print_test_distribution(grades)
    if args['--test-names']:
        summarize_errors(grades, args['--test-names'])
    if args['--student-names']:
        summarize_students(grades, args['--student-names'])


if __name__ == '__main__':
    main()
Source:examples.py  
...13        This test will print output to stderr, and then fail an assertion.14        """15        sys.stderr.write("Doom and gloom.\n")16        self.assertTrue(False)17    def test2Error(self):18        """19        An Exception will be raised (and not caught) while running this test.20        """21        raise Exception22    @unittest.skip("This is the 'reason' portion of the skipped test.")23    def test3Skip(self):24        """25        This test will be skipped.26        """27        pass28    @unittest.expectedFailure29    def test4ExpectedFailure(self):30        """31        This test will fail, but we expect it to....Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
