How to use the test_run_error method in avocado

Best Python code snippet using avocado_python

sqltest.py

Source:sqltest.py Github

copy

Full Screen

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.

import os
import sys
import unittest
import pymonetdb
import difflib
from abc import ABCMeta, abstractmethod
import MonetDBtesting.process as process
import inspect

# Default database name and MAPI port, read from the environment.
# Both are None when the corresponding variable is not set.
TSTDB = os.getenv("TSTDB")
MAPIPORT = os.getenv("MAPIPORT")


def equals(a, b) -> bool:
    """Compare two values strictly.

    Returns the result of ``a == b`` only when both operands have exactly
    the same type; values of different types never compare equal here.
    """
    if type(a) is type(b):
        return a == b
    return False


def sequence_match(left=None, right=None, index=0):
    """Tell whether `left` is a prefix of `right[index:]`.

    Elements are compared with `equals`.  An empty `left` only matches an
    empty remainder of `right`.
    """
    left = [] if left is None else left
    right = ([] if right is None else right)[index:]
    ll = len(left)
    rl = len(right)
    if ll > rl:
        return False
    if ll == 0 and rl > 0:
        return False
    return all(equals(lv, rv) for lv, rv in zip(left, right))


def get_index_mismatch(left=None, right=None):
    """Return the first position where `left` and `right` differ.

    Only the common prefix length is inspected; returns None when no
    mismatch is found within it.
    """
    left = [] if left is None else left
    right = [] if right is None else right
    for i, (lv, rv) in enumerate(zip(left, right)):
        if not equals(lv, rv):
            return i
    return None


def piped_representation(data=None):
    """Render an iterable of tuples as '|'-separated lines.

    Raises TypeError when an element is not a tuple.
    """
    def mapfn(row):
        if type(row) is tuple:
            return '|'.join(str(v) for v in row)
        raise TypeError('ERROR: expecting list of tuples!')

    rows = [] if data is None else data
    return '\n'.join([mapfn(row) for row in rows])


def filter_junk(s: str):
    """filters empty strings and comments
    """
    s = s.strip()
    if s.startswith('--') or s.startswith('#') or s.startswith('stdout of test'):
        return False
    if s == '':
        return False
    return True


def filter_headers(s: str):
    """filter lines prefixed with % (MAPI headers)"""
    s = s.strip()
    if s.startswith('%'):
        return False
    if s == '':
        return False
    return True


def filter_lines_starting_with(predicates=None):
    """Build a predicate usable with `filter()`.

    The returned function rejects blank lines and lines that (after
    stripping) start with any of the given prefixes.
    """
    prefixes = [] if predicates is None else predicates

    def _fn(line: str):
        line = line.strip()
        if line == '':
            return False
        return not any(line.startswith(p) for p in prefixes)
    return _fn


def filter_matching_blocks(a=None, b=None):
    """Reduce two line lists to the lines that differ.

    Lines are compared pairwise by position, ignoring tabs and spaces; a
    pair is kept when its `SequenceMatcher.quick_ratio()` is below 0.95.
    Surplus lines of the longer list are appended unchanged.

    Returns a tuple `(reduced_a, reduced_b)`.
    """
    # TODO add some ctx before any mismatch lines
    a = [] if a is None else a
    b = [] if b is None else b
    red_a = []
    red_b = []
    min_size = min(len(a), len(b))
    s = difflib.SequenceMatcher()
    for line_a, line_b in zip(a, b):
        s.set_seq1(line_a.replace('\t', '').replace(' ', ''))
        s.set_seq2(line_b.replace('\t', '').replace(' ', ''))
        # should be high matching ratio
        if s.quick_ratio() < 0.95:
            red_a.append(line_a)
            red_b.append(line_b)
    # add trailing data if len(a) != len(b)
    red_a += a[min_size:]
    red_b += b[min_size:]
    return red_a, red_b


def diff(stable_file, test_file):
    """Diff a stable output file against a test output file.

    Comment lines, test banners and lines starting with 'MAPI' are
    dropped before comparing.  Returns the unified diff as a single
    string, or None when nothing differs.
    """
    filter_fn = filter_lines_starting_with(['--', '#', 'stdout of test', 'stderr of test', 'MAPI'])
    with open(stable_file) as fstable:
        stable = list(filter(filter_fn, fstable.read().split('\n')))
    with open(test_file) as ftest:
        test = list(filter(filter_fn, ftest.read().split('\n')))
    a, b = filter_matching_blocks(stable, test)
    lines = list(difflib.unified_diff(a, b, fromfile='stable', tofile='test'))
    return '\n'.join(lines) if lines else None


class PyMonetDBConnectionContext(object):
    """Lazily connected database handle usable as a context manager."""

    def __init__(self,
                 username='monetdb', password='monetdb',
                 hostname='localhost', port=MAPIPORT, database=TSTDB, language='sql'):
        self.username = username
        self.password = password
        self.hostname = hostname
        self.port = port
        self.database = database
        self.language = language
        self.dbh = None

    def connect(self):
        """Open the connection on first use and return the handle."""
        if self.dbh is None:
            if self.language == 'sql':
                self.dbh = pymonetdb.connect(
                    username=self.username,
                    password=self.password,
                    hostname=self.hostname,
                    port=self.port,
                    database=self.database,
                    autocommit=True)
            else:
                # NOTE(review): `malmapi` is not imported in the visible
                # part of this file -- confirm where it comes from.
                self.dbh = malmapi.Connection()
                self.dbh.connect(
                    username=self.username,
                    password=self.password,
                    hostname=self.hostname,
                    port=self.port,
                    database=self.database,
                    language=self.language)
        return self.dbh

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def cursor(self):
        """Return a cursor for the open connection, or None if closed."""
        if self.dbh:
            if self.language == 'sql':
                return self.dbh.cursor()
            else:
                # NOTE(review): `MapiCursor` is not defined in the visible
                # part of this file -- confirm where it comes from.
                return MapiCursor(self.dbh)
        return None

    def close(self):
        """Close the connection if it is open."""
        if self.dbh:
            self.dbh.close()
            self.dbh = None


class RunnableTestResult(metaclass=ABCMeta):
    """Abstract class for sql result"""
    did_run = False

    @abstractmethod
    def run(self, query: str, *args, stdin=None, lineno=None):
        """Run query with specific client"""
        pass


class TestCaseResult(object):
    """TestCase connected result"""
    test_case = None

    def __init__(self, test_case, **kwargs):
        self.test_case = test_case
        self.assertion_errors = []  # holds assertion errors
        self.query = None
        self.test_run_error = None
        self.err_code = None
        self.err_message = None
        self.data = []
        self.rows = []
        self.rowcount = -1
        self.description = None
        self.id = kwargs.get('id')
        self.lineno = None

    def fail(self, msg, data=None):
        """ logs errors to test case err file"""
        err_file = self.test_case.err_file
        # the query/id header is printed once, before the first error
        if len(self.assertion_errors) == 0:
            lineno = self.lineno or 'N/A'
            print('', file=err_file)
            if self.query:
                print(f'ln{lineno}:', self.query, file=err_file)
            elif self.id:
                print(f'ln{lineno}:', self.id, file=err_file)
            print('----', file=err_file)
        self.assertion_errors.append(AssertionError(msg))
        print(msg, file=err_file)
        if data is not None:
            # up to 100 rows are printed below, so a result is only
            # truncated when it holds more than 100 rows
            if len(data) <= 100:
                print('query result:', file=err_file)
            else:
                print('truncated query result:', file=err_file)
            for row in data[:100]:
                sep = ''
                for col in row:
                    if col is None:
                        print(sep, 'NULL', sep='', end='', file=err_file)
                    else:
                        print(sep, col, sep='', end='', file=err_file)
                    sep = '|'
                print('', file=err_file)
        print('', file=err_file)

    def assertFailed(self, err_code=None, err_message=None):
        """assert on query failed"""
        if self.test_run_error is None:
            msg = "expected to fail but didn't"
            self.fail(msg)
        else:
            msgs = []
            if err_code:
                if self.err_code != err_code:
                    msgs.append("expected to fail with error code {} but failed with error code {}".format(err_code, self.err_code))
            if err_message:
                if self.err_message:
                    # error messages are compared case-insensitively
                    if err_message.lower() != self.err_message.lower():
                        msgs.append("expected to fail with error message '{}' but failed with error message '{}'".format(err_message, self.err_message))
                else:
                    msgs.append("expected to fail with error message '{}' but got '{}'".format(err_message, self.err_message))
            if len(msgs) > 0:
                self.fail('\n'.join(msgs))
        return self

    def assertSucceeded(self):
        """assert on query succeeded"""
        if self.test_run_error is not None:
            msg = "expected to succeed but didn't\n{}".format(str(self.test_run_error).rstrip('\n'))
            self.fail(msg)
        return self

    def assertRowCount(self, rowcount):
        '''Assert on the affected row count'''
        if self.rowcount != int(rowcount):
            msg = "received {} rows, expected {} rows".format(self.rowcount, rowcount)
            self.fail(msg)
        return self

    def assertResultHashTo(self, hash_value):
        raise NotImplementedError

    def assertValue(self, row, col, val):
        """Assert on a value matched against row, col in the result"""
        received = None
        row = int(row)
        col = int(col)
        try:
            received = self.data[row][col]
        except IndexError:
            # an out-of-range cell is reported as a received value of None
            pass
        if type(val) is type(received):
            if val != received:
                msg = 'expected "{}", received "{}" in row={}, col={}'.format(val, received, row, col)
                self.fail(msg, data=self.data)
        else:
            # handle type mismatch
            msg = 'expected type {} and value "{}", received type {} and value "{}" in row={}, col={}'.format(type(val), str(val), type(received), str(received), row, col)
            self.fail(msg, data=self.data)
        return self

    def assertDataResultMatch(self, data=None, index=None):
        """Assert on a match of a subset of the result. When index is provided it
        starts comparing from that row index onward.

        When index is None the comparison starts at the first result row
        equal to the first expected row, or at row 0 if there is none.
        """
        def mapfn(row):
            # expected rows may be given as lists; result rows are tuples
            if type(row) is list:
                return tuple(row)
            return row

        data = [] if data is None else data
        if len(data) == 0 and len(self.data) > 0:
            msg = 'expected empty result!'
            self.fail(msg, data=self.data)
        if len(data) > 0 and len(self.data) == 0:
            msg = 'expected result but received empty!'
            self.fail(msg, data=self.data)
        data = list(map(mapfn, data))
        if index is None:
            if len(data) > 0:
                first = data[0]
                for i, v in enumerate(self.data):
                    if first == v:
                        index = i
                        break
        index = index or 0
        # align sequences
        idx_mis = get_index_mismatch(data, self.data[index:])
        if idx_mis is not None:
            exp_v = data[idx_mis]
            # idx_mis is relative to the slice that starts at `index`
            res_v = self.data[index + idx_mis]
            msg = 'expected to match query result at index {} but it didn\'t as {} not equals {}'.format(idx_mis, repr(exp_v), repr(res_v))
            self.fail(msg, data=self.data)
        return self


class MclientTestResult(TestCaseResult, RunnableTestResult):
    """Holder of a sql execution result as returned from mclient"""

    def __init__(self, test_case, **kwargs):
        super().__init__(test_case, **kwargs)
        self.did_run = False

    def _parse_error(self, err: str):
        """Extract (err_code, err_message) from client error output.

        Looks for lines starting with 'ERROR' and 'CODE' and takes the
        text after the first '='.
        """
        err_code = None
        err_message = None
        for l in err.splitlines():
            l = l.strip()
            # split once so that a '=' inside the message is preserved
            if l.startswith('ERROR'):
                err_message = l.split('=', 1)[-1].strip()
            if l.startswith('CODE'):
                err_code = l.split('=', 1)[-1].strip()
        return err_code, err_message

    def _get_row_count(self, data):
        """Count output lines of the form '[ ... ]'."""
        count = 0
        data = list(filter(filter_junk, data.splitlines()))
        for l in data:
            l = l.strip()
            if l.startswith('[') and l.endswith(']'):
                count += 1
        return count

    def _output_lines(self):
        """Return the captured output split into lines ([] when empty)."""
        # `data` stays the initial empty list when the client printed nothing
        return self.data.split('\n') if self.data else []

    def run(self, query: str, *args, stdin=None, lineno=None):
        """Run `query` (or the content of `stdin`) through the sql client.

        Runs at most once per result object.  Any exception raised while
        running the client is converted into SystemExit.
        """
        # ensure runs only once
        if self.did_run is False:
            self.lineno = lineno
            conn_ctx = self.test_case.conn_ctx
            kwargs = dict(
                host=conn_ctx.hostname,
                port=conn_ctx.port,
                dbname=conn_ctx.database,
                user=conn_ctx.username,
                passwd=conn_ctx.password)
            try:
                if query:
                    self.query = query
                    # TODO multiline stmts won't work here ensure single stmt
                    with process.client('sql', **kwargs,
                                        args=list(args),
                                        stdin=process.PIPE,
                                        stdout=process.PIPE, stderr=process.PIPE) as p:
                        out, err = p.communicate(query)
                        if out:
                            self.data = out
                            self.rowcount = self._get_row_count(out)
                        if err:
                            self.test_run_error = err
                            self.err_code, self.err_message = self._parse_error(err)
                elif stdin:
                    with process.client('sql', **kwargs,
                                        args=list(args),
                                        stdin=stdin,
                                        stdout=process.PIPE, stderr=process.PIPE) as p:
                        out, err = p.communicate()
                        if out:
                            self.data = out
                        if err:
                            self.test_run_error = err
                self.did_run = True
            except Exception as e:
                raise SystemExit(e)
        return self

    def assertMatchStableOut(self, fout, ignore_headers=False):
        """Assert that the client output matches the stable file `fout`."""
        stable = []
        data = list(filter(filter_junk, self._output_lines()))
        with open(fout, 'r') as f:
            stable = list(filter(filter_junk, f.read().split('\n')))
        if ignore_headers:
            stable = list(filter(filter_headers, stable))
            data = list(filter(filter_headers, data))
        a, b = filter_matching_blocks(stable, data)
        if a or b:
            diff = list(difflib.unified_diff(stable, data, fromfile='stable', tofile='test'))
            if len(diff) > 0:
                msg = "expected to match stable output {} but it didnt\'t\n".format(fout)
                msg += '\n'.join(diff)
                # NOTE(review): fail() appends the error as well, so it is
                # recorded twice and fail() skips its query header when this
                # is the first error -- confirm this is intended.
                self.assertion_errors.append(AssertionError(msg))
                self.fail(msg)
        return self

    def assertMatchStableError(self, ferr, ignore_err_messages=False):
        """Assert that the client stderr matches the stable file `ferr`.

        `ignore_err_messages` is accepted but not used by this method.
        """
        stable = []
        err = []
        filter_fn = filter_lines_starting_with(['--', '#', 'stderr of test', 'MAPI'])
        if self.test_run_error:
            err = list(filter(filter_fn, self.test_run_error.split('\n')))
        with open(ferr, 'r') as f:
            stable = list(filter(filter_fn, f.read().split('\n')))
        a, b = filter_matching_blocks(stable, err)
        diff = list(difflib.unified_diff(a, b, fromfile='stable', tofile='test'))
        if len(diff) > 0:
            msg = "expected to match stable error {} but it didnt\'t\n".format(ferr)
            msg += '\n'.join(diff)
            # NOTE(review): error is recorded twice, see assertMatchStableOut
            self.assertion_errors.append(AssertionError(msg))
            self.fail(msg)
        return self

    def assertDataResultMatch(self, expected):
        """Assert that the output lines (without headers) match `expected`."""
        data = list(filter(filter_junk, self._output_lines()))
        data = list(filter(filter_headers, data))
        a, b = filter_matching_blocks(expected, data)
        diff = list(difflib.unified_diff(a, b, fromfile='expected', tofile='test'))
        if len(diff) > 0:
            exp = '\n'.join(expected)
            got = '\n'.join(data)
            msg = "Output didn't match.\n"
            msg += f"Expected:\n{exp}\n"
            msg += f"Got:\n{got}\n"
            msg += "Diff:\n" + '\n'.join(s.rstrip() for s in diff)
            # NOTE(review): error is recorded twice, see assertMatchStableOut
            self.assertion_errors.append(AssertionError(msg))
            self.fail(msg)
        return self

    def assertValue(self, row, col, val):
        raise NotImplementedError


class PyMonetDBTestResult(TestCaseResult, RunnableTestResult):
    """Holder of sql execution information. Managed by SQLTestCase."""
    test_case = None

    def __init__(self, test_case, **kwargs):
        super().__init__(test_case, **kwargs)
        self.did_run = False

    def _parse_error(self, error: str = ''):
        """Parse error string and returns (err_code, err_msg) tuple

        The text before the first '!' is taken as the code and the rest
        as the message; without a '!' the whole text is the message.
        """
        err_code = None
        err_msg = None
        tmp = error.split('!')
        if len(tmp) > 1:
            try:
                err_code = tmp[0].strip()
            except (ValueError, TypeError):
                pass
            # reconstruct
            err_msg = ('!'.join(tmp[1:])).strip()
        elif len(tmp) == 1:
            if tmp[0]:
                err_msg = tmp[0].strip()
        return err_code, err_msg

    def run(self, query: str, *args, stdin=None, lineno=None):
        """Execute `query` through pymonetdb; runs at most once.

        Errors are stored on the result rather than raised.
        """
        # ensure runs only once
        if self.did_run is False:
            self.lineno = lineno
            if query:
                self.query = query
                try:
                    conn = self.test_case.conn_ctx.connect()
                    crs = conn.cursor()
                    crs.execute(query)
                    self.rowcount = crs.rowcount
                    self.rows = crs._rows
                    if crs.description:
                        self.data = crs.fetchall()
                        self.description = crs.description
                except pymonetdb.Error as e:
                    self.test_run_error = e
                    self.err_code, self.err_message = self._parse_error(e.args[0])
                except (OSError, ValueError) as e:
                    self.test_run_error = e
            self.did_run = True
        return self


class MonetDBeTestResult(TestCaseResult, RunnableTestResult):
    """Holder of a result executed through a monetdbe connection."""

    def __init__(self, test_case, **kwargs):
        super().__init__(test_case, **kwargs)
        self.did_run = False

    def _parse_error(self, err: str):
        pass

    def run(self, query: str, *args, stdin=None, lineno=None):
        """Execute `query` on the test case's connection; runs at most once."""
        # monetdbe is only imported locally elsewhere in this file, so it
        # has to be imported here for the except clause below to resolve
        import monetdbe
        if self.did_run is False:
            self.lineno = lineno
            if query:
                self.query = query
                try:
                    conn = self.test_case.conn_ctx
                    crs = conn.cursor()
                    crs.execute(query)
                    self.rowcount = int(crs.rowcount)
                    if crs.description:
                        self.description = crs.description
                        self.data = crs.fetchall()
                except (monetdbe.Error, ValueError) as e:
                    self.test_run_error = e
                    # TODO parse error
            self.did_run = True
        return self


class SQLDump():
    """Holder of a database dump produced by SQLTestCase.sqldump."""

    def __init__(self, test_case, data=None):
        self.test_case = test_case
        self.data = data
        self.assertion_errors = []  # holds assertion errors

    def assertMatchStableOut(self, fout):
        """Assert that the dump matches the stable file `fout`."""
        stable = []
        data = self.data.split('\n') if self.data else []
        dump = list(filter(filter_junk, data))
        with open(fout, 'r') as f:
            stable = list(filter(filter_junk, f.read().split('\n')))
        a, b = filter_matching_blocks(stable, dump)
        diff = list(difflib.unified_diff(a, b, fromfile='stable', tofile='test'))
        if len(diff) > 0:
            err_file = self.test_case.err_file
            msg = "sql dump expected to match stable output {} but it didnt\'t\n".format(fout)
            msg += '\n'.join(diff)
            self.assertion_errors.append(AssertionError(msg))
            print(msg, file=err_file)


class SQLTestCase():
    """Runs queries and collects their results.

    Usable as a context manager; leaving the context closes the
    connection and raises SystemExit(1) if any result recorded an
    assertion error.
    """

    def __init__(self, out_file=sys.stdout, err_file=sys.stderr):
        self.out_file = out_file
        self.err_file = err_file
        self.test_results = []
        self._conn_ctx = None
        self.in_memory = False
        self.client = 'pymonetdb'

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.exit()

    def close(self):
        """Close and forget the current connection context, if any."""
        if self._conn_ctx:
            self._conn_ctx.close()
        self._conn_ctx = None

    def exit(self):
        """Close the connection; exit with status 1 on assertion errors."""
        self.close()
        for res in self.test_results:
            if len(res.assertion_errors) > 0:
                raise SystemExit(1)

    def out(self, data):
        print(data, file=self.out_file)
        print('', file=self.out_file)

    def err(self, msg):
        print(msg, file=self.err_file)
        print('', file=self.err_file)

    def connect(self,
                username='monetdb', password='monetdb', port=MAPIPORT,
                hostname='localhost', database=TSTDB, language='sql'):
        """Replace the current connection context and return the new one.

        A database of ':memory:' selects an in-memory monetdbe
        connection; anything else creates a PyMonetDBConnectionContext.
        """
        if self._conn_ctx:
            self.close()
        if database == ':memory:':
            import monetdbe
            self.in_memory = True
            # TODO add username, password, port when supported from monetdbe
            self._conn_ctx = monetdbe.connect(':memory:', autocommit=True)
        else:
            self.in_memory = False
            self._conn_ctx = PyMonetDBConnectionContext(
                username=username,
                password=password,
                hostname=hostname,
                port=port,
                database=database,
                language=language)
        return self._conn_ctx

    def default_conn_ctx(self):
        """Create a connection context with default settings."""
        if self.in_memory:
            # local import, as in connect(): monetdbe is not imported at
            # module level
            import monetdbe
            return monetdbe.connect(':memory:', autocommit=True)
        ctx = PyMonetDBConnectionContext()
        return ctx

    @property
    def conn_ctx(self):
        # NOTE(review): when connect() was never called, every access
        # builds a fresh default context -- confirm that is intended.
        return self._conn_ctx or self.default_conn_ctx()

    def execute(self, query: str, *args, client='pymonetdb', stdin=None, result_id=None):
        '''Execute query with specified client. Default client is pymonetdb.'''
        # record the line number of the caller for error reporting
        frame = inspect.currentframe().f_back
        lineno = frame.f_lineno
        if client == 'mclient':
            res = MclientTestResult(self, id=result_id)
        elif self.in_memory:
            res = MonetDBeTestResult(self, id=result_id)
        else:
            res = PyMonetDBTestResult(self, id=result_id)
        res.run(query, *args, stdin=stdin, lineno=lineno)
        self.test_results.append(res)
        return res

    def sqldump(self, *args):
        """Dump the database with a client process and wrap it in SQLDump.

        With '--inserts' among `args` the 'sqldump' client is run with
        `args`; otherwise the 'sql' client is run with '-lsql -D'.
        """
        conn_ctx = self.conn_ctx
        kwargs = dict(
            host=conn_ctx.hostname,
            port=conn_ctx.port,
            dbname=conn_ctx.database,
            user=conn_ctx.username,
            passwd=conn_ctx.password)
        dump = None
        if '--inserts' in args:
            args = list(args)
            cmd = 'sqldump'
        else:
            cmd = 'sql'
            # TODO should more options be allowed here
            args = ['-lsql', '-D']
        try:
            with process.client(cmd, **kwargs, args=args, stdout=process.PIPE, stderr=process.PIPE) as p:
                dump, err = p.communicate()
        except Exception:
            # best effort: on failure the dump stays None and the SQLDump
            # result is created without data
            pass
        res = SQLDump(self, data=dump)
        self.test_results.append(res)
        return res

    def drop(self):
        """Drop user-created tables, functions, sequences, schemas and users."""
        if self.in_memory:
            # TODO
            return
        try:
            with self.conn_ctx as ctx:
                crs = ctx.cursor()
                crs.execute('select s.name, t.name, tt.table_type_name from sys.tables t, sys.schemas s, sys.table_types tt where not t.system and t.schema_id = s.id and t.type = tt.table_type_id')
                for row in crs.fetchall():
                    crs.execute('drop {} "{}"."{}" cascade'.format(row[2], row[0], row[1]))
                crs.execute('select s.name, f.name, ft.function_type_keyword from functions f, schemas s, function_types ft where not f.system and f.schema_id = s.id and f.type = ft.function_type_id')
                for row in crs.fetchall():
                    crs.execute('drop all {} "{}"."{}"'.format(row[2], row[0], row[1]))
                crs.execute('select s.name, q.name from sys.sequences q, schemas s where q.schema_id = s.id')
                for row in crs.fetchall():
                    crs.execute('drop sequence "{}"."{}"'.format(row[0], row[1]))
                crs.execute("select name from sys.users where name not in ('monetdb', '.snapshot')")
                # users are moved to schema "sys" before the schemas are dropped
                for row in crs.fetchall():
                    crs.execute('alter user "{}" SET SCHEMA "sys"'.format(row[0]))
                crs.execute('select name from sys.schemas where not system')
                for row in crs.fetchall():
                    crs.execute('drop schema "{}" cascade'.format(row[0]))
                crs.execute("select name from sys.users where name not in ('monetdb', '.snapshot')")
                for row in crs.fetchall():
                    crs.execute('drop user "{}"'.format(row[0]))
        except (pymonetdb.Error, ValueError) as e:
            # NOTE(review): the visible listing is cut off here; the "..."
            # below is the truncation marker from the source, not the
            # original handler body -- restore it from the upstream file.
            ...

Full Screen

Full Screen

test_state.py

Source:test_state.py Github

copy

Full Screen

...47 def test_run_success(self, token: str, successor_state: type) -> None:48 self.assertIsInstance(self._state.run(token, self._intent), successor_state)49 @ddt.data(("invalid", IllegalTransitionError))50 @ddt.unpack51 def test_run_error(self, token: str, exception: Type[Exception]) -> None:52 self.assertRaises(exception, self._state.run, token, self._intent)53@ddt.ddt54class TestUserState(unittest.TestCase):55 def setUp(self) -> None:56 self._state = UserState()57 self._intent = Intent()58 @ddt.data("client")59 def test_run(self, user: str) -> None:60 self.assertIsInstance(self._state.run(user, self._intent), InSelectState)61 @ddt.data("client")62 def test_validate(self, user: str) -> None:63 self._state._validate(user, self._intent)64 self.assertIn(user, self._intent.users)65@ddt.ddt66class TestInSelectState(unittest.TestCase):67 def setUp(self) -> None:68 self._state = InSelectState(UserState())69 self._intent = Intent()70 @ddt.data(71 *((token, UserState) for token in {"and", ","}),72 ("in", TheState), ("select", TheBlockchainState),73 )74 @ddt.unpack75 def test_run_success(self, token: str, successor_state: type) -> None:76 self.assertIsInstance(self._state.run(token, self._intent), successor_state)77 @ddt.data(("invalid", IllegalTransitionError))78 @ddt.unpack79 def test_run_error(self, token: str, exception: Type[Exception]) -> None:80 self.assertRaises(exception, self._state.run, token, self._intent)81@ddt.ddt82class TestTheState(unittest.TestCase):83 def setUp(self) -> None:84 self._state = TheState()85 self._intent = Intent()86 @ddt.data(("the", TimeframeState))87 @ddt.unpack88 def test_run_success(self, token: str, successor_state: type) -> None:89 self.assertIsInstance(self._state.run(token, self._intent), successor_state)90 @ddt.data(("invalid", IllegalTransitionError))91 @ddt.unpack92 def test_run_error(self, token: str, exception: Type[Exception]) -> None:93 self.assertRaises(exception, self._state.run, token, self._intent)94@ddt.ddt95class 
TestTimeframeState(unittest.TestCase):96 def setUp(self) -> None:97 self._state = TimeframeState()98 self._intent = Intent()99 @ddt.data(*((timeframe.value, SelectState) for timeframe in Timeframe))100 @ddt.unpack101 def test_run_success(self, token: str, successor_state: type) -> None:102 self.assertIsInstance(self._state.run(token, self._intent), successor_state)103 @ddt.data(("invalid", IllegalTransitionError))104 @ddt.unpack105 def test_run_error(self, token: str, exception: Type[Exception]) -> None:106 self.assertRaises(exception, self._state.run, token, self._intent)107 @ddt.data(*Timeframe)108 def test_validate(self, timeframe: Timeframe) -> None:109 self._state._validate(timeframe.value, self._intent)110 self.assertEqual(self._intent.timeframe, timeframe)111@ddt.ddt112class TestSelectState(unittest.TestCase):113 def setUp(self) -> None:114 self._state = SelectState()115 self._intent = Intent()116 @ddt.data(("select", TheBlockchainState))117 @ddt.unpack118 def test_run_success(self, token: str, successor_state: type) -> None:119 self.assertIsInstance(self._state.run(token, self._intent), successor_state)120 @ddt.data(("invalid", IllegalTransitionError))121 @ddt.unpack122 def test_run_error(self, token: str, exception: Type[Exception]) -> None:123 self.assertRaises(exception, self._state.run, token, self._intent)124@ddt.ddt125class TestTheBlockchainState(unittest.TestCase):126 def setUp(self) -> None:127 self._state = TheBlockchainState()128 self._intent = Intent()129 @ddt.data(130 *((blockchain.value, WithUntilAsState) for blockchain in Blockchain),131 ("the", ProfileState),132 )133 @ddt.unpack134 def test_run_success(self, token: str, successor_state: type) -> None:135 self.assertIsInstance(self._state.run(token, self._intent), successor_state)136 @ddt.data(("invalid", IllegalTransitionError))137 @ddt.unpack138 def test_run_error(self, token: str, exception: Type[Exception]) -> None:139 self.assertRaises(exception, self._state.run, token, self._intent)140 
@ddt.data(*Blockchain)141 def test_validate_success(self, blockchain: Blockchain) -> None:142 self._state._validate(blockchain.value, self._intent)143 self.assertEqual(self._intent.blockchain, blockchain)144 def test_validate_error(self) -> None:145 self._state._validate("invalid", self._intent)146 self.assertIsNone(self._intent.blockchain)147@ddt.ddt148class TestWithUntilUseState(unittest.TestCase):149 def setUp(self) -> None:150 self._state = WithUntilAsState()151 self._intent = Intent()152 @ddt.data(("with", ModifierState), ("until", The2State), ("as", DefaultState))153 @ddt.unpack154 def test_run_success(self, token: str, successor_state: type) -> None:155 self.assertIsInstance(self._state.run(token, self._intent), successor_state)156 @ddt.data(("invalid", IllegalTransitionError))157 @ddt.unpack158 def test_run_error(self, token: str, exception: Type[Exception]) -> None:159 self.assertRaises(exception, self._state.run, token, self._intent)160@ddt.ddt161class TestProfileState(unittest.TestCase):162 def setUp(self) -> None:163 self._state = ProfileState()164 self._intent = Intent()165 @ddt.data(*((profile.value, FilterBlockchainState) for profile in Profile))166 @ddt.unpack167 def test_run_success(self, token: str, successor_state: type) -> None:168 self.assertIsInstance(self._state.run(token, self._intent), successor_state)169 @ddt.data(("invalid", IllegalTransitionError))170 @ddt.unpack171 def test_run_error(self, token: str, exception: Type[Exception]) -> None:172 self.assertRaises(exception, self._state.run, token, self._intent)173 @ddt.data(*Profile)174 def test_validate(self, profile: Profile) -> None:175 self._state._validate(profile.value, self._intent)176 self.assertEqual(self._intent.profile, profile)177@ddt.ddt178class TestFilterStateValidator(unittest.TestCase):179 def setUp(self) -> None:180 self._validator = FilterStateValidator()181 self._intent = Intent()182 @ddt.data(*Filter)183 def test_validate_success(self, filter_: Filter) -> None:184 
self._validator.validate(filter_.value, self._intent)185 self.assertIn(filter_, self._intent.filters)186 def test_validate_error(self) -> None:187 self._validator.validate("invalid", self._intent)188 self.assertSetEqual(self._intent.filters, set())189 def test_validate_private_public(self) -> None:190 self._intent.filters.add(Filter.PRIVATE)191 self.assertRaises(192 ValidationError,193 self._validator.validate,194 Filter.PUBLIC.value,195 self._intent,196 )197 @ddt.data((Profile.CHEAPEST, Filter.CHEAP), (Profile.FASTEST, Filter.FAST))198 @ddt.unpack199 def test_validate_profile_filter(self, profile: Profile, filter_: Filter) -> None:200 self._intent.profile = profile201 self._validator.validate(filter_.value, self._intent)202 self.assertNotIn(filter_, self._intent.filters)203@ddt.ddt204class TestFilterBlockchainState(unittest.TestCase):205 def setUp(self) -> None:206 self._state = FilterBlockchainState()207 self._intent = Intent()208 @ddt.data(209 *((filter_.value, BlockchainState) for filter_ in Filter),210 ("blockchain", FromExceptWithUntilAsState),211 )212 @ddt.unpack213 def test_run_success(self, token: str, successor_state: type) -> None:214 self.assertIsInstance(self._state.run(token, self._intent), successor_state)215 @ddt.data(("invalid", IllegalTransitionError))216 @ddt.unpack217 def test_run_error(self, token: str, exception: Type[Exception]) -> None:218 self.assertRaises(exception, self._state.run, token, self._intent)219@ddt.ddt220class TestFilterState(unittest.TestCase):221 def setUp(self) -> None:222 self._state = FilterState()223 self._intent = Intent()224 @ddt.data(*((filter_.value, BlockchainState) for filter_ in Filter))225 @ddt.unpack226 def test_run_success(self, token: str, successor_state: type) -> None:227 self.assertIsInstance(self._state.run(token, self._intent), successor_state)228 @ddt.data(("invalid", IllegalTransitionError))229 @ddt.unpack230 def test_run_error(self, token: str, exception: Type[Exception]) -> None:231 
self.assertRaises(exception, self._state.run, token, self._intent)232@ddt.ddt233class TestBlockchainState(unittest.TestCase):234 def setUp(self) -> None:235 self._state = BlockchainState(FilterState())236 self._intent = Intent()237 @ddt.data(238 *((token, FilterState) for token in {"and", ","}),239 ("blockchain", FromExceptWithUntilAsState),240 )241 @ddt.unpack242 def test_run_success(self, token: str, successor_state: type) -> None:243 self.assertIsInstance(self._state.run(token, self._intent), successor_state)244 @ddt.data(("invalid", IllegalTransitionError))245 @ddt.unpack246 def test_run_error(self, token: str, exception: Type[Exception]) -> None:247 self.assertRaises(exception, self._state.run, token, self._intent)248@ddt.ddt249class TestFromExceptWithUntilUseState(unittest.TestCase):250 def setUp(self) -> None:251 self._state = FromExceptWithUntilAsState()252 self._intent = Intent()253 @ddt.data(254 ("from", WhitelistState),255 ("except", BlacklistState),256 ("with", ModifierState),257 ("until", The2State),258 ("as", DefaultState),259 )260 @ddt.unpack261 def test_run_success(self, token: str, successor_state: type) -> None:262 self.assertIsInstance(self._state.run(token, self._intent), successor_state)263 @ddt.data(("invalid", IllegalTransitionError))264 @ddt.unpack265 def test_run_error(self, token: str, exception: Type[Exception]) -> None:266 self.assertRaises(exception, self._state.run, token, self._intent)267@ddt.ddt268class TestWhitelistState(unittest.TestCase):269 def setUp(self) -> None:270 self._state = WhitelistState()271 self._intent = Intent()272 @ddt.data(*((blockchain.value, WhitelistWithUntilAsState) for blockchain in Blockchain))273 @ddt.unpack274 def test_run_success(self, token: str, successor_state: type) -> None:275 self.assertIsInstance(self._state.run(token, self._intent), successor_state)276 @ddt.data(("invalid", IllegalTransitionError))277 @ddt.unpack278 def test_run_error(self, token: str, exception: Type[Exception]) -> None:279 
self.assertRaises(exception, self._state.run, token, self._intent)280 @ddt.data(*Blockchain)281 def test_validate(self, blockchain: Blockchain) -> None:282 self._state._validate(blockchain.value, self._intent)283 self.assertIn(blockchain, self._intent.whitelist)284@ddt.ddt285class TestWhitelistWithUntilUseState(unittest.TestCase):286 def setUp(self) -> None:287 self._state = WhitelistWithUntilAsState(WhitelistState())288 self._intent = Intent()289 @ddt.data(290 *((token, WhitelistState) for token in {"and", ","}),291 ("with", ModifierState),292 ("until", The2State),293 ("as", DefaultState),294 )295 @ddt.unpack296 def test_run_success(self, token: str, successor_state: type) -> None:297 self.assertIsInstance(self._state.run(token, self._intent), successor_state)298 @ddt.data(("invalid", IllegalTransitionError))299 @ddt.unpack300 def test_run_error(self, token: str, exception: Type[Exception]) -> None:301 self.assertRaises(exception, self._state.run, token, self._intent)302@ddt.ddt303class TestBlacklistState(unittest.TestCase):304 def setUp(self) -> None:305 self._state = BlacklistState()306 self._intent = Intent()307 @ddt.data(*((blockchain.value, BlacklistWithUntilAsState) for blockchain in Blockchain))308 @ddt.unpack309 def test_run_success(self, token: str, successor_state: type) -> None:310 self.assertIsInstance(self._state.run(token, self._intent), successor_state)311 @ddt.data(("invalid", IllegalTransitionError))312 @ddt.unpack313 def test_run_error(self, token: str, exception: Type[Exception]) -> None:314 self.assertRaises(exception, self._state.run, token, self._intent)315 @ddt.data(*Blockchain)316 def test_validate(self, blockchain: Blockchain) -> None:317 self._state._validate(blockchain.value, self._intent)318 self.assertIn(blockchain, self._intent.blacklist)319@ddt.ddt320class TestBlacklistWithUntilAsState(unittest.TestCase):321 def setUp(self) -> None:322 self._state = BlacklistWithUntilAsState(BlacklistState())323 self._intent = Intent()324 
@ddt.data(325 *((token, BlacklistState) for token in {"and", ","}),326 ("with", ModifierState),327 ("until", The2State),328 ("as", DefaultState),329 )330 @ddt.unpack331 def test_run_success(self, token: str, successor_state: type) -> None:332 self.assertIsInstance(self._state.run(token, self._intent), successor_state)333 @ddt.data(("invalid", IllegalTransitionError))334 @ddt.unpack335 def test_run_error(self, token: str, exception: Type[Exception]) -> None:336 self.assertRaises(exception, self._state.run, token, self._intent)337@ddt.ddt338class TestModifierState(unittest.TestCase):339 def setUp(self) -> None:340 self._state = ModifierState()341 self._intent = Intent()342 @ddt.data(*((modifier.value, UntilAsState) for modifier in Modifier))343 @ddt.unpack344 def test_run_success(self, token: str, successor_state: type) -> None:345 self.assertIsInstance(self._state.run(token, self._intent), successor_state)346 @ddt.data(("invalid", IllegalTransitionError))347 @ddt.unpack348 def test_run_error(self, token: str, exception: Type[Exception]) -> None:349 self.assertRaises(exception, self._state.run, token, self._intent)350 @ddt.data(*Modifier)351 def test_validate(self, modifier: Modifier) -> None:352 self._state._validate(modifier.value, self._intent)353 self.assertIn(modifier, self._intent.modifiers)354@ddt.ddt355class TestUntilAsState(unittest.TestCase):356 def setUp(self) -> None:357 self._state = UntilAsState(ModifierState())358 self._intent = Intent()359 @ddt.data(360 *((token, ModifierState) for token in {"and", ","}),361 ("until", The2State),362 ("as", DefaultState),363 )364 @ddt.unpack365 def test_run_success(self, token: str, successor_state: type) -> None:366 self.assertIsInstance(self._state.run(token, self._intent), successor_state)367 @ddt.data(("invalid", IllegalTransitionError))368 @ddt.unpack369 def test_run_error(self, token: str, exception: Type[Exception]) -> None:370 self.assertRaises(exception, self._state.run, token, self._intent)371@ddt.ddt372class 
TestThe2State(unittest.TestCase):373 def setUp(self) -> None:374 self._state = The2State()375 self._intent = Intent()376 @ddt.data(("the", IntervalState))377 @ddt.unpack378 def test_run_success(self, token: str, successor_state: type) -> None:379 self.assertIsInstance(self._state.run(token, self._intent), successor_state)380 @ddt.data(("invalid", IllegalTransitionError))381 @ddt.unpack382 def test_run_error(self, token: str, exception: Type[Exception]) -> None:383 self.assertRaises(exception, self._state.run, token, self._intent)384@ddt.ddt385class TestIntervalState(unittest.TestCase):386 def setUp(self) -> None:387 self._state = IntervalState()388 self._intent = Intent()389 @ddt.data(*((interval.value, CostsState) for interval in Interval))390 @ddt.unpack391 def test_run_success(self, token: str, successor_state: type) -> None:392 self.assertIsInstance(self._state.run(token, self._intent), successor_state)393 @ddt.data(("invalid", IllegalTransitionError))394 @ddt.unpack395 def test_run_error(self, token: str, exception: Type[Exception]) -> None:396 self.assertRaises(exception, self._state.run, token, self._intent)397 @ddt.data(*Interval)398 def test_validate(self, interval: Interval) -> None:399 self._state._validate(interval.value, self._intent)400 self.assertEqual(self._intent.interval, interval)401@ddt.ddt402class TestCostsState(unittest.TestCase):403 def setUp(self) -> None:404 self._state = CostsState()405 self._intent = Intent()406 @ddt.data(("costs", ReachState))407 @ddt.unpack408 def test_run_success(self, token: str, successor_state: type) -> None:409 self.assertIsInstance(self._state.run(token, self._intent), successor_state)410 @ddt.data(("invalid", IllegalTransitionError))411 @ddt.unpack412 def test_run_error(self, token: str, exception: Type[Exception]) -> None:413 self.assertRaises(exception, self._state.run, token, self._intent)414@ddt.ddt415class TestReachState(unittest.TestCase):416 def setUp(self) -> None:417 self._state = ReachState()418 
self._intent = Intent()419 @ddt.data(("reach", CurrencyThresholdState))420 @ddt.unpack421 def test_run_success(self, token: str, successor_state: type) -> None:422 self.assertIsInstance(self._state.run(token, self._intent), successor_state)423 @ddt.data(("invalid", IllegalTransitionError))424 @ddt.unpack425 def test_run_error(self, token: str, exception: Type[Exception]) -> None:426 self.assertRaises(exception, self._state.run, token, self._intent)427@ddt.ddt428class TestCurrencyThresholdState(unittest.TestCase):429 def setUp(self) -> None:430 self._state = CurrencyThresholdState()431 self._intent = Intent()432 @ddt.data(433 *((currency.value, ThresholdState) for currency in Currency),434 ("47.2", PolicyState),435 )436 @ddt.unpack437 def test_run_success(self, token: str, successor_state: type) -> None:438 self.assertIsInstance(self._state.run(token, self._intent), successor_state)439 @ddt.data(("invalid", ValidationError))440 @ddt.unpack441 def test_run_error(self, token: str, exception: Type[Exception]) -> None:442 self.assertRaises(exception, self._state.run, token, self._intent)443 @ddt.data(*Currency)444 def test_validate(self, currency: Currency) -> None:445 self._state._validate(currency.value, self._intent)446 self.assertEqual(self._intent.currency, currency)447@ddt.ddt448class TestThresholdState(unittest.TestCase):449 def setUp(self) -> None:450 self._state = ThresholdState()451 self._intent = Intent()452 @ddt.data(("35.7", PolicyState))453 @ddt.unpack454 def test_run(self, token: str, successor_state: type) -> None:455 self.assertIsInstance(self._state.run(token, self._intent), successor_state)456 @ddt.data(("invalid", ValidationError))457 @ddt.unpack458 def test_run_error(self, token: str, exception: Type[Exception]) -> None:459 self.assertRaises(exception, self._state.run, token, self._intent)460 @ddt.data(15.7)461 def test_validate(self, threshold: float) -> None:462 self._state._validate(str(threshold), self._intent)463 
self.assertEqual(self._intent.threshold, threshold)464 def test_validate_error(self) -> None:465 self.assertRaises(ValidationError, self._state._validate, "invalid", self._intent)466@ddt.ddt467class TestPolicyState(unittest.TestCase):468 def setUp(self) -> None:469 self._state = PolicyState()470 self._intent = Intent()471 @ddt.data("random")472 def test_run(self, token: str) -> None:473 self.assertRaises(IllegalTransitionError, self._state.run, token, self._intent)474@ddt.ddt475class TestDefaultState(unittest.TestCase):476 def setUp(self) -> None:477 self._state = DefaultState()478 self._intent = Intent()479 @ddt.data(("default", DefaultPolicyState))480 @ddt.unpack481 def test_run_success(self, token: str, successor_state: type) -> None:482 self.assertIsInstance(self._state.run(token, self._intent), successor_state)483 @ddt.data(("invalid", IllegalTransitionError))484 @ddt.unpack485 def test_run_error(self, token: str, exception: Type[Exception]) -> None:486 self.assertRaises(exception, self._state.run, token, self._intent)487 @ddt.data(*Timeframe)488 def test_validate(self, timeframe: Timeframe) -> None:489 self._intent.timeframe = timeframe490 self._state._validate("default", self._intent)491 self.assertIsNone(self._intent.timeframe)492 self.assertIsNone(self._intent.interval)493@ddt.ddt494class TestDefaultPolicyState(unittest.TestCase):495 def setUp(self) -> None:496 self._state = DefaultPolicyState()497 self._intent = Intent()498 @ddt.data("random")499 def test_run(self, token: str) -> None:...

Full Screen

Full Screen

test_cli.py

Source:test_cli.py Github

copy

Full Screen

import pytest

from sweeper.sync import run


def test_cli(config_file, db):
    """Dummy test config and backend to check that loading works.

    Runs the "test" job and checks that one metadata row without an error
    is stored, and that the stored run row references that metadata row.

    NOTE(review): `config_file` and `db` are presumably pytest fixtures
    defined elsewhere (e.g. a conftest) -- confirm.
    """
    run("test", config=config_file)

    # Exactly one metadata row is recorded for the job, with no job-level error.
    assert db["metadata"].count(job="test") == 1
    job = db["metadata"].find_one(job="test")
    assert job["error"] is None

    # The run row stored in the job's own table points back at the metadata row.
    _run = db["test"].find_one()
    assert _run["metadata_id"] == job["id"]


def test_cli_error(config_file, db):
    """Check that a job raising an exception still records its error.

    The "test_error" job is expected to raise; afterwards one metadata row
    must exist for it with its "error" column set to "ERROR".
    """
    # Keep only the raising call inside the context manager: any statement
    # placed after the raising line inside a `pytest.raises` block is never
    # executed, which would silently disable the assertions below.
    with pytest.raises(Exception):
        run("test_error", config=config_file)

    assert db["metadata"].count(job="test_error") == 1
    assert db["metadata"].find_one(job="test_error")["error"] == "ERROR"


def test_cli_run_error(config_file, db):
    """Check that an error inside a single run is recorded without raising.

    The "test_run_error" job completes (no exception reaches the caller);
    its metadata row is flagged with `has_run_errors` and the single stored
    run row carries the error text and the run name.
    """
    run("test_run_error", config=config_file)

    assert db["metadata"].count(job="test_run_error") == 1
    job = db["metadata"].find_one(job="test_run_error")
    assert job["has_run_errors"]

    assert db["test_run_error"].count() == 1
    _run = db["test_run_error"].find_one()
    assert _run["error"] == "ERROR"
    assert _run["name"] == "dumdum"
    # NOTE(review): the source excerpt is truncated ("...") after this point;
    # any further assertions of the original test are not visible here.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful