How to use test2Error method in green

Best Python code snippet using green

test_cluster.py

Source:test_cluster.py Github

copy

Full Screen

1"""2WSQL3====4An asynchronous DB API v2.0 compatible interface to MySQL5---------------------------------------------------------6This program is free software: you can redistribute it and/or modify7it under the terms of the GNU General Public License as published by8the Free Software Foundation, either version 3 of the License, or9(at your option) any later version.10This program is distributed in the hope that it will be useful,11but WITHOUT ANY WARRANTY; without even the implied warranty of12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the13GNU General Public License for more details.14You should have received a copy of the GNU General Public License15along with this program. If not, see <http://www.gnu.org/licenses/>.16"""17__author__ = "@bg"18try:19 from _case import DatabaseTestCase20 import _wsql_context21except ImportError: # pragma: no cover22 from ._case import DatabaseTestCase23 from . import _wsql_context24from unittest import TestCase25from wsql import exceptions26from wsql.cluster import ConnectionPool, Upstream, transaction, retryable, Cluster, connect27from wsql.cluster.upstream import ServerInfo, Connection28from wsql.cluster import _parser29class DummyLogger:30 @staticmethod31 def error(msg, *args, **kwargs):32 pass33class TestServerInfo(TestCase):34 def test_to_str(self):35 srv_info = ServerInfo(host='localhost', port=3306)36 self.assertEqual('localhost:3306', str(srv_info))37 srv_info = ServerInfo(socket_name='/var/tmp/socket.sock')38 self.assertEqual('/var/tmp/socket.sock', str(srv_info))39class TestCluster(DatabaseTestCase):40 def make_upstream(self, servers):41 """abstract method to create a upstream"""42 return Upstream(servers, DummyLogger, loop=self._context.loop, **self._context.connect_kwargs)43 def make_pool(self, upstream, timeout):44 """abstract method to create connection pool"""45 return ConnectionPool(upstream, timeout=timeout, loop=self._context.loop)46 def get_insert_query(self, table, error=None): # pragma: no 
cover47 """48 get the query query49 :param table: the temporary table name50 :param error: error, that should be raised if specified51 :return: the query, that run query logic52 """53 raise NotImplementedError54 def wrap_query(self, query): # pragma: no cover55 """56 make a new query around specified57 :param query: the query to wrap58 :return: wrapped query59 """60 raise NotImplementedError61 def make_exception_query(self, retry_count): # pragma: no cover62 """create query with retry_count"""63 raise NotImplementedError64 def count_calls(self, func):65 def wrapper(*args, **kwargs):66 wrapper.call_count += 167 return func(*args, **kwargs)68 wrapper.call_count = 069 return wrapper70 def test_next(self):71 """test next method of Upstream"""72 kwargs2 = self._context.connect_kwargs.copy()73 kwargs2['port'] = 174 upstream = self.make_upstream([kwargs2, self._context.connect_kwargs])75 connection = self._context.wait(next(upstream))76 connection2 = self._context.wait(next(upstream))77 self.assertIs(self._context.connect_kwargs['host'], connection.meta.kwargs['host'])78 self.assertIs(self._context.connect_kwargs['host'], connection2.meta.kwargs['host'])79 self.assertTrue((x.penalty > 0 for x in upstream._servers))80 def test_no_connections(self):81 """test case when there is no online servers more"""82 kwargs2 = self._context.connect_kwargs.copy()83 kwargs2['port'] = 184 upstream = self.make_upstream([kwargs2])85 with self.assertRaises(RuntimeError):86 self._context.wait(next(upstream))87 def test_invalidate(self):88 """test invalidate method of Upstream"""89 upstream = self.make_upstream([self._context.connect_kwargs])90 connection = self._context.wait(next(upstream))91 upstream.invalidate(connection)92 self.assertGreater(connection.meta.penalty, 0)93 connection.meta.penalty = 094 upstream.invalidate(connection.meta)95 self.assertGreater(connection.meta.penalty, 0)96 self.assertGreater(upstream._servers[0].penalty, 0)97 self.assertRaises(ValueError, upstream.invalidate, 
1)98 def test_acquire(self):99 """test _acquire method of ConnectionPoolAsync"""100 connections = []101 pool_size = 3102 timeout = 0.1103 pool = self.make_pool(self.make_upstream([{'count': pool_size}]), timeout)104 for i in range(pool_size):105 connection = self._context.wait(pool._acquire())106 self.assertIsNotNone(connection)107 connections.append(connection)108 self.assertEqual(pool_size - i - 1, pool._reserve)109 self.assertEqual(pool_size, len(connections))110 def test_release(self):111 """test _release method of ConnectionPoolAsync"""112 pool_size = 3113 timeout = 0.1114 pool = self.make_pool(self.make_upstream([{"count": pool_size}]), timeout)115 connection = self._context.wait(pool._acquire())116 self.assertIsNotNone(connection)117 self.assertEqual(0, pool._queue.qsize())118 pool._release(connection)119 self.assertEqual(1, pool._queue.qsize())120 def test_release_closed_connection(self):121 """test _release method of ConnectionPoolAsync in case if connection closed"""122 pool_size = 3123 timeout = 0.1124 pool = self.make_pool(self.make_upstream([{"count": pool_size}]), timeout)125 connection = self._context.wait(pool._acquire())126 self.assertIsNotNone(connection)127 self.assertEqual(0, pool._queue.qsize())128 connection.connection().close()129 pool._release(connection)130 self.assertEqual(0, pool._queue.qsize())131 self.assertEqual(pool_size, pool._reserve)132 def test_acquire_if_no_free(self):133 """test _acquire method of ConnectionPoolAsync if there is no free connections"""134 connections = []135 pool_size = 3136 timeout = 0.1137 pool = self.make_pool(self.make_upstream([{"count": pool_size}]), timeout)138 for i in range(pool_size):139 connection = self._context.wait(pool._acquire())140 self.assertIsNotNone(connection)141 connections.append(connection)142 self.assertEqual(pool_size - i - 1, pool._reserve)143 self.assertEqual(0, pool._reserve)144 self.assertTrue(pool._queue.empty())145 self.assertRaises(pool.TimeoutError, self._context.call_and_wait, 
pool._acquire)146 def test_connection_pool_execute(self):147 """test _release method of ConnectionPoolAsync"""148 pool_size = 3149 timeout = 0.1150 pool = self.make_pool(self.make_upstream([{"count": pool_size}]), timeout)151 connection = self._context.wait(pool.execute(self._context.decorator(lambda x: x)))152 self.assertIsNotNone(connection)153 def test_transaction_rollback(self):154 """test transaction rollback"""155 table = self._create_table(('name VARCHAR(10)',), None)156 connection_holder = Connection(self._context.make_connection())157 query = self.get_insert_query(table, Exception('rollback expected!'))158 self.assertRaisesRegex(Exception, 'rollback expected!',159 self._context.call_and_wait, connection_holder.execute, transaction(query))160 cursor = self._context.cursor()161 self._context.wait(cursor.execute("SELECT * FROM %s" % table))162 self.assertEqual([], cursor.fetchall())163 def test_commit(self):164 """test transaction commit"""165 table = self._create_table(('name VARCHAR(10)',), None)166 connection = Connection(self._context.make_connection())167 query = self.get_insert_query(table)168 self.assertIs(self._context.wait(connection.execute(transaction(query))), True)169 self._context.wait(connection.rollback())170 cursor = self._context.cursor()171 self._context.wait(cursor.execute("SELECT * FROM %s" % table))172 self.assertEqual([('Evelina',)], cursor.fetchall())173 def test_recursive_transaction(self):174 """test recursive transaction"""175 table = self._create_table(('name VARCHAR(10)',), None)176 connection_holder = Connection(self._context.make_connection())177 query = transaction(self.get_insert_query(table))178 connection_holder.commit = self.count_calls(connection_holder.commit)179 connection_holder.commit.call_count = 0180 self._context.wait(connection_holder.execute(transaction(query)))181 self.assertEqual(1, connection_holder.commit.call_count)182 def test_retries(self):183 """test retries"""184 retry_count = 3185 delay = 0.1186 
connection = retryable(Connection(self._context.make_connection()), retry_count, delay)187 query = self.make_exception_query(retry_count - 1)188 self._context.wait(connection.execute(query))189 self.assertEqual(retry_count - 1, query.call_count)190 query = self.make_exception_query(retry_count + 1)191 self.assertRaises(Exception, self._context.call_and_wait, connection.execute, query)192 def test_cluster(self):193 """ test commutator """194 read_connection = Connection(self._context.make_connection())195 write_connection = Connection(self._context.make_connection())196 read_connection.readonly = True197 write_connection.readonly = False198 read_query = self.wrap_query(lambda x: self.assertTrue(x.readonly))199 write_query = transaction(self.wrap_query(lambda x: self.assertFalse(x.readonly)))200 cluster = Cluster(master=write_connection, slave=read_connection)201 self._context.wait(cluster.execute(read_query))202 self._context.wait(cluster.execute(write_query))203 cluster = Cluster(master=None, slave=read_connection)204 self.assertRaisesRegex(Exception, "the operation is not permitted on read-only cluster", cluster.execute, write_query)205 def test_connect(self):206 """test connect to the cluster"""207 connection_args = {"master": "localhost:3306#2,localhost#4", "slave": "localhost:3306#2", "database": "test"}208 connection = connect(connection_args, loop=self._context.loop)209 self.assertIsInstance(connection, Cluster)210 self.assertEqual(6, len(connection._cluster[1]._connection._upstream))211 self.assertEqual(2, len(connection._cluster[0]._connection._upstream))212 connection_args = {"slave": "localhost:3306#2", "database": "test"}213 self.assertIsInstance(connect(connection_args, loop=self._context.loop), Cluster)214 connection_args = {"master": "localhost:3306#2,localhost#4", "database": "test"}215 self.assertFalse(isinstance(connect(connection_args, loop=self._context.loop), Cluster))216 connection_args = {}217 
self.assertFalse(isinstance(connect(connection_args, loop=self._context.loop), Cluster))218 connection_args = "master=localhost:3306#2,localhost#4;slave=localhost:3306#2;database=test;"219 connection = connect(connection_args, loop=self._context.loop)220 self.assertIsInstance(connection, Cluster)221 self.assertEqual(6, len(connection._cluster[1]._connection._upstream))222 self.assertEqual(2, len(connection._cluster[0]._connection._upstream))223 # test real connect224 connection_args = {"master": "%(host)s" % _wsql_context.Configuration.connect_kwargs}225 connection_args.update(_wsql_context.Configuration.connect_kwargs)226 connection = connect(connection_args, loop=self._context.loop)227 connection.execute(self.wrap_query(lambda x: None))228class TestClusterSync(TestCluster):229 @classmethod230 def get_context(cls):231 return _wsql_context.Context(_wsql_context.Configuration())232 def get_insert_query(self, table, error=None):233 def query(connection):234 cursor = self._context.wrap(connection.cursor())235 try:236 cursor.execute('INSERT INTO `%s` VALUES(\'%s\');' % (table, 'Evelina'))237 finally:238 cursor.close()239 if error:240 raise error241 return True242 return query243 def wrap_query(self, query):244 """245 make a new query around specified246 :param query: the query to wrap247 :return: wrapped query248 """249 return query250 def make_exception_query(self, retry_count):251 def query(_):252 query.call_count += 1253 if query.call_count < retry_count:254 exc = self._context.errors.Error(self._context.constants.CR_SERVER_LOST, "connection lost")255 exc.code = self._context.constants.CR_SERVER_LOST256 raise exc257 return None258 query.call_count = 0259 return query260class TestClusterAsync(TestCluster):261 @classmethod262 def get_context(cls):263 return _wsql_context.Context(_wsql_context.ConfigurationAsync())264 def get_insert_query(self, table, error=None):265 @self._context.decorator266 def query(connection):267 cursor = connection.cursor()268 try:269 yield 
from cursor.execute('INSERT INTO %s VALUES (\'%s\')' % (table, 'Evelina'))270 finally:271 yield from cursor.close()272 if error:273 raise error274 return True275 return query276 def wrap_query(self, query):277 """278 make a new query around specified279 :param query: the query to wrap280 :return: wrapped query281 """282 @self._context.decorator283 def wrapper(connection):284 if self._context.iscoroutine(query): # pragma: no cover285 return (yield from query(connection))286 return query(connection)287 return wrapper288 def make_exception_query(self, retry_count):289 @self._context.decorator290 def query(_):291 query.call_count += 1292 if query.call_count < retry_count:293 exc = self._context.errors.Error(self._context.constants.CR_SERVER_LOST, "connection lost")294 exc.code = self._context.constants.CR_SERVER_LOST295 raise exc296 return None297 query.call_count = 0298 return query299del TestCluster300class TestException(TestCase):301 """302 test user exceptions303 """304 class TestError(exceptions.UserError):305 pass306 def test_handle_error(self):307 """test handle_error method"""308 self.assertRaisesRegex(self.TestError, "(1, 'this is test error')", exceptions.handle_error, self, exceptions.UserError(1, "TestError; this is test error"))309 self.assertRaisesRegex(exceptions.UserError, "(1, 'Test2Error; this is test error')", exceptions.handle_error, self, exceptions.UserError(1, "Test2Error; this is test error"))310 self.assertRaisesRegex(311 exceptions.UserError, "(1, 'this is test error')", exceptions.handle_error, self, exceptions.UserError(1, "this is test error"))312 self.assertRaisesRegex(ValueError, "this is test error", exceptions.handle_error, self, ValueError("this is test error"))313class TestParsers(TestCase):314 def test_connection_parser(self):315 """test parser of connection string"""316 args = _parser.parse_connection_string("database=test;master=localhost:3306#2,localhost#4")317 self.assertEqual({"database": "test", "master": 
"localhost:3306#2,localhost#4"}, args)318 args2 = _parser.parse_connection_string(args)319 self.assertEqual(args, args2)320 self.assertRaises(ValueError, _parser.parse_connection_string, ["database=test;master=localhost:3306#2,localhost#4"])321 def test_uri_parser(self):322 """test parse uri"""323 result = _parser.uri_parser(lambda x: list(x))("mysql://localhost:3306#2,127.0.0.1#4")324 self.assertEqual(325 [326 {"scheme": "mysql", "host": "localhost", "port": "3306", "count": "2"},327 {"scheme": None, "port": None, "host": "127.0.0.1", "count": "4"}328 ],329 result330 )331 result = _parser.uri_parser(lambda x: list(x))("a-b_c123.13.domen.com:3306#1")332 self.assertEqual(333 [{"scheme": None, "host": "a-b_c123.13.domen.com", "port": "3306", "count": "1"}],334 result...

Full Screen

Full Screen

summarize.py

Source:summarize.py Github

copy

Full Screen

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Summarize grades.

usage:
    pygrade summarize [--grades <file>] [--test-names <names>] [--student-names <names>]

Options:
    -h, --help
    -g, --grades <file>             JSON grades output by the grade command [default: grades.json]
    -t, --test-names <names>        Comma-separated list of test names to summarize.
    -s, --student-names <names>     Comma-separated list of student github ids to summarize.
"""
# NOTE: the section heading above is spelled "Options:" (with the colon) on
# purpose.  docopt 0.6.2 only reads option descriptions -- and therefore the
# "[default: grades.json]" value -- from a section whose heading contains
# "options:"; the colon is harmless for older releases.
from collections import Counter, defaultdict
import json
import re

# Matches an "<Name>Error: <message>" line that ends the trace text.
_ERROR_LINE = re.compile(r'([A-Za-z_]\w*Error:.+)$')


def print_grade_distribution(grades):
    """Print how many students received each grade, highest grade first.

    :param grades: iterable of grade records; each record is indexed with
        ``['grade']`` and ``['student']['github_id']``.

    Student ids are listed next to a grade only when fewer than 10 students
    share it.
    """
    print('\n\n----------------------------\ngrade distribution:\ngrade\tcount\tstudents')
    counts = Counter()
    grade2students = defaultdict(list)
    for g in grades:
        # Normalise once so the counter and the student index use the same
        # key even when the JSON stores the grade as a string or an int.
        grade = float(g['grade'])
        counts[grade] += 1
        grade2students[grade].append(g['student']['github_id'])
    for grade, count in sorted(counts.items(), reverse=True):
        if count < 10:
            print('%d\t%d\t%s' % (grade, count, ' '.join(grade2students[grade])))
        else:
            print('%d\t%d' % (grade, count))


def clean_summary(s):
    """Reduce a deduction summary to the part before its first colon.

    :param s: summary text (may be empty or ``None``)
    :return: the text before the first ``':'``; when there is no colon, the
        whitespace-stripped text; ``''`` for an empty or ``None`` summary.
    """
    if not s:
        return ''
    if ':' in s:
        return s[:s.index(':')]
    return s.strip()


def print_test_distribution(grades):
    """Print how often each test appears in deductions, most frequent first.

    :param grades: iterable of grade records; each record is indexed with
        ``['deductions']`` (items providing ``['summary']``) and
        ``['student']['github_id']``.

    Student ids are listed next to a test only when it has fewer than 10
    deductions.
    """
    print('\n\n----------------------------\ntest failures distribution:\n%20s\tcount\tstudents' % 'test')
    counts = Counter()
    test2students = defaultdict(list)
    for g in grades:
        for d in g['deductions']:
            test = clean_summary(d['summary'])
            counts[test] += 1
            test2students[test].append(g['student']['github_id'])
    for test, count in counts.most_common():
        if count < 10:
            print('%20s\t%d\t%s' % (test, count, ' '.join(test2students[test])))
        else:
            print('%20s\t%d' % (test, count))


def extract_error(trace):
    """Return the ``<Name>Error: <message>`` text that ends *trace*.

    :param trace: traceback text (may be empty or ``None``)
    :return: the matched error text, or ``None`` when the last line of the
        trace does not look like an error line.
    """
    if not trace:
        return None
    found = _ERROR_LINE.search(trace)
    return found.group(1) if found else None


def summarize_errors(grades, test_names):
    """Print, per selected test, each distinct error and who triggered it.

    :param grades: iterable of grade records (see ``print_test_distribution``);
        each deduction must also provide ``['trace']``.
    :param test_names: comma-separated test names to report on.
    """
    wanted = {name.strip() for name in test_names.split(',')}
    test2error_counts = defaultdict(Counter)
    error2students = defaultdict(list)
    for g in grades:
        for d in g['deductions']:
            test_name = clean_summary(d['summary'])
            if test_name not in wanted:
                continue
            trace = d['trace']
            # Everything from the 'source:' marker onwards is dropped so that
            # traces are grouped by the text that precedes it.
            if 'source:' in trace:
                trace = trace[:trace.index('source:')]
            text = trace.strip()
            test2error_counts[test_name][text] += 1
            error2students[test_name + '__' + text].append(g['student']['github_id'])
    for test, error_counts in sorted(test2error_counts.items()):
        print('\n\n----------------------------\nSummary of errors for test %s\n' % test)
        for error, count in error_counts.most_common():
            print('count=%d' % count)
            print('students=%s' % ' '.join(error2students[test + '__' + error]))
            print(error)
            print('\n')


def summarize_students(grades, student_names):
    """Print every deduction (points, test, trace) for the selected students.

    :param grades: iterable of grade records; deductions must provide
        ``['points']``, ``['summary']`` and ``['trace']``.
    :param student_names: comma-separated github ids to report on.
    """
    wanted = {name.strip() for name in student_names.split(',')}
    for g in grades:
        github_id = g['student']['github_id']
        if github_id not in wanted:
            continue
        print('\n\n----------------------------\nSummary of errors for student %s\n' % github_id)
        for d in g['deductions']:
            print('\n%d points deducted for %s' % (d['points'], clean_summary(d['summary'])))
            print(d['trace'])


def main():
    """Command-line entry point: parse arguments and print the summaries."""
    # Imported here so the reporting helpers above stay importable (e.g. from
    # tests) on machines that do not have the CLI dependency installed.
    from docopt import docopt

    args = docopt(__doc__)
    # One JSON document per line; close the file promptly and ignore blank
    # lines (such as a trailing newline) instead of failing on them.
    with open(args['--grades']) as handle:
        grades = [json.loads(line) for line in handle if line.strip()]
    print_grade_distribution(grades)
    print_test_distribution(grades)
    if args['--test-names']:
        summarize_errors(grades, args['--test-names'])
    if args['--student-names']:
        summarize_students(grades, args['--student-names'])


if __name__ == '__main__':
    main()

Full Screen

Full Screen

examples.py

Source:examples.py Github

copy

Full Screen

...13 This test will print output to stderr, and then fail an assertion.14 """15 sys.stderr.write("Doom and gloom.\n")16 self.assertTrue(False)17 def test2Error(self):18 """19 An Exception will be raised (and not caught) while running this test.20 """21 raise Exception22 @unittest.skip("This is the 'reason' portion of the skipped test.")23 def test3Skip(self):24 """25 This test will be skipped.26 """27 pass28 @unittest.expectedFailure29 def test4ExpectedFailure(self):30 """31 This test will fail, but we expect it to....

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run green automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful