Best Python code snippet using avocado_python
sqltest.py
Source: sqltest.py
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0.  If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.

"""Helpers for writing SQL tests against a MonetDB server.

The module offers small comparison/filter utilities, a connection context
around pymonetdb, result holders with chainable ``assert*`` methods and the
``SQLTestCase`` driver that ties them together.
"""

import os
import sys
import unittest
import pymonetdb
import difflib
from abc import ABCMeta, abstractmethod
import MonetDBtesting.process as process
import inspect

# Connection defaults are taken from the environment (None when unset).
TSTDB = os.getenv("TSTDB")
MAPIPORT = os.getenv("MAPIPORT")


def equals(a, b) -> bool:
    """Return ``a == b`` only when both operands have exactly the same type.

    Values of different types are never considered equal (e.g. ``1`` and
    ``1.0`` do not match).
    """
    if type(a) is type(b):
        return a == b
    return False


def sequence_match(left=(), right=(), index=0):
    """Tell whether *left* is a prefix of ``right[index:]``.

    An empty *left* only matches an empty remainder of *right*.
    """
    right = right[index:]
    ll = len(left)
    rl = len(right)
    if ll > rl:
        return False
    if ll == 0 and rl > 0:
        return False
    # ll <= rl here, so zip() walks every element of left
    return all(equals(lv, rv) for lv, rv in zip(left, right))


def get_index_mismatch(left=(), right=()):
    """Return the first index where *left* and *right* differ, else None.

    Only the overlapping part (the shorter length) is compared.
    """
    for i, (lv, rv) in enumerate(zip(left, right)):
        if not equals(lv, rv):
            return i
    return None


def piped_representation(data=()):
    """Render an iterable of tuples as lines of ``|``-separated values.

    Raises:
        TypeError: when an element of *data* is not a tuple.
    """
    def mapfn(row):
        if type(row) is tuple:
            return '|'.join(str(v) for v in row)
        raise TypeError('ERROR: expecting list of tuples!')
    return '\n'.join(map(mapfn, data))


def filter_junk(s: str):
    """Filter predicate: reject empty strings and comment lines."""
    s = s.strip()
    if s == '':
        return False
    return not s.startswith(('--', '#', 'stdout of test'))


def filter_headers(s: str):
    """Filter predicate: reject empty lines and lines prefixed with % (MAPI headers)."""
    s = s.strip()
    if s == '':
        return False
    return not s.startswith('%')


def filter_lines_starting_with(predicates=()):
    """Build a filter predicate rejecting empty lines and lines that start
    with any of the given prefixes (after stripping whitespace)."""
    prefixes = tuple(predicates)

    def _fn(line: str):
        line = line.strip()
        if line == '':
            return False
        return not line.startswith(prefixes)
    return _fn


def filter_matching_blocks(a=(), b=()):
    """Drop the line pairs of *a* and *b* that are (almost) identical.

    Lines are compared position by position with tabs and spaces removed; a
    pair is kept when its ``quick_ratio`` is below 0.95.  Lines beyond the
    length of the shorter input are always kept.

    Returns:
        A ``(reduced_a, reduced_b)`` tuple of lists.
    """
    # TODO add some ctx before any mismatch lines
    red_a = []
    red_b = []
    min_size = min(len(a), len(b))
    s = difflib.SequenceMatcher()
    for line_a, line_b in zip(a, b):
        s.set_seq1(line_a.replace('\t', '').replace(' ', ''))
        s.set_seq2(line_b.replace('\t', '').replace(' ', ''))
        # should be high matching ratio
        if s.quick_ratio() < 0.95:
            red_a.append(line_a)
            red_b.append(line_b)
    # add trailing data if len(a) != len(b)
    red_a += a[min_size:]
    red_b += b[min_size:]
    return red_a, red_b


def diff(stable_file, test_file):
    """Return a unified diff (as one string) between two output files.

    Comment, banner and MAPI lines are ignored and near-identical lines are
    dropped before diffing.  Returns None when nothing differs.
    """
    filter_fn = filter_lines_starting_with(['--', '#', 'stdout of test', 'stderr of test', 'MAPI'])
    with open(stable_file) as fstable:
        stable = list(filter(filter_fn, fstable.read().split('\n')))
    with open(test_file) as ftest:
        test = list(filter(filter_fn, ftest.read().split('\n')))
    a, b = filter_matching_blocks(stable, test)
    lines = list(difflib.unified_diff(a, b, fromfile='stable', tofile='test'))
    if len(lines) > 0:
        return '\n'.join(lines)
    return None


class PyMonetDBConnectionContext(object):
    """Lazily opened connection that can be used as a context manager."""

    def __init__(self,
            username='monetdb', password='monetdb',
            hostname='localhost', port=MAPIPORT, database=TSTDB, language='sql'):
        self.username = username
        self.password = password
        self.hostname = hostname
        self.port = port
        self.database = database
        self.language = language
        self.dbh = None

    def connect(self):
        """Open the connection on first use and return the handle."""
        if self.dbh is None:
            if self.language == 'sql':
                self.dbh = pymonetdb.connect(
                                         username=self.username,
                                         password=self.password,
                                         hostname=self.hostname,
                                         port=self.port,
                                         database=self.database,
                                         autocommit=True)
            else:
                # NOTE(review): `malmapi` is not imported anywhere in the
                # visible source, so this branch raises NameError as written;
                # confirm the intended module and import it.
                self.dbh = malmapi.Connection()
                self.dbh.connect(
                                 username=self.username,
                                 password=self.password,
                                 hostname=self.hostname,
                                 port=self.port,
                                 database=self.database,
                                 language=self.language)
        return self.dbh

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def cursor(self):
        """Return a cursor for the open connection, or None when closed."""
        if self.dbh:
            if self.language == 'sql':
                return self.dbh.cursor()
            else:
                # NOTE(review): `MapiCursor` is not imported in the visible
                # source either; confirm where it should come from.
                return MapiCursor(self.dbh)
        return None

    def close(self):
        """Close the connection if it is open."""
        if self.dbh:
            self.dbh.close()
            self.dbh = None


class RunnableTestResult(metaclass=ABCMeta):
    """Abstract class for sql result"""
    did_run = False

    @abstractmethod
    def run(self, query:str, *args, stdin=None, lineno=None):
        """Run query with specific client"""
        pass


class TestCaseResult(object):
    """TestCase connected result"""
    test_case = None

    def __init__(self, test_case, **kwargs):
        self.test_case = test_case
        self.assertion_errors = [] # holds assertion errors
        self.query = None
        self.test_run_error = None
        self.err_code = None
        self.err_message = None
        self.data = []
        self.rows = []
        self.rowcount = -1
        self.description = None
        self.id = kwargs.get('id')
        self.lineno = None

    def fail(self, msg, data=None):
        """ logs errors to test case err file"""
        err_file = self.test_case.err_file
        if len(self.assertion_errors) == 0:
            # first failure of this result: print a header naming the query
            lineno = self.lineno or 'N/A'
            print('', file=err_file)
            if self.query:
                print(f'ln{lineno}:', self.query, file=err_file)
            elif self.id:
                print(f'ln{lineno}:', self.id, file=err_file)
            print('----', file=err_file)
        self.assertion_errors.append(AssertionError(msg))
        print(msg, file=err_file)
        if data is not None:
            # at most 100 rows are shown, so only more than 100 is truncated
            if len(data) <= 100:
                print('query result:', file=err_file)
            else:
                print('truncated query result:', file=err_file)
            for row in data[:100]:
                print('|'.join('NULL' if col is None else str(col) for col in row), file=err_file)
            print('', file=err_file)

    def assertFailed(self, err_code=None, err_message=None):
        """assert on query failed"""
        if self.test_run_error is None:
            msg = "expected to fail but didn't"
            self.fail(msg)
        else:
            msgs = []
            if err_code:
                if self.err_code != err_code:
                    msgs.append( "expected to fail with error code {} but failed with error code {}".format(err_code, self.err_code))
            if err_message:
                if self.err_message:
                    # error messages are compared case-insensitively
                    if err_message.lower() != self.err_message.lower():
                        msgs.append("expected to fail with error message '{}' but failed with error message '{}'".format(err_message, self.err_message))
                else:
                    msgs.append("expected to fail with error message '{}' but got '{}'".format(err_message, self.err_message))
            if len(msgs) > 0:
                self.fail('\n'.join(msgs))
        return self

    def assertSucceeded(self):
        """assert on query succeeded"""
        if self.test_run_error is not None:
            msg = "expected to succeed but didn't\n{}".format(str(self.test_run_error).rstrip('\n'))
            self.fail(msg)
        return self

    def assertRowCount(self, rowcount):
        '''Assert on the affected row count'''
        if self.rowcount != int(rowcount):
            msg = "received {} rows, expected {} rows".format(self.rowcount, rowcount)
            self.fail(msg)
        return self

    def assertResultHashTo(self, hash_value):
        raise NotImplementedError

    def assertValue(self, row, col, val):
        """Assert on a value matched against row, col in the result"""
        received = None
        row = int(row)
        col = int(col)
        try:
            received = self.data[row][col]
        except IndexError:
            # an out-of-range cell is reported as a received value of None
            pass
        if type(val) is type(received):
            if val != received:
                msg = 'expected "{}", received "{}" in row={}, col={}'.format(val, received, row, col)
                self.fail(msg, data=self.data)
        else:
            # handle type mismatch
            msg = 'expected type {} and value "{}", received type {} and value "{}" in row={}, col={}'.format(type(val), str(val), type(received), str(received), row, col)
            self.fail(msg, data=self.data)
        return self

    def assertDataResultMatch(self, data=(), index=None):
        """Assert on a match of a subset of the result. When index is provided it
        starts comparing from that row index onward.
        """
        def mapfn(row):
            # rows may be given as lists; results hold tuples
            if type(row) is list:
                return tuple(row)
            return row
        if len(data) == 0 and len(self.data) > 0:
            msg = 'expected empty result!'
            self.fail(msg, data=self.data)
        if len(data) > 0 and len(self.data) == 0:
            msg = 'expected result but received empty!'
            self.fail(msg, data=self.data)
        data = list(map(mapfn, data))
        if index is None:
            if len(data) > 0:
                # start at the first result row equal to the first expected row
                first = data[0]
                for i, v in enumerate(self.data):
                    if first == v:
                        index = i
                        break
        index = index or 0
        # align sequences
        idx_mis = get_index_mismatch(data, self.data[index:])
        if idx_mis is not None:
            # idx_mis is relative to the aligned slice; shift it back to an
            # index into the full result before reading the received row
            res_idx = index + idx_mis
            exp_v = data[idx_mis]
            res_v = self.data[res_idx]
            msg = 'expected to match query result at index {} but it didn\'t as {} not equals {}'.format(res_idx, repr(exp_v), repr(res_v))
            self.fail(msg, data=self.data)
        return self


class MclientTestResult(TestCaseResult, RunnableTestResult):
    """Holder of a sql execution result as returned from mclient"""

    def __init__(self, test_case, **kwargs):
        super().__init__(test_case, **kwargs)
        self.did_run = False

    def _parse_error(self, err:str):
        """Extract ``(err_code, err_message)`` from the client's stderr text.

        The value is everything after the first ``=`` on lines starting with
        ``CODE`` / ``ERROR``; the last such line wins.
        """
        err_code = None
        err_message = None
        for l in err.splitlines():
            l = l.strip()
            # split only once so that '=' inside the message is preserved
            if l.startswith('ERROR'):
                err_message = l.split('=', 1)[-1].strip()
            if l.startswith('CODE'):
                err_code = l.split('=', 1)[-1].strip()
        return err_code, err_message

    def _get_row_count(self, data):
        """Count the output lines of the form ``[ ... ]``."""
        count = 0
        for l in filter(filter_junk, data.splitlines()):
            l = l.strip()
            if l.startswith('[') and l.endswith(']'):
                count += 1
        return count

    def _data_lines(self):
        """Return the captured stdout split into lines.

        ``self.data`` is still the initial empty list when the client printed
        nothing, in which case there are no lines.
        """
        return self.data.split('\n') if self.data else []

    def run(self, query:str, *args, stdin=None, lineno=None):
        """Run *query* (or the given *stdin*) through the ``sql`` client once.

        Raises:
            SystemExit: when starting or talking to the client fails.
        """
        # ensure runs only once
        if self.did_run is False:
            self.lineno = lineno
            conn_ctx = self.test_case.conn_ctx
            kwargs = dict(
                host = conn_ctx.hostname,
                port = conn_ctx.port,
                dbname = conn_ctx.database,
                user = conn_ctx.username,
                passwd = conn_ctx.password)
            try:
                if query:
                    self.query = query
                    # TODO multiline stmts won't work here ensure single stmt
                    with process.client('sql', **kwargs,
                            args=list(args),
                            stdin=process.PIPE,
                            stdout=process.PIPE, stderr=process.PIPE) as p:
                        out, err = p.communicate(query)
                        if out:
                            self.data = out
                            self.rowcount = self._get_row_count(out)
                        if err:
                            self.test_run_error = err
                            self.err_code, self.err_message = self._parse_error(err)
                elif stdin:
                    with process.client('sql', **kwargs,
                            args=list(args),
                            stdin=stdin,
                            stdout=process.PIPE, stderr=process.PIPE) as p:
                        out, err = p.communicate()
                        if out:
                            self.data = out
                        if err:
                            self.test_run_error = err
                self.did_run = True
            except Exception as e:
                raise SystemExit(e)
        return self

    def assertMatchStableOut(self, fout, ignore_headers=False):
        """Assert that the client output matches the stable output file *fout*."""
        data = list(filter(filter_junk, self._data_lines()))
        with open(fout, 'r') as f:
            stable = list(filter(filter_junk, f.read().split('\n')))
        if ignore_headers:
            stable = list(filter(filter_headers, stable))
            data = list(filter(filter_headers, data))
        a, b = filter_matching_blocks(stable, data)
        if a or b:
            diff = list(difflib.unified_diff(stable, data, fromfile='stable', tofile='test'))
            if len(diff) > 0:
                msg = "expected to match stable output {} but it didnt\'t\n".format(fout)
                msg+='\n'.join(diff)
                # fail() records the AssertionError and prints the header
                self.fail(msg)
        return self

    def assertMatchStableError(self, ferr, ignore_err_messages=False):
        """Assert that the client stderr matches the stable error file *ferr*."""
        err = []
        filter_fn = filter_lines_starting_with(['--', '#', 'stderr of test', 'MAPI'])
        if self.test_run_error:
            err = list(filter(filter_fn, self.test_run_error.split('\n')))
        with open(ferr, 'r') as f:
            stable = list(filter(filter_fn, f.read().split('\n')))
        a, b = filter_matching_blocks(stable, err)
        diff = list(difflib.unified_diff(a, b, fromfile='stable', tofile='test'))
        if len(diff) > 0:
            msg = "expected to match stable error {} but it didnt\'t\n".format(ferr)
            msg+='\n'.join(diff)
            # fail() records the AssertionError and prints the header
            self.fail(msg)
        return self

    def assertDataResultMatch(self, expected):
        """Assert that the output lines (junk and headers removed) match *expected*."""
        data = list(filter(filter_junk, self._data_lines()))
        data = list(filter(filter_headers, data))
        a, b = filter_matching_blocks(expected, data)
        diff = list(difflib.unified_diff(a, b, fromfile='expected', tofile='test'))
        if len(diff) > 0:
            exp = '\n'.join(expected)
            got = '\n'.join(data)
            msg = "Output didn't match.\n"
            msg += f"Expected:\n{exp}\n"
            msg += f"Got:\n{got}\n"
            msg += "Diff:\n" + '\n'.join(s.rstrip() for s in diff)
            # fail() records the AssertionError and prints the header
            self.fail(msg)
        return self

    def assertValue(self, row, col, val):
        raise NotImplementedError


class PyMonetDBTestResult(TestCaseResult, RunnableTestResult):
    """Holder of sql execution information. Managed by SQLTestCase."""
    test_case = None

    def __init__(self, test_case, **kwargs):
        super().__init__(test_case, **kwargs)
        self.did_run = False

    def _parse_error(self, error:str=''):
        """Parse error string and returns (err_code, err_msg) tuple

        The part before the first ``!`` is the code, the rest the message; a
        string without ``!`` is all message.
        """
        err_code = None
        err_msg = None
        head, sep, tail = error.partition('!')
        if sep:
            err_code = head.strip()
            err_msg = tail.strip()
        elif head:
            err_msg = head.strip()
        return err_code, err_msg

    def run(self, query:str, *args, stdin=None, lineno=None):
        """Execute *query* once through pymonetdb and record the outcome."""
        # ensure runs only once
        if self.did_run is False:
            self.lineno = lineno
            if query:
                self.query = query
                try:
                    conn = self.test_case.conn_ctx.connect()
                    crs = conn.cursor()
                    crs.execute(query)
                    self.rowcount = crs.rowcount
                    self.rows = crs._rows
                    if crs.description:
                        self.data = crs.fetchall()
                        self.description = crs.description
                except pymonetdb.Error as e:
                    self.test_run_error = e
                    self.err_code, self.err_message = self._parse_error(e.args[0])
                except (OSError, ValueError) as e:
                    self.test_run_error = e
            self.did_run = True
        return self


class MonetDBeTestResult(TestCaseResult, RunnableTestResult):
    """Holder of a result produced through a monetdbe connection."""

    def __init__(self, test_case, **kwargs):
        super().__init__(test_case, **kwargs)
        self.did_run = False

    def _parse_error(self, err: str):
        """Not implemented yet; returns None."""
        pass

    def run(self, query:str, *args, stdin=None, lineno=None):
        """Execute *query* once on the test case's connection."""
        if self.did_run is False:
            self.lineno = lineno
            if query:
                # monetdbe is only imported locally elsewhere in this module,
                # so bring it into scope here for the except clause below
                import monetdbe
                self.query = query
                try:
                    conn = self.test_case.conn_ctx
                    crs = conn.cursor()
                    crs.execute(query)
                    self.rowcount = int(crs.rowcount)
                    if crs.description:
                        self.description = crs.description
                        self.data = crs.fetchall()
                except (monetdbe.Error, ValueError) as e:
                    self.test_run_error = e
                    # TODO parse error
            self.did_run = True
        return self


class SQLDump():
    """Holder of the text produced by a database dump."""

    def __init__(self, test_case, data=None):
        self.test_case = test_case
        self.data = data
        self.assertion_errors = [] # holds assertion errors

    def assertMatchStableOut(self, fout):
        """Compare the dump with the stable file *fout*; log a diff on mismatch."""
        data = self.data.split('\n') if self.data else []
        dump = list(filter(filter_junk, data))
        with open(fout, 'r') as f:
            stable = list(filter(filter_junk, f.read().split('\n')))
        a, b = filter_matching_blocks(stable, dump)
        diff = list(difflib.unified_diff(a, b, fromfile='stable', tofile='test'))
        if len(diff) > 0:
            err_file = self.test_case.err_file
            msg = "sql dump expected to match stable output {} but it didnt\'t\n".format(fout)
            msg+='\n'.join(diff)
            self.assertion_errors.append(AssertionError(msg))
            print(msg, file=err_file)


class SQLTestCase():
    """Driver that executes queries and collects their results.

    Used as a context manager it closes the connection on exit and raises
    ``SystemExit(1)`` when any collected result holds assertion errors.
    """

    def __init__(self, out_file=sys.stdout, err_file=sys.stderr):
        self.out_file = out_file
        self.err_file = err_file
        self.test_results = []
        self._conn_ctx = None
        self.in_memory = False
        self.client = 'pymonetdb'

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.exit()

    def close(self):
        """Close and forget the explicit connection context, if any."""
        if self._conn_ctx:
            self._conn_ctx.close()
        self._conn_ctx = None

    def exit(self):
        """Close the connection; raise SystemExit(1) if any assertion failed."""
        self.close()
        if any(len(res.assertion_errors) > 0 for res in self.test_results):
            raise SystemExit(1)

    def out(self, data):
        """Print *data* followed by an empty line to the out file."""
        print(data, file=self.out_file)
        print('', file=self.out_file)

    def err(self, msg):
        """Print *msg* followed by an empty line to the err file."""
        print(msg, file=self.err_file)
        print('', file=self.err_file)

    def connect(self,
            username='monetdb', password='monetdb', port=MAPIPORT,
            hostname='localhost', database=TSTDB, language='sql'):
        """Replace the current connection context and return the new one.

        ``database=':memory:'`` selects an in-memory monetdbe connection.
        """
        if self._conn_ctx:
            self.close()
        if database == ':memory:':
            import monetdbe
            self.in_memory = True
            # TODO add username, password, port when supported from monetdbe
            self._conn_ctx = monetdbe.connect(':memory:', autocommit=True)
        else:
            self.in_memory = False
            self._conn_ctx = PyMonetDBConnectionContext(
                                 username=username,
                                 password=password,
                                 hostname=hostname,
                                 port=port,
                                 database=database,
                                 language=language)
        return self._conn_ctx

    def default_conn_ctx(self):
        """Create a fresh connection context with default settings."""
        if self.in_memory:
            # connect() imports monetdbe only into its own local scope
            import monetdbe
            return monetdbe.connect(':memory:', autocommit=True)
        ctx = PyMonetDBConnectionContext()
        return ctx

    @property
    def conn_ctx(self):
        # NOTE: without an explicit connect() every access builds a new
        # default context.
        return self._conn_ctx or self.default_conn_ctx()

    def execute(self, query:str, *args, client='pymonetdb', stdin=None, result_id=None):
        '''Execute query with specified client. Default client is pymonetdb.'''
        # record the caller's line number for failure reports
        frame = inspect.currentframe().f_back
        lineno = frame.f_lineno
        if client == 'mclient':
            res = MclientTestResult(self, id=result_id)
        elif self.in_memory:
            res = MonetDBeTestResult(self, id=result_id)
        else:
            res = PyMonetDBTestResult(self, id=result_id)
        res.run(query, *args, stdin=stdin, lineno=lineno)
        self.test_results.append(res)
        return res

    def sqldump(self, *args):
        """Dump the database with an external client and return a SQLDump.

        With ``--inserts`` among *args* the ``sqldump`` client is run with
        those args; otherwise the ``sql`` client is run with ``-lsql -D``.
        """
        kwargs = dict(
            host = self.conn_ctx.hostname,
            port = self.conn_ctx.port,
            dbname = self.conn_ctx.database,
            user = self.conn_ctx.username,
            passwd = self.conn_ctx.password)
        dump = None
        if '--inserts' in args:
            args = list(args)
            cmd = 'sqldump'
        else:
            cmd = 'sql'
            # TODO should more options be allowed here
            args = ['-lsql', '-D']
        try:
            with process.client(cmd, **kwargs, args=args, stdout=process.PIPE, stderr=process.PIPE) as p:
                dump, err = p.communicate()
        except Exception as e:
            # still best effort (the dump stays None), but leave a trace
            # instead of hiding the reason
            print('sqldump failed: {}'.format(e), file=self.err_file)
        res = SQLDump(self, data=dump)
        self.test_results.append(res)
        return res

    def drop(self):
        """Drop all non-system tables, functions, sequences, schemas and users."""
        if self.in_memory:
            # TODO
            return
        try:
            with self.conn_ctx as ctx:
                crs = ctx.cursor()
                crs.execute('select s.name, t.name, tt.table_type_name from sys.tables t, sys.schemas s, sys.table_types tt where not t.system and t.schema_id = s.id and t.type = tt.table_type_id')
                for row in crs.fetchall():
                    crs.execute('drop {} "{}"."{}" cascade'.format(row[2], row[0], row[1]))
                crs.execute('select s.name, f.name, ft.function_type_keyword from functions f, schemas s, function_types ft where not f.system and f.schema_id = s.id and f.type = ft.function_type_id')
                for row in crs.fetchall():
                    crs.execute('drop all {} "{}"."{}"'.format(row[2], row[0], row[1]))
                crs.execute('select s.name, q.name from sys.sequences q, schemas s where q.schema_id = s.id')
                for row in crs.fetchall():
                    crs.execute('drop sequence "{}"."{}"'.format(row[0], row[1]))
                crs.execute("select name from sys.users where name not in ('monetdb', '.snapshot')")
                for row in crs.fetchall():
                    # move users back to "sys" before their schemas are dropped
                    crs.execute('alter user "{}" SET SCHEMA "sys"'.format(row[0]))
                crs.execute('select name from sys.schemas where not system')
                for row in crs.fetchall():
                    crs.execute('drop schema "{}" cascade'.format(row[0]))
                crs.execute("select name from sys.users where name not in ('monetdb', '.snapshot')")
                for row in crs.fetchall():
                    crs.execute('drop user "{}"'.format(row[0]))
        except (pymonetdb.Error, ValueError) as e:
            # NOTE(review): the handler body is cut off ("...") in the
            # available source; restore the real handling from the
            # original file.
            ...
Source: test_state.py
...47    def test_run_success(self, token: str, successor_state: type) -> None:48        self.assertIsInstance(self._state.run(token, self._intent), successor_state)49    @ddt.data(("invalid", IllegalTransitionError))50    @ddt.unpack51    def test_run_error(self, token: str, exception: Type[Exception]) -> None:52        self.assertRaises(exception, self._state.run, token, self._intent)53@ddt.ddt54class TestUserState(unittest.TestCase):55    def setUp(self) -> None:56        self._state = UserState()57        self._intent = Intent()58    @ddt.data("client")59    def test_run(self, user: str) -> None:60        self.assertIsInstance(self._state.run(user, self._intent), InSelectState)61    @ddt.data("client")62    def test_validate(self, user: str) -> None:63        self._state._validate(user, self._intent)64        self.assertIn(user, self._intent.users)65@ddt.ddt66class TestInSelectState(unittest.TestCase):67    def setUp(self) -> None:68        self._state = InSelectState(UserState())69        self._intent = Intent()70    @ddt.data(71        *((token, UserState) for token in {"and", ","}),72        ("in", TheState), ("select", TheBlockchainState),73    )74    @ddt.unpack75    def test_run_success(self, token: str, successor_state: type) -> None:76        self.assertIsInstance(self._state.run(token, self._intent), successor_state)77    @ddt.data(("invalid", IllegalTransitionError))78    @ddt.unpack79    def test_run_error(self, token: str, exception: Type[Exception]) -> None:80        self.assertRaises(exception, self._state.run, token, self._intent)81@ddt.ddt82class TestTheState(unittest.TestCase):83    def setUp(self) -> None:84        self._state = TheState()85        self._intent = Intent()86    @ddt.data(("the", TimeframeState))87    @ddt.unpack88    def test_run_success(self, token: str, successor_state: type) -> None:89        self.assertIsInstance(self._state.run(token, self._intent), successor_state)90    @ddt.data(("invalid", IllegalTransitionError))91    
@ddt.unpack92    def test_run_error(self, token: str, exception: Type[Exception]) -> None:93        self.assertRaises(exception, self._state.run, token, self._intent)94@ddt.ddt95class TestTimeframeState(unittest.TestCase):96    def setUp(self) -> None:97        self._state = TimeframeState()98        self._intent = Intent()99    @ddt.data(*((timeframe.value, SelectState) for timeframe in Timeframe))100    @ddt.unpack101    def test_run_success(self, token: str, successor_state: type) -> None:102        self.assertIsInstance(self._state.run(token, self._intent), successor_state)103    @ddt.data(("invalid", IllegalTransitionError))104    @ddt.unpack105    def test_run_error(self, token: str, exception: Type[Exception]) -> None:106        self.assertRaises(exception, self._state.run, token, self._intent)107    @ddt.data(*Timeframe)108    def test_validate(self, timeframe: Timeframe) -> None:109        self._state._validate(timeframe.value, self._intent)110        self.assertEqual(self._intent.timeframe, timeframe)111@ddt.ddt112class TestSelectState(unittest.TestCase):113    def setUp(self) -> None:114        self._state = SelectState()115        self._intent = Intent()116    @ddt.data(("select", TheBlockchainState))117    @ddt.unpack118    def test_run_success(self, token: str, successor_state: type) -> None:119        self.assertIsInstance(self._state.run(token, self._intent), successor_state)120    @ddt.data(("invalid", IllegalTransitionError))121    @ddt.unpack122    def test_run_error(self, token: str, exception: Type[Exception]) -> None:123        self.assertRaises(exception, self._state.run, token, self._intent)124@ddt.ddt125class TestTheBlockchainState(unittest.TestCase):126    def setUp(self) -> None:127        self._state = TheBlockchainState()128        self._intent = Intent()129    @ddt.data(130        *((blockchain.value, WithUntilAsState) for blockchain in Blockchain),131        ("the", ProfileState),132    )133    @ddt.unpack134    def 
test_run_success(self, token: str, successor_state: type) -> None:135        self.assertIsInstance(self._state.run(token, self._intent), successor_state)136    @ddt.data(("invalid", IllegalTransitionError))137    @ddt.unpack138    def test_run_error(self, token: str, exception: Type[Exception]) -> None:139        self.assertRaises(exception, self._state.run, token, self._intent)140    @ddt.data(*Blockchain)141    def test_validate_success(self, blockchain: Blockchain) -> None:142        self._state._validate(blockchain.value, self._intent)143        self.assertEqual(self._intent.blockchain, blockchain)144    def test_validate_error(self) -> None:145        self._state._validate("invalid", self._intent)146        self.assertIsNone(self._intent.blockchain)147@ddt.ddt148class TestWithUntilUseState(unittest.TestCase):149    def setUp(self) -> None:150        self._state = WithUntilAsState()151        self._intent = Intent()152    @ddt.data(("with", ModifierState), ("until", The2State), ("as", DefaultState))153    @ddt.unpack154    def test_run_success(self, token: str, successor_state: type) -> None:155        self.assertIsInstance(self._state.run(token, self._intent), successor_state)156    @ddt.data(("invalid", IllegalTransitionError))157    @ddt.unpack158    def test_run_error(self, token: str, exception: Type[Exception]) -> None:159        self.assertRaises(exception, self._state.run, token, self._intent)160@ddt.ddt161class TestProfileState(unittest.TestCase):162    def setUp(self) -> None:163        self._state = ProfileState()164        self._intent = Intent()165    @ddt.data(*((profile.value, FilterBlockchainState) for profile in Profile))166    @ddt.unpack167    def test_run_success(self, token: str, successor_state: type) -> None:168        self.assertIsInstance(self._state.run(token, self._intent), successor_state)169    @ddt.data(("invalid", IllegalTransitionError))170    @ddt.unpack171    def test_run_error(self, token: str, exception: Type[Exception]) -> 
None:172        self.assertRaises(exception, self._state.run, token, self._intent)173    @ddt.data(*Profile)174    def test_validate(self, profile: Profile) -> None:175        self._state._validate(profile.value, self._intent)176        self.assertEqual(self._intent.profile, profile)177@ddt.ddt178class TestFilterStateValidator(unittest.TestCase):179    def setUp(self) -> None:180        self._validator = FilterStateValidator()181        self._intent = Intent()182    @ddt.data(*Filter)183    def test_validate_success(self, filter_: Filter) -> None:184        self._validator.validate(filter_.value, self._intent)185        self.assertIn(filter_, self._intent.filters)186    def test_validate_error(self) -> None:187        self._validator.validate("invalid", self._intent)188        self.assertSetEqual(self._intent.filters, set())189    def test_validate_private_public(self) -> None:190        self._intent.filters.add(Filter.PRIVATE)191        self.assertRaises(192            ValidationError,193            self._validator.validate,194            Filter.PUBLIC.value,195            self._intent,196        )197    @ddt.data((Profile.CHEAPEST, Filter.CHEAP), (Profile.FASTEST, Filter.FAST))198    @ddt.unpack199    def test_validate_profile_filter(self, profile: Profile, filter_: Filter) -> None:200        self._intent.profile = profile201        self._validator.validate(filter_.value, self._intent)202        self.assertNotIn(filter_, self._intent.filters)203@ddt.ddt204class TestFilterBlockchainState(unittest.TestCase):205    def setUp(self) -> None:206        self._state = FilterBlockchainState()207        self._intent = Intent()208    @ddt.data(209        *((filter_.value, BlockchainState) for filter_ in Filter),210        ("blockchain", FromExceptWithUntilAsState),211    )212    @ddt.unpack213    def test_run_success(self, token: str, successor_state: type) -> None:214        self.assertIsInstance(self._state.run(token, self._intent), successor_state)215    
@ddt.data(("invalid", IllegalTransitionError))216    @ddt.unpack217    def test_run_error(self, token: str, exception: Type[Exception]) -> None:218        self.assertRaises(exception, self._state.run, token, self._intent)219@ddt.ddt220class TestFilterState(unittest.TestCase):221    def setUp(self) -> None:222        self._state = FilterState()223        self._intent = Intent()224    @ddt.data(*((filter_.value, BlockchainState) for filter_ in Filter))225    @ddt.unpack226    def test_run_success(self, token: str, successor_state: type) -> None:227        self.assertIsInstance(self._state.run(token, self._intent), successor_state)228    @ddt.data(("invalid", IllegalTransitionError))229    @ddt.unpack230    def test_run_error(self, token: str, exception: Type[Exception]) -> None:231        self.assertRaises(exception, self._state.run, token, self._intent)232@ddt.ddt233class TestBlockchainState(unittest.TestCase):234    def setUp(self) -> None:235        self._state = BlockchainState(FilterState())236        self._intent = Intent()237    @ddt.data(238        *((token, FilterState) for token in {"and", ","}),239        ("blockchain", FromExceptWithUntilAsState),240    )241    @ddt.unpack242    def test_run_success(self, token: str, successor_state: type) -> None:243        self.assertIsInstance(self._state.run(token, self._intent), successor_state)244    @ddt.data(("invalid", IllegalTransitionError))245    @ddt.unpack246    def test_run_error(self, token: str, exception: Type[Exception]) -> None:247        self.assertRaises(exception, self._state.run, token, self._intent)248@ddt.ddt249class TestFromExceptWithUntilUseState(unittest.TestCase):250    def setUp(self) -> None:251        self._state = FromExceptWithUntilAsState()252        self._intent = Intent()253    @ddt.data(254        ("from", WhitelistState),255        ("except", BlacklistState),256        ("with", ModifierState),257        ("until", The2State),258        ("as", DefaultState),259    )260    
@ddt.unpack261    def test_run_success(self, token: str, successor_state: type) -> None:262        self.assertIsInstance(self._state.run(token, self._intent), successor_state)263    @ddt.data(("invalid", IllegalTransitionError))264    @ddt.unpack265    def test_run_error(self, token: str, exception: Type[Exception]) -> None:266        self.assertRaises(exception, self._state.run, token, self._intent)267@ddt.ddt268class TestWhitelistState(unittest.TestCase):269    def setUp(self) -> None:270        self._state = WhitelistState()271        self._intent = Intent()272    @ddt.data(*((blockchain.value, WhitelistWithUntilAsState) for blockchain in Blockchain))273    @ddt.unpack274    def test_run_success(self, token: str, successor_state: type) -> None:275        self.assertIsInstance(self._state.run(token, self._intent), successor_state)276    @ddt.data(("invalid", IllegalTransitionError))277    @ddt.unpack278    def test_run_error(self, token: str, exception: Type[Exception]) -> None:279        self.assertRaises(exception, self._state.run, token, self._intent)280    @ddt.data(*Blockchain)281    def test_validate(self, blockchain: Blockchain) -> None:282        self._state._validate(blockchain.value, self._intent)283        self.assertIn(blockchain, self._intent.whitelist)284@ddt.ddt285class TestWhitelistWithUntilUseState(unittest.TestCase):286    def setUp(self) -> None:287        self._state = WhitelistWithUntilAsState(WhitelistState())288        self._intent = Intent()289    @ddt.data(290        *((token, WhitelistState) for token in {"and", ","}),291        ("with", ModifierState),292        ("until", The2State),293        ("as", DefaultState),294    )295    @ddt.unpack296    def test_run_success(self, token: str, successor_state: type) -> None:297        self.assertIsInstance(self._state.run(token, self._intent), successor_state)298    @ddt.data(("invalid", IllegalTransitionError))299    @ddt.unpack300    def test_run_error(self, token: str, exception: 
Type[Exception]) -> None:301        self.assertRaises(exception, self._state.run, token, self._intent)302@ddt.ddt303class TestBlacklistState(unittest.TestCase):304    def setUp(self) -> None:305        self._state = BlacklistState()306        self._intent = Intent()307    @ddt.data(*((blockchain.value, BlacklistWithUntilAsState) for blockchain in Blockchain))308    @ddt.unpack309    def test_run_success(self, token: str, successor_state: type) -> None:310        self.assertIsInstance(self._state.run(token, self._intent), successor_state)311    @ddt.data(("invalid", IllegalTransitionError))312    @ddt.unpack313    def test_run_error(self, token: str, exception: Type[Exception]) -> None:314        self.assertRaises(exception, self._state.run, token, self._intent)315    @ddt.data(*Blockchain)316    def test_validate(self, blockchain: Blockchain) -> None:317        self._state._validate(blockchain.value, self._intent)318        self.assertIn(blockchain, self._intent.blacklist)319@ddt.ddt320class TestBlacklistWithUntilAsState(unittest.TestCase):321    def setUp(self) -> None:322        self._state = BlacklistWithUntilAsState(BlacklistState())323        self._intent = Intent()324    @ddt.data(325        *((token, BlacklistState) for token in {"and", ","}),326        ("with", ModifierState),327        ("until", The2State),328        ("as", DefaultState),329    )330    @ddt.unpack331    def test_run_success(self, token: str, successor_state: type) -> None:332        self.assertIsInstance(self._state.run(token, self._intent), successor_state)333    @ddt.data(("invalid", IllegalTransitionError))334    @ddt.unpack335    def test_run_error(self, token: str, exception: Type[Exception]) -> None:336        self.assertRaises(exception, self._state.run, token, self._intent)337@ddt.ddt338class TestModifierState(unittest.TestCase):339    def setUp(self) -> None:340        self._state = ModifierState()341        self._intent = Intent()342    @ddt.data(*((modifier.value, UntilAsState) 
for modifier in Modifier))343    @ddt.unpack344    def test_run_success(self, token: str, successor_state: type) -> None:345        self.assertIsInstance(self._state.run(token, self._intent), successor_state)346    @ddt.data(("invalid", IllegalTransitionError))347    @ddt.unpack348    def test_run_error(self, token: str, exception: Type[Exception]) -> None:349        self.assertRaises(exception, self._state.run, token, self._intent)350    @ddt.data(*Modifier)351    def test_validate(self, modifier: Modifier) -> None:352        self._state._validate(modifier.value, self._intent)353        self.assertIn(modifier, self._intent.modifiers)354@ddt.ddt355class TestUntilAsState(unittest.TestCase):356    def setUp(self) -> None:357        self._state = UntilAsState(ModifierState())358        self._intent = Intent()359    @ddt.data(360        *((token, ModifierState) for token in {"and", ","}),361        ("until", The2State),362        ("as", DefaultState),363    )364    @ddt.unpack365    def test_run_success(self, token: str, successor_state: type) -> None:366        self.assertIsInstance(self._state.run(token, self._intent), successor_state)367    @ddt.data(("invalid", IllegalTransitionError))368    @ddt.unpack369    def test_run_error(self, token: str, exception: Type[Exception]) -> None:370        self.assertRaises(exception, self._state.run, token, self._intent)371@ddt.ddt372class TestThe2State(unittest.TestCase):373    def setUp(self) -> None:374        self._state = The2State()375        self._intent = Intent()376    @ddt.data(("the", IntervalState))377    @ddt.unpack378    def test_run_success(self, token: str, successor_state: type) -> None:379        self.assertIsInstance(self._state.run(token, self._intent), successor_state)380    @ddt.data(("invalid", IllegalTransitionError))381    @ddt.unpack382    def test_run_error(self, token: str, exception: Type[Exception]) -> None:383        self.assertRaises(exception, self._state.run, token, 
self._intent)384@ddt.ddt385class TestIntervalState(unittest.TestCase):386    def setUp(self) -> None:387        self._state = IntervalState()388        self._intent = Intent()389    @ddt.data(*((interval.value, CostsState) for interval in Interval))390    @ddt.unpack391    def test_run_success(self, token: str, successor_state: type) -> None:392        self.assertIsInstance(self._state.run(token, self._intent), successor_state)393    @ddt.data(("invalid", IllegalTransitionError))394    @ddt.unpack395    def test_run_error(self, token: str, exception: Type[Exception]) -> None:396        self.assertRaises(exception, self._state.run, token, self._intent)397    @ddt.data(*Interval)398    def test_validate(self, interval: Interval) -> None:399        self._state._validate(interval.value, self._intent)400        self.assertEqual(self._intent.interval, interval)401@ddt.ddt402class TestCostsState(unittest.TestCase):403    def setUp(self) -> None:404        self._state = CostsState()405        self._intent = Intent()406    @ddt.data(("costs", ReachState))407    @ddt.unpack408    def test_run_success(self, token: str, successor_state: type) -> None:409        self.assertIsInstance(self._state.run(token, self._intent), successor_state)410    @ddt.data(("invalid", IllegalTransitionError))411    @ddt.unpack412    def test_run_error(self, token: str, exception: Type[Exception]) -> None:413        self.assertRaises(exception, self._state.run, token, self._intent)414@ddt.ddt415class TestReachState(unittest.TestCase):416    def setUp(self) -> None:417        self._state = ReachState()418        self._intent = Intent()419    @ddt.data(("reach", CurrencyThresholdState))420    @ddt.unpack421    def test_run_success(self, token: str, successor_state: type) -> None:422        self.assertIsInstance(self._state.run(token, self._intent), successor_state)423    @ddt.data(("invalid", IllegalTransitionError))424    @ddt.unpack425    def test_run_error(self, token: str, exception: 
Type[Exception]) -> None:426        self.assertRaises(exception, self._state.run, token, self._intent)427@ddt.ddt428class TestCurrencyThresholdState(unittest.TestCase):429    def setUp(self) -> None:430        self._state = CurrencyThresholdState()431        self._intent = Intent()432    @ddt.data(433        *((currency.value, ThresholdState) for currency in Currency),434        ("47.2", PolicyState),435    )436    @ddt.unpack437    def test_run_success(self, token: str, successor_state: type) -> None:438        self.assertIsInstance(self._state.run(token, self._intent), successor_state)439    @ddt.data(("invalid", ValidationError))440    @ddt.unpack441    def test_run_error(self, token: str, exception: Type[Exception]) -> None:442        self.assertRaises(exception, self._state.run, token, self._intent)443    @ddt.data(*Currency)444    def test_validate(self, currency: Currency) -> None:445        self._state._validate(currency.value, self._intent)446        self.assertEqual(self._intent.currency, currency)447@ddt.ddt448class TestThresholdState(unittest.TestCase):449    def setUp(self) -> None:450        self._state = ThresholdState()451        self._intent = Intent()452    @ddt.data(("35.7", PolicyState))453    @ddt.unpack454    def test_run(self, token: str, successor_state: type) -> None:455        self.assertIsInstance(self._state.run(token, self._intent), successor_state)456    @ddt.data(("invalid", ValidationError))457    @ddt.unpack458    def test_run_error(self, token: str, exception: Type[Exception]) -> None:459        self.assertRaises(exception, self._state.run, token, self._intent)460    @ddt.data(15.7)461    def test_validate(self, threshold: float) -> None:462        self._state._validate(str(threshold), self._intent)463        self.assertEqual(self._intent.threshold, threshold)464    def test_validate_error(self) -> None:465        self.assertRaises(ValidationError, self._state._validate, "invalid", self._intent)466@ddt.ddt467class 
TestPolicyState(unittest.TestCase):468    def setUp(self) -> None:469        self._state = PolicyState()470        self._intent = Intent()471    @ddt.data("random")472    def test_run(self, token: str) -> None:473        self.assertRaises(IllegalTransitionError, self._state.run, token, self._intent)474@ddt.ddt475class TestDefaultState(unittest.TestCase):476    def setUp(self) -> None:477        self._state = DefaultState()478        self._intent = Intent()479    @ddt.data(("default", DefaultPolicyState))480    @ddt.unpack481    def test_run_success(self, token: str, successor_state: type) -> None:482        self.assertIsInstance(self._state.run(token, self._intent), successor_state)483    @ddt.data(("invalid", IllegalTransitionError))484    @ddt.unpack485    def test_run_error(self, token: str, exception: Type[Exception]) -> None:486        self.assertRaises(exception, self._state.run, token, self._intent)487    @ddt.data(*Timeframe)488    def test_validate(self, timeframe: Timeframe) -> None:489        self._intent.timeframe = timeframe490        self._state._validate("default", self._intent)491        self.assertIsNone(self._intent.timeframe)492        self.assertIsNone(self._intent.interval)493@ddt.ddt494class TestDefaultPolicyState(unittest.TestCase):495    def setUp(self) -> None:496        self._state = DefaultPolicyState()497        self._intent = Intent()498    @ddt.data("random")499    def test_run(self, token: str) -> None:...test_cli.py
Source:test_cli.py  
1import pytest2from sweeper.sync import run3def test_cli(config_file, db):4    """Dummy test config and backend to check that loading works"""5    run("test", config=config_file)6    assert db["metadata"].count(job="test") == 17    job = db["metadata"].find_one(job="test")8    assert job["error"] is None9    _run = db["test"].find_one()10    assert _run["metadata_id"] == job["id"]11def test_cli_error(config_file, db):12    with pytest.raises(Exception):13        run("test_error", config=config_file)14    assert db["metadata"].count(job="test_error") == 115    assert db["metadata"].find_one(job="test_error")["error"] == "ERROR"16def test_cli_run_error(config_file, db):17    run("test_run_error", config=config_file)18    assert db["metadata"].count(job="test_run_error") == 119    job = db["metadata"].find_one(job="test_run_error")20    assert job["has_run_errors"]21    assert db["test_run_error"].count() == 122    _run = db["test_run_error"].find_one()23    assert _run["error"] == "ERROR"24    assert _run["name"] == "dumdum"...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
