Best Python code snippet using Testify_python
testlib.py
Source:testlib.py  
1import collections2import os.path3import random4import re5import shlex6import subprocess7import sys8import tempfile9import time10import traceback11import pexpect12# Note that gdb comes with its own testsuite. I was unable to figure out how to13# run that testsuite against the spike simulator.14def find_file(path):15    for directory in (os.getcwd(), os.path.dirname(__file__)):16        fullpath = os.path.join(directory, path)17        if os.path.exists(fullpath):18            return fullpath19    return None20def compile(args, xlen=32): # pylint: disable=redefined-builtin21    cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")22    cmd = [cc, "-g"]23    if xlen == 32:24        cmd.append("-march=rv32imac")25        cmd.append("-mabi=ilp32")26    else:27        cmd.append("-march=rv64imac")28        cmd.append("-mabi=lp64")29    for arg in args:30        found = find_file(arg)31        if found:32            cmd.append(found)33        else:34            cmd.append(arg)35    process = subprocess.Popen(cmd, stdout=subprocess.PIPE,36                               stderr=subprocess.PIPE)37    stdout, stderr = process.communicate()38    if process.returncode:39        print40        header("Compile failed")41        print "+", " ".join(cmd)42        print stdout,43        print stderr,44        header("")45        raise Exception("Compile failed!")46def unused_port():47    # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#283830948    import socket49    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)50    s.bind(("", 0))51    port = s.getsockname()[1]52    s.close()53    return port54class Spike(object):55    logname = "spike-%d.log" % os.getpid()56    def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True):57        """Launch spike. 
Return tuple of its process and the port it's running58        on."""59        if target.sim_cmd:60            cmd = shlex.split(target.sim_cmd)61        else:62            spike = os.path.expandvars("$RISCV/bin/spike")63            cmd = [spike]64        if target.xlen == 32:65            cmd += ["--isa", "RV32G"]66        else:67            cmd += ["--isa", "RV64G"]68        cmd += ["-m0x%x:0x%x" % (target.ram, target.ram_size)]69        if timeout:70            cmd = ["timeout", str(timeout)] + cmd71        if halted:72            cmd.append('-H')73        if with_jtag_gdb:74            cmd += ['--rbb-port', '0']75            os.environ['REMOTE_BITBANG_HOST'] = 'localhost'76        self.infinite_loop = target.compile(77                "programs/checksum.c", "programs/tiny-malloc.c",78                "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")79        cmd.append(self.infinite_loop)80        logfile = open(self.logname, "w")81        logfile.write("+ %s\n" % " ".join(cmd))82        logfile.flush()83        self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,84                stdout=logfile, stderr=logfile)85        if with_jtag_gdb:86            self.port = None87            for _ in range(30):88                m = re.search(r"Listening for remote bitbang connection on "89                        r"port (\d+).", open(self.logname).read())90                if m:91                    self.port = int(m.group(1))92                    os.environ['REMOTE_BITBANG_PORT'] = m.group(1)93                    break94                time.sleep(0.11)95            assert self.port, "Didn't get spike message about bitbang " \96                    "connection"97    def __del__(self):98        try:99            self.process.kill()100            self.process.wait()101        except OSError:102            pass103    def wait(self, *args, **kwargs):104        return self.process.wait(*args, **kwargs)105class VcsSim(object):106    def __init__(self, 
sim_cmd=None, debug=False):107        if sim_cmd:108            cmd = shlex.split(sim_cmd)109        else:110            cmd = ["simv"]111        cmd += ["+jtag_vpi_enable"]112        if debug:113            cmd[0] = cmd[0] + "-debug"114            cmd += ["+vcdplusfile=output/gdbserver.vpd"]115        logfile = open("simv.log", "w")116        logfile.write("+ %s\n" % " ".join(cmd))117        logfile.flush()118        listenfile = open("simv.log", "r")119        listenfile.seek(0, 2)120        self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,121                stdout=logfile, stderr=logfile)122        done = False123        while not done:124            # Fail if VCS exits early125            exit_code = self.process.poll()126            if exit_code is not None:127                raise RuntimeError('VCS simulator exited early with status %d'128                                   % exit_code)129            line = listenfile.readline()130            if not line:131                time.sleep(1)132            match = re.match(r"^Listening on port (\d+)$", line)133            if match:134                done = True135                self.port = int(match.group(1))136                os.environ['JTAG_VPI_PORT'] = str(self.port)137    def __del__(self):138        try:139            self.process.kill()140            self.process.wait()141        except OSError:142            pass143class Openocd(object):144    logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')145    logname = logfile.name146    def __init__(self, server_cmd=None, config=None, debug=False):147        if server_cmd:148            cmd = shlex.split(server_cmd)149        else:150            openocd = os.path.expandvars("$RISCV/bin/openocd")151            cmd = [openocd]152            if debug:153                cmd.append("-d")154        # This command needs to come before any config scripts on the command155        # line, since they are executed in order.156        cmd += [157    
        # Tell OpenOCD to bind gdb to an unused, ephemeral port.158            "--command",159            "gdb_port 0",160            # Disable tcl and telnet servers, since they are unused and because161            # the port numbers will conflict if multiple OpenOCD processes are162            # running on the same server.163            "--command",164            "tcl_port disabled",165            "--command",166            "telnet_port disabled",167        ]168        if config:169            f = find_file(config)170            if f is None:171                print "Unable to read file " + config172                exit(1)173            cmd += ["-f", f]174        if debug:175            cmd.append("-d")176        logfile = open(Openocd.logname, "w")177        logfile.write("+ %s\n" % " ".join(cmd))178        logfile.flush()179        self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,180                stdout=logfile, stderr=logfile)181        # Wait for OpenOCD to have made it through riscv_examine(). 
When using182        # OpenOCD to communicate with a simulator this may take a long time,183        # and gdb will time out when trying to connect if we attempt too early.184        start = time.time()185        messaged = False186        while True:187            log = open(Openocd.logname).read()188            if "Ready for Remote Connections" in log:189                break190            if not self.process.poll() is None:191                header("OpenOCD log")192                sys.stdout.write(log)193                raise Exception(194                        "OpenOCD exited before completing riscv_examine()")195            if not messaged and time.time() - start > 1:196                messaged = True197                print "Waiting for OpenOCD to examine RISCV core..."198            if time.time() - start > 60:199                raise Exception("ERROR: Timed out waiting for OpenOCD to "200                        "examine RISCV core")201        try:202            self.port = self._get_gdb_server_port()203        except:204            header("OpenOCD log")205            sys.stdout.write(log)206            raise207    def _get_gdb_server_port(self):208        """Get port that OpenOCD's gdb server is listening on."""209        MAX_ATTEMPTS = 50210        PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')211        for _ in range(MAX_ATTEMPTS):212            with open(os.devnull, 'w') as devnull:213                try:214                    output = subprocess.check_output([215                        'lsof',216                        '-a',  # Take the AND of the following selectors217                        '-p{}'.format(self.process.pid),  # Filter on PID218                        '-iTCP',  # Filter only TCP sockets219                    ], stderr=devnull)220                except subprocess.CalledProcessError:221                    output = ""222            matches = list(PORT_REGEX.finditer(output))223            matches = [m for m in matches224              
      if m.group('port') not in ('6666', '4444')]225            if len(matches) > 1:226                print output227                raise Exception(228                    "OpenOCD listening on multiple ports. Cannot uniquely "229                    "identify gdb server port.")230            elif matches:231                [match] = matches232                return int(match.group('port'))233            time.sleep(1)234        raise Exception("Timed out waiting for gdb server to obtain port.")235    def __del__(self):236        try:237            self.process.kill()238            self.process.wait()239        except (OSError, AttributeError):240            pass241class OpenocdCli(object):242    def __init__(self, port=4444):243        self.child = pexpect.spawn(244                "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)245        self.child.expect("> ")246    def command(self, cmd):247        self.child.sendline(cmd)248        self.child.expect(cmd)249        self.child.expect("\n")250        self.child.expect("> ")251        return self.child.before.strip("\t\r\n \0")252    def reg(self, reg=''):253        output = self.command("reg %s" % reg)254        matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)255        values = {r: int(v, 0) for r, v in matches}256        if reg:257            return values[reg]258        return values259    def load_image(self, image):260        output = self.command("load_image %s" % image)261        if 'invalid ELF file, only 32bits files are supported' in output:262            raise TestNotApplicable(output)263class CannotAccess(Exception):264    def __init__(self, address):265        Exception.__init__(self)266        self.address = address267Thread = collections.namedtuple('Thread', ('id', 'target_id', 'name',268    'frame'))269class Gdb(object):270    logfile = tempfile.NamedTemporaryFile(prefix="gdb", suffix=".log")271    logname = logfile.name272    def __init__(self,273            
cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):274        self.child = pexpect.spawn(cmd)275        self.child.logfile = open(self.logname, "w")276        self.child.logfile.write("+ %s\n" % cmd)277        self.wait()278        self.command("set confirm off")279        self.command("set width 0")280        self.command("set height 0")281        # Force consistency.282        self.command("set print entry-values no")283    def wait(self):284        """Wait for prompt."""285        self.child.expect(r"\(gdb\)")286    def command(self, command, timeout=6000):287        self.child.sendline(command)288        self.child.expect("\n", timeout=timeout)289        self.child.expect(r"\(gdb\)", timeout=timeout)290        return self.child.before.strip()291    def c(self, wait=True, timeout=-1, async=False):292        if async:293            async = "&"294        else:295            async = ""296        if wait:297            output = self.command("c%s" % async, timeout=timeout)298            assert "Continuing" in output299            return output300        else:301            self.child.sendline("c%s" % async)302            self.child.expect("Continuing")303    def interrupt(self):304        self.child.send("\003")305        self.child.expect(r"\(gdb\)", timeout=6000)306        return self.child.before.strip()307    def x(self, address, size='w'):308        output = self.command("x/%s %s" % (size, address))309        value = int(output.split(':')[1].strip(), 0)310        return value311    def p_raw(self, obj):312        output = self.command("p %s" % obj)313        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)314        if m:315            raise CannotAccess(int(m.group(1), 0))316        return output.split('=')[-1].strip()317    def p(self, obj):318        output = self.command("p/x %s" % obj)319        m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)320        if m:321            raise 
CannotAccess(int(m.group(1), 0))322        value = int(output.split('=')[-1].strip(), 0)323        return value324    def p_string(self, obj):325        output = self.command("p %s" % obj)326        value = shlex.split(output.split('=')[-1].strip())[1]327        return value328    def stepi(self):329        output = self.command("stepi")330        return output331    def load(self):332        output = self.command("load", timeout=6000)333        assert "failed" not in  output334        assert "Transfer rate" in output335    def b(self, location):336        output = self.command("b %s" % location)337        assert "not defined" not in output338        assert "Breakpoint" in output339        return output340    def hbreak(self, location):341        output = self.command("hbreak %s" % location)342        assert "not defined" not in output343        assert "Hardware assisted breakpoint" in output344        return output345    def threads(self):346        output = self.command("info threads")347        threads = []348        for line in output.splitlines():349            m = re.match(350                    r"[\s\*]*(\d+)\s*Thread (\d+)\s*\(Name: ([^\)]+)\s*(.*)",351                    line)352            if m:353                threads.append(Thread(*m.groups()))354        if not threads:355            threads.append(Thread('1', '1', 'Default', '???'))356        return threads357    def thread(self, thread):358        return self.command("thread %s" % thread.id)359def run_all_tests(module, target, parsed):360    if not os.path.exists(parsed.logs):361        os.makedirs(parsed.logs)362    overall_start = time.time()363    global gdb_cmd  # pylint: disable=global-statement364    gdb_cmd = parsed.gdb365    todo = []366    if parsed.misaval:367        target.misa = int(parsed.misaval, 16)368        print "Using $misa from command line: 0x%x" % target.misa369    elif target.misa:370        print "Using $misa from target definition: 0x%x" % target.misa371    else:372        
todo.append(("ExamineTarget", ExamineTarget))373    for name in dir(module):374        definition = getattr(module, name)375        if type(definition) == type and hasattr(definition, 'test') and \376                (not parsed.test or any(test in name for test in parsed.test)):377            todo.append((name, definition))378    results, count = run_tests(parsed, target, todo)379    header("ran %d tests in %.0fs" % (count, time.time() - overall_start),380            dash=':')381    return print_results(results)382good_results = set(('pass', 'not_applicable'))383def run_tests(parsed, target, todo):384    results = {}385    count = 0386    for name, definition in todo:387        instance = definition(target)388        log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %389                (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))390        log_fd = open(log_name, 'w')391        print "Running", name, "...",392        sys.stdout.flush()393        log_fd.write("Test: %s\n" % name)394        log_fd.write("Target: %s\n" % type(target).__name__)395        start = time.time()396        real_stdout = sys.stdout397        sys.stdout = log_fd398        try:399            result = instance.run()400            log_fd.write("Result: %s\n" % result)401        finally:402            sys.stdout = real_stdout403            log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))404        print "%s in %.2fs" % (result, time.time() - start)405        sys.stdout.flush()406        results.setdefault(result, []).append(name)407        count += 1408        if result not in good_results and parsed.fail_fast:409            break410    return results, count411def print_results(results):412    result = 0413    for key, value in results.iteritems():414        print "%d tests returned %s" % (len(value), key)415        if key not in good_results:416            result = 1417            for test in value:418                print "   ", test419    return result420def 
add_test_run_options(parser):421    parser.add_argument("--logs", default="logs",422            help="Store logs in the specified directory.")423    parser.add_argument("--fail-fast", "-f", action="store_true",424            help="Exit as soon as any test fails.")425    parser.add_argument("test", nargs='*',426            help="Run only tests that are named here.")427    parser.add_argument("--gdb",428            help="The command to use to start gdb.")429    parser.add_argument("--misaval",430            help="Don't run ExamineTarget, just assume the misa value which is "431            "specified.")432def header(title, dash='-', length=78):433    if title:434        dashes = dash * (length - 4 - len(title))435        before = dashes[:len(dashes)/2]436        after = dashes[len(dashes)/2:]437        print "%s[ %s ]%s" % (before, title, after)438    else:439        print dash * length440def print_log(path):441    header(path)442    lines = open(path, "r").readlines()443    for l in lines:444        sys.stdout.write(l)445    print446class BaseTest(object):447    compiled = {}448    def __init__(self, target):449        self.target = target450        self.server = None451        self.target_process = None452        self.binary = None453        self.start = 0454        self.logs = []455    def early_applicable(self):456        """Return a false value if the test has determined it cannot run457        without ever needing to talk to the target or server."""458        # pylint: disable=no-self-use459        return True460    def setup(self):461        pass462    def compile(self):463        compile_args = getattr(self, 'compile_args', None)464        if compile_args:465            if compile_args not in BaseTest.compiled:466                # pylint: disable=star-args467                BaseTest.compiled[compile_args] = \468                        self.target.compile(*compile_args)469        self.binary = BaseTest.compiled.get(compile_args)470    def classSetup(self):471   
     self.compile()472        self.target_process = self.target.create()473        if self.target_process:474            self.logs.append(self.target_process.logname)475        try:476            self.server = self.target.server()477            self.logs.append(self.server.logname)478        except Exception:479            for log in self.logs:480                print_log(log)481            raise482    def classTeardown(self):483        del self.server484        del self.target_process485    def run(self):486        """487        If compile_args is set, compile a program and set self.binary.488        Call setup().489        Then call test() and return the result, displaying relevant information490        if an exception is raised.491        """492        sys.stdout.flush()493        if not self.early_applicable():494            return "not_applicable"495        self.start = time.time()496        try:497            self.classSetup()498            self.setup()499            result = self.test()    # pylint: disable=no-member500        except TestNotApplicable:501            result = "not_applicable"502        except Exception as e: # pylint: disable=broad-except503            if isinstance(e, TestFailed):504                result = "fail"505            else:506                result = "exception"507            if isinstance(e, TestFailed):508                header("Message")509                print e.message510            header("Traceback")511            traceback.print_exc(file=sys.stdout)512            return result513        finally:514            for log in self.logs:515                print_log(log)516            header("End of logs")517            self.classTeardown()518        if not result:519            result = 'pass'520        return result521gdb_cmd = None522class GdbTest(BaseTest):523    def __init__(self, target):524        BaseTest.__init__(self, target)525        self.gdb = None526    def classSetup(self):527        BaseTest.classSetup(self)528      
  if gdb_cmd:529            self.gdb = Gdb(gdb_cmd)530        else:531            self.gdb = Gdb()532        self.logs.append(self.gdb.logname)533        if self.binary:534            self.gdb.command("file %s" % self.binary)535        if self.target:536            self.gdb.command("set arch riscv:rv%d" % self.target.xlen)537            self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)538        if self.server.port:539            self.gdb.command(540                    "target extended-remote localhost:%d" % self.server.port)541            # Select a random thread.542            # TODO: Allow a command line option to force a specific thread.543            thread = random.choice(self.gdb.threads())544            self.gdb.thread(thread)545        for cmd in self.target.gdb_setup:546            self.gdb.command(cmd)547        # FIXME: OpenOCD doesn't handle PRIV now548        #self.gdb.p("$priv=3")549    def classTeardown(self):550        del self.gdb551        BaseTest.classTeardown(self)552class ExamineTarget(GdbTest):553    def test(self):554        self.target.misa = self.gdb.p("$misa")555        txt = "RV"556        if (self.target.misa >> 30) == 1:557            txt += "32"558        elif (self.target.misa >> 62) == 2:559            txt += "64"560        elif (self.target.misa >> 126) == 3:561            txt += "128"562        else:563            raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %564                    self.target.misa)565        for i in range(26):566            if self.target.misa & (1<<i):567                txt += chr(i + ord('A'))568        print txt,569class TestFailed(Exception):570    def __init__(self, message):571        Exception.__init__(self)572        self.message = message573class TestNotApplicable(Exception):574    def __init__(self, message):575        Exception.__init__(self)576        self.message = message577def assertEqual(a, b):578    if a != b:579        raise TestFailed("%r != %r" % (a, 
b))580def assertNotEqual(a, b):581    if a == b:582        raise TestFailed("%r == %r" % (a, b))583def assertIn(a, b):584    if a not in b:585        raise TestFailed("%r not in %r" % (a, b))586def assertNotIn(a, b):587    if a in b:588        raise TestFailed("%r in %r" % (a, b))589def assertGreater(a, b):590    if not a > b:591        raise TestFailed("%r not greater than %r" % (a, b))592def assertLess(a, b):593    if not a < b:594        raise TestFailed("%r not less than %r" % (a, b))595def assertTrue(a):596    if not a:597        raise TestFailed("%r is not True" % a)598def assertRegexpMatches(text, regexp):599    if not re.search(regexp, text):...suite.py
Source:suite.py  
1"""2Test Suites3"""4from __future__ import generators5import sys6import unittest7from nose_helper.case import Test8from nose_helper.config import Config9from nose_helper.util import isclass, resolve_name, try_run10PYTHON_VERSION_MAJOR = sys.version_info[0]11class LazySuite(unittest.TestSuite):12    """A suite that may use a generator as its list of tests13    """14    def __init__(self, tests=()):15        self._set_tests(tests)16                17    def __iter__(self):18        return iter(self._tests)19    def __hash__(self):20        return object.__hash__(self)21    def addTest(self, test):22        self._precache.append(test)23    def __nonzero__(self):24        if self._precache:25            return True26        if self.test_generator is None:27            return False28        try:29            test = self.test_generator.next()30            if test is not None:31                self._precache.append(test)32                return True33        except StopIteration:34            pass35        return False36    def _get_tests(self):37        if self.test_generator is not None:38            for i in self.test_generator:39                yield i40        for test in self._precache:41            yield test42    def _set_tests(self, tests):43        self._precache = []44        is_suite = isinstance(tests, unittest.TestSuite)45        if hasattr(tests, '__call__') and not is_suite:46            self.test_generator = tests()47            self.test_generator_counter = list(tests())48        elif is_suite:49            self.addTests([tests])50            self.test_generator = None51            self.test_generator_counter = None52        else:53            self.addTests(tests)54            self.test_generator = None55            self.test_generator_counter = None56    def countTestCases(self):57        counter = 058        generator = self.test_generator_counter59        if generator is not None:60            for test in generator:61                counter +=162     
   for test in self._precache:63            counter += test.countTestCases()64        return counter65    _tests = property(_get_tests, _set_tests, None,66                      "Access the tests in this suite.")67class ContextSuite(LazySuite):68    """A suite with context.69    """70    was_setup = False71    was_torndown = False72    classSetup = ('setup_class', 'setup_all', 'setupClass', 'setupAll',73                     'setUpClass', 'setUpAll')74    classTeardown = ('teardown_class', 'teardown_all', 'teardownClass',75                     'teardownAll', 'tearDownClass', 'tearDownAll')76    moduleSetup = ('setup_module', 'setupModule', 'setUpModule', 'setup',77                   'setUp')78    moduleTeardown = ('teardown_module', 'teardownModule', 'tearDownModule',79                      'teardown', 'tearDown')80    packageSetup = ('setup_package', 'setupPackage', 'setUpPackage')81    packageTeardown = ('teardown_package', 'teardownPackage',82                       'tearDownPackage')83    def __init__(self, tests=(), context=None, factory=None,84                 config=None):85        86        self.context = context87        self.factory = factory88        if config is None:89            config = Config()90        self.config = config91        self.has_run = False92        self.error_context = None93        LazySuite.__init__(self, tests)94    def __hash__(self):95        return object.__hash__(self)96    def __call__(self, *arg, **kw):97        return self.run(*arg, **kw)98    def _exc_info(self):99        return sys.exc_info()100    def addTests(self, tests, context=None):101        if context:102            self.context = context103        if PYTHON_VERSION_MAJOR < 3 and isinstance(tests, basestring):104          raise TypeError("tests must be an iterable of tests, not a string")105        else:106          if isinstance(tests, str):107            raise TypeError("tests must be an iterable of tests, not a string")108        for test in tests:109            
self.addTest(test)110    def run(self, result):111        """Run tests in suite inside of suite fixtures.112        """113        result, orig = result, result114        try:115            self.setUp()116        except KeyboardInterrupt:117            raise118        except:119            self.error_context = 'setup'120            result.addError(self, self._exc_info())121            return122        try:123            for test in self._tests:124                if result.shouldStop:125                    break126                test(orig)127        finally:128            self.has_run = True129            try:130                self.tearDown()131            except KeyboardInterrupt:132                raise133            except:134                self.error_context = 'teardown'135                result.addError(self, self._exc_info())136    def setUp(self):137        if not self:138           return139        if self.was_setup:140            return141        context = self.context142        if context is None:143            return144        factory = self.factory145        if factory:146            ancestors = factory.context.get(self, [])[:]147            while ancestors:148                ancestor = ancestors.pop()149                if ancestor in factory.was_setup:150                    continue151                self.setupContext(ancestor)152            if not context in factory.was_setup:153                self.setupContext(context)154        else:155            self.setupContext(context)156        self.was_setup = True157    def setupContext(self, context):158        if self.factory:159            if context in self.factory.was_setup:160                return161            self.factory.was_setup[context] = self162        if isclass(context):163            names = self.classSetup164        else:165            names = self.moduleSetup166            if hasattr(context, '__path__'):167                names = self.packageSetup + names168        try_run(context, 
names)

    def tearDown(self):
        """Tear down the context(s) this suite is responsible for.

        Does nothing when setup never ran, when teardown already ran, or
        when the suite has no context.  With a factory, every context the
        factory recorded for this suite (plus the suite's own context) is
        torn down, but only if the factory records this suite as the one
        that set it up and it has not been torn down yet.  Without a
        factory only the suite's own context is torn down.
        """
        if not self.was_setup or self.was_torndown:
            return
        # Flag before doing any work so a repeated call returns above.
        self.was_torndown = True
        context = self.context
        if context is None:
            return
        factory = self.factory
        if factory:
            # Contexts the factory recorded for this suite, with the
            # suite's own context appended last.
            ancestors = factory.context.get(self, []) + [context]
            for ancestor in ancestors:
                if not ancestor in factory.was_setup:
                    continue
                if ancestor in factory.was_torndown:
                    continue
                setup = factory.was_setup[ancestor]
                # Only the suite that ran the setup runs the teardown.
                if setup is self:
                    self.teardownContext(ancestor)
        else:
            self.teardownContext(context)

    def teardownContext(self, context):
        """Run the teardown fixtures of a single context.

        With a factory, a context already listed in
        ``factory.was_torndown`` is skipped; otherwise this suite is
        recorded there as the one that tore it down.
        """
        if self.factory:
            if context in self.factory.was_torndown:
                return
            self.factory.was_torndown[context] = self
        if isclass(context):
            names = self.classTeardown
        else:
            names = self.moduleTeardown
            # Objects with __path__ (presumably packages -- confirm) get
            # the package-level fixture names tried first.
            if hasattr(context, '__path__'):
                names = self.packageTeardown + names
        # try_run is defined outside this chunk; presumably it calls the
        # first fixture in `names` that exists on `context` -- confirm.
        try_run(context, names)

    def _get_wrapped_tests(self):
        """Yield the suite's tests, wrapping bare ones in ``Test``.

        ``Test`` and ``unittest.TestSuite`` instances are yielded as they
        are; anything else is wrapped as ``Test(test, config=self.config)``.
        """
        for test in self._get_tests():
            if isinstance(test, Test) or isinstance(test, unittest.TestSuite):
                yield test
            else:
                yield Test(test,
                           config=self.config)

    # Reading `_tests` goes through the wrapping generator above; writing
    # is delegated to LazySuite._set_tests; no deleter is provided.
    _tests = property(_get_wrapped_tests, LazySuite._set_tests, None,
                      "Access the tests in this suite. Tests are returned "
                      "inside of a context wrapper.")


class ContextSuiteFactory(object):
    """Build ``suiteClass`` instances and keep per-context bookkeeping.

    The factory remembers which suites belong to which context
    (``suites``), which contexts belong to which suite (``context``), and
    which suite set up / tore down each context (``was_setup`` /
    ``was_torndown``).
    """
    # Class instantiated by makeSuite; can be overridden on a subclass.
    suiteClass = ContextSuite

    def __init__(self, config=None):
        """Create an empty factory, defaulting ``config`` to ``Config()``.
        """
        if config is None:
            config = Config()
        self.config = config
        # context -> list of suites created for it or for a descendant
        self.suites = {}
        # suite -> list of contexts (its own first, then its ancestors)
        self.context = {}
        # context -> suite that ran its setup; never written in this
        # chunk, presumably filled by the suite's setup code -- confirm.
        self.was_setup = {}
        # context -> suite that ran its teardown
        self.was_torndown = {}

    def __call__(self, tests, **kw):
        """Return 'ContextSuite' for tests.

        The context is taken from the ``context`` keyword if given, else
        from ``tests.context``.  When neither yields a context the tests
        are wrapped and the context is looked up on the wrapped tests.
        Remaining keywords are passed on to ``makeSuite``.
        """
        context = kw.pop('context', getattr(tests, 'context', None))
        if context is None:
            tests = self.wrapTests(tests)
            context = self.findContext(tests)
        return self.makeSuite(tests, context, **kw)

    def ancestry(self, context):
        """Return the ancestry of the context

        Generator.  Yields ``resolve_name`` of each dotted prefix of the
        context's module path, longest prefix first.  Yields nothing for
        a ``None`` context.  Raises ``TypeError`` when the context has
        neither ``__module__`` nor ``__name__``.
        """
        if context is None:
            return
        # A method object (Python 2 `im_class`) is replaced by its class.
        if hasattr(context, 'im_class'):
            context = context.im_class
        if hasattr(context, '__module__'):
            # Full module path: the defining module itself comes first.
            ancestors = context.__module__.split('.')
        elif hasattr(context, '__name__'):
            # Drop the last component (the context's own name) so only
            # the enclosing dotted names remain.
            ancestors = context.__name__.split('.')[:-1]
        else:
            raise TypeError("%s has no ancestors?" % context)
        while ancestors:
            # resolve_name is defined outside this chunk; presumably it
            # turns the dotted name into the object -- confirm.
            yield resolve_name('.'.join(ancestors))
            ancestors.pop()

    def findContext(self, tests):
        """Return the first non-None ``context`` found on ``tests``.

        Returns ``None`` for callables and ``unittest.TestSuite``
        instances, and when no test carries a context.
        """
        if hasattr(tests, '__call__') or isinstance(tests, unittest.TestSuite):
            return None
        context = None
        for test in tests:
            # Don't look at suites for contexts, only tests
            ctx = getattr(test, 'context', None)
            if ctx is None:
                continue
            # Only the first context seen is kept; later ones are ignored.
            if context is None:
                context = ctx
        return context

    def makeSuite(self, tests, context, **kw):
        """Create a suite for ``tests`` and register it under ``context``.

        When ``context`` is not ``None`` the suite is recorded under the
        context and under every ancestor produced by ``ancestry``.
        """
        suite = self.suiteClass(
            tests, context=context, config=self.config, factory=self, **kw)
        if context is not None:
            self.suites.setdefault(context, []).append(suite)
            self.context.setdefault(suite, []).append(context)
            for ancestor in self.ancestry(context):
                self.suites.setdefault(ancestor, []).append(suite)
                # The list for `suite` was created by setdefault above.
                self.context[suite].append(ancestor)
        return suite

    def wrapTests(self, tests):
        """Return ``tests`` with every bare test wrapped.

        Callables and ``unittest.TestSuite`` instances are returned
        unchanged.  Otherwise a new list is built: ``Test`` and
        ``TestSuite`` items pass through, ``ContextList`` items become
        suites for their own context, everything else is wrapped in
        ``Test``.
        """
        if hasattr(tests, '__call__') or isinstance(tests, unittest.TestSuite):
            return tests
        wrapped = []
        for test in tests:
            if isinstance(test, Test) or isinstance(test, unittest.TestSuite):
                wrapped.append(test)
            elif isinstance(test, ContextList):
                wrapped.append(self.makeSuite(test, context=test.context))
            else:
                wrapped.append(
                    Test(test, config=self.config)
                    )
        return wrapped


class ContextList(object):
    """a group of tests in a context.
    """
    def __init__(self, tests, context=None):
      self.tests = tests
      self.context = context
    def __iter__(self):...
attr.py
Source:attr.py  
...
        self.tra['n'] = self.n
    def tearDown(self):
        print("tearDown")
        self.class_tda['tearDown'] = 'teardown'
    def classTearDown(self):
        print("class tearDown")
        self.class_tda['classTearDown'] = 'class teardown'


# Fixture class with minipyt_shared = False.  The class_tda / class_tra /
# tda / tra mappings used below are not defined in this chunk; presumably
# the test runner injects them -- confirm.
class not_shared(object):
    __test__ = True
    minipyt_shared = False
    n = 0
    def __init__(self):
        print("init works")
    def classSetUp(self):
        print("classSetUp")
        self.class_tda['classSetUp'] = 'class setup'
        self.class_tra['classSetUp'] = 'class setup'
    def setUp(self):
        print("setUp")
        self.class_tda['setUp'] = 'setup'
        self.class_tra['setUp'] = 'setup'
    # Both tests record their name and bump the counter `n`, then store
    # the new value under tra['n'].
    def test_1(self):
        print("test_1")
        self.tda['test'] = 'test_1'
        self.tra['test'] = 'test_1'
        self.n = self.n + 1
        self.tra['n'] = self.n
    def test_2(self):
        print("test_2")
        self.tda['test'] = 'test_2'
        self.tra['test'] = 'test_2'
        self.n = self.n + 1
        self.tra['n'] = self.n
    def tearDown(self):
        print("tearDown")
        self.class_tda['tearDown'] = 'teardown'
    def classTearDown(self):
        print("class tearDown")
        # class teardown cannot set attributes on class if the class is not
        # shared


# Fixture class whose classSetUp records its call and then raises
# Exception('ARF').
class class_setup_exception(object):
    __test__ = True
    minipyt_shared = True
    def __init__(self):
        print("init class_setup_exception")
    def classSetUp(self):
        print("classSetUp")
        self.class_tda['classSetUp'] = 'class setup'
        raise Exception('ARF')
    def setUp(self):
        print("setUp")
        self.class_tda['setUp'] = 'setup'
    def test_1(self):
        print("test_1")
        self.tda['test'] = 'test_1'
        self.tra['test'] = 'test_1'
    def test_2(self):
        print("test_2")
        self.tda['test'] = 'test_2'
        self.tra['test'] = 'test_2'
    def tearDown(self):
        print("tearDown")
        self.class_tda['tearDown'] = 'teardown'
    def classTearDown(self):
        print("class tearDown")
        self.class_tda['classTearDown'] = 'class teardown'


# Fixture class whose per-test setUp records its call and then raises
# Exception('ARF').
class setup_exception(object):
    __test__ = True
    minipyt_shared = True
    def __init__(self):
        print("init setup_exception")
    def classSetUp(self):
        print("classSetUp")
        self.class_tda['classSetUp'] = 'class setup'
    def setUp(self):
        print("setUp")
        self.class_tda['setUp'] = 'setup'
        raise Exception('ARF')
    def test_1(self):
        print("test_1")
        self.tda['test'] = 'test_1'
        self.tra['test'] = 'test_1'
    # NOTE(review): this test writes the key 'test2', while every other
    # test in this chunk writes 'test' -- confirm this is intentional.
    def test_2(self):
        print("test_2")
        self.tda['test2'] = 'test_2'
        self.tra['test2'] = 'test_2'
    def tearDown(self):
        print("tearDown")
        self.class_tda['tearDown'] = 'teardown'
    def classTearDown(self):
        print("class tearDown")
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
