Best Python code snippet using tappy_python
test_memory_leaks.py
Source:test_memory_leaks.py  
#!/usr/bin/env python

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
Tests for detecting function memory leaks (typically the ones
implemented in C). It does so by calling a function many times and
checking whether process memory usage keeps increasing between
calls or over time.
Note that this may produce false positives (especially on Windows
for some reason).
"""

from __future__ import print_function
import errno
import functools
import gc
import os
import sys
import threading
import time

import psutil
import psutil._common
from psutil import LINUX
from psutil import OPENBSD
from psutil import OSX
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._compat import xrange
from psutil.tests import create_sockets
from psutil.tests import get_test_subprocess
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_ENVIRON
from psutil.tests import HAS_IONICE
from psutil.tests import HAS_MEMORY_MAPS
from psutil.tests import HAS_PROC_CPU_NUM
from psutil.tests import HAS_PROC_IO_COUNTERS
from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import reap_children
from psutil.tests import run_test_module_by_name
from psutil.tests import safe_rmpath
from psutil.tests import skip_on_access_denied
from psutil.tests import TESTFN
from psutil.tests import TRAVIS
from psutil.tests import unittest


# Number of calls performed by each of the two measured batches.
LOOPS = 1000
# Memory growth (bytes) tolerated between the two batches before retrying.
MEMORY_TOLERANCE = 4096
# Seconds spent re-calling the function once the tolerance is exceeded.
RETRY_FOR = 3
# Set once (the listing assigned it twice with the same expression).
SKIP_PYTHON_IMPL = bool(TRAVIS)
cext = psutil._psplatform.cext
thisproc = psutil.Process()


# ===================================================================
# utils
# ===================================================================


def skip_if_linux():
    """Return a ``unittest.skipIf`` decorator which skips the test when
    both LINUX and SKIP_PYTHON_IMPL are true.
    """
    return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL,
                           "worthless on LINUX (pure python)")


def bytes2human(n):
    """Format a byte count using binary prefixes and two decimals.

    Values smaller than 1024 (including negative ones) are rendered
    verbatim with a "B" suffix.

    http://code.activestate.com/recipes/578019

    >>> bytes2human(10000)
    '9.77K'
    >>> bytes2human(100001221)
    '95.37M'
    >>> bytes2human(100)
    '100B'
    """
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # K -> 2**10, M -> 2**20, ...
    prefix = dict((s, 1 << (i + 1) * 10) for i, s in enumerate(symbols))
    # Largest unit first, so the first match is the most compact one.
    for s in reversed(symbols):
        if n >= prefix[s]:
            value = float(n) / prefix[s]
            return '%.2f%s' % (value, s)
    return "%sB" % n


class TestMemLeak(unittest.TestCase):
    """Base framework class which calls a function many times and
    produces a failure if process memory usage keeps increasing
    between calls or over time.
    """
    tolerance = MEMORY_TOLERANCE
    loops = LOOPS
    retry_for = RETRY_FOR

    def setUp(self):
        gc.collect()

    def execute(self, fun, *args, **kwargs):
        """Test a callable.

        `fun` is called with `*args` and `**kwargs`. Three special
        keyword arguments are consumed here and not forwarded to `fun`:
        `tolerance_`, `loops_` and `retry_for_`; a missing or falsy
        value falls back to the class attribute of the same name.
        Fails the test if memory still grows after the retry period.
        """
        tolerance = kwargs.pop('tolerance_', None) or self.tolerance
        loops = kwargs.pop('loops_', None) or self.loops
        retry_for = kwargs.pop('retry_for_', None) or self.retry_for

        def call_many_times():
            # No `del` of the loop variable: it would raise if the loop
            # body never ran (loops == 0).
            for _ in xrange(loops):
                self._call(fun, *args, **kwargs)
            gc.collect()

        # warm up
        for _ in range(10):
            self._call(fun, *args, **kwargs)
        self.assertEqual(gc.garbage, [])
        self.assertEqual(threading.active_count(), 1)
        self.assertEqual(thisproc.children(), [])

        # Get 2 distinct memory samples, before and after having
        # called fun repeatedly.
        # step 1
        call_many_times()
        mem1 = self._get_mem()
        # step 2
        call_many_times()
        mem2 = self._get_mem()

        diff1 = mem2 - mem1
        if diff1 > tolerance:
            # This doesn't necessarily mean we have a leak yet.
            # At this point we assume that after having called the
            # function so many times the memory usage is stabilized
            # and if there are no leaks it should not increase
            # anymore.
            # Let's keep calling fun for `retry_for` more seconds and
            # fail if we notice any difference.
            ncalls = 0
            stop_at = time.time() + retry_for
            while time.time() <= stop_at:
                self._call(fun, *args, **kwargs)
                ncalls += 1

            del stop_at
            gc.collect()
            mem3 = self._get_mem()
            diff2 = mem3 - mem2

            if mem3 > mem2:
                # failure
                extra_proc_mem = bytes2human(diff1 + diff2)
                print("extra proc mem: %s" % extra_proc_mem, file=sys.stderr)
                msg = "+%s after %s calls, +%s after another %s calls, "
                msg += "+%s extra proc mem"
                msg = msg % (
                    bytes2human(diff1), loops, bytes2human(diff2), ncalls,
                    extra_proc_mem)
                self.fail(msg)

    def execute_w_exc(self, exc, fun, *args, **kwargs):
        """Convenience function which tests a callable raising
        an exception.

        Each measured call asserts that `fun(*args, **kwargs)` raises
        `exc`.
        """
        def call():
            self.assertRaises(exc, fun, *args, **kwargs)

        self.execute(call)

    @staticmethod
    def _get_mem():
        """Return the current process memory figure used for comparisons:
        USS on Linux, Windows and OSX, RSS elsewhere.
        """
        # By using USS memory it seems it's less likely to bump
        # into false positives.
        if LINUX or WINDOWS or OSX:
            return thisproc.memory_full_info().uss
        else:
            return thisproc.memory_info().rss

    @staticmethod
    def _call(fun, *args, **kwargs):
        """Invoke `fun`, discarding its return value."""
        fun(*args, **kwargs)
# ===================================================================
# Process class
# ===================================================================


class TestProcessObjectLeaks(TestMemLeak):
    """Test leaks of Process class methods.

    Every test hands a bound method of `proc` to `execute()`.
    """

    # Process under test; a subclass in this file rebinds it.
    proc = thisproc

    def test_coverage(self):
        # Every public name found on psutil.Process, except the ones
        # listed in `skip`, must have a matching "test_<name>" attribute
        # on this class.
        skip = set((
            "pid", "as_dict", "children", "cpu_affinity", "cpu_percent",
            "ionice", "is_running", "kill", "memory_info_ex", "memory_percent",
            "nice", "oneshot", "parent", "rlimit", "send_signal", "suspend",
            "terminate", "wait"))
        for name in dir(psutil.Process):
            if name.startswith('_'):
                continue
            if name in skip:
                continue
            self.assertTrue(hasattr(self, "test_" + name), msg=name)

    @skip_if_linux()
    def test_name(self):
        self.execute(self.proc.name)

    @skip_if_linux()
    def test_cmdline(self):
        self.execute(self.proc.cmdline)

    @skip_if_linux()
    def test_exe(self):
        self.execute(self.proc.exe)

    @skip_if_linux()
    def test_ppid(self):
        self.execute(self.proc.ppid)

    @unittest.skipIf(not POSIX, "POSIX only")
    @skip_if_linux()
    def test_uids(self):
        self.execute(self.proc.uids)

    @unittest.skipIf(not POSIX, "POSIX only")
    @skip_if_linux()
    def test_gids(self):
        self.execute(self.proc.gids)

    @skip_if_linux()
    def test_status(self):
        self.execute(self.proc.status)

    def test_nice_get(self):
        self.execute(self.proc.nice)

    def test_nice_set(self):
        # Re-apply the value read from this process.
        niceness = thisproc.nice()
        self.execute(self.proc.nice, niceness)

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice_get(self):
        self.execute(self.proc.ionice)

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice_set(self):
        if WINDOWS:
            value = thisproc.ionice()
            self.execute(self.proc.ionice, value)
        else:
            self.execute(self.proc.ionice, psutil.IOPRIO_CLASS_NONE)
            # Also exercise the path where the C function is expected
            # to raise OSError.
            fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
            self.execute_w_exc(OSError, fun)

    @unittest.skipIf(not HAS_PROC_IO_COUNTERS, "not supported")
    @skip_if_linux()
    def test_io_counters(self):
        self.execute(self.proc.io_counters)

    @unittest.skipIf(POSIX, "worthless on POSIX")
    def test_username(self):
        self.execute(self.proc.username)

    @skip_if_linux()
    def test_create_time(self):
        self.execute(self.proc.create_time)

    @skip_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_num_threads(self):
        self.execute(self.proc.num_threads)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_num_handles(self):
        self.execute(self.proc.num_handles)

    @unittest.skipIf(not POSIX, "POSIX only")
    @skip_if_linux()
    def test_num_fds(self):
        self.execute(self.proc.num_fds)

    @skip_if_linux()
    def test_num_ctx_switches(self):
        self.execute(self.proc.num_ctx_switches)

    @skip_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_threads(self):
        self.execute(self.proc.threads)

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute(self.proc.cpu_times)

    @skip_if_linux()
    @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
    def test_cpu_num(self):
        self.execute(self.proc.cpu_num)

    @skip_if_linux()
    def test_memory_info(self):
        self.execute(self.proc.memory_info)

    @skip_if_linux()
    def test_memory_full_info(self):
        self.execute(self.proc.memory_full_info)

    @unittest.skipIf(not POSIX, "POSIX only")
    @skip_if_linux()
    def test_terminal(self):
        self.execute(self.proc.terminal)

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "worthless on POSIX (pure python)")
    def test_resume(self):
        self.execute(self.proc.resume)

    @skip_if_linux()
    def test_cwd(self):
        self.execute(self.proc.cwd)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity_get(self):
        self.execute(self.proc.cpu_affinity)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity_set(self):
        # Re-apply the affinity read from this process, then (unless
        # TRAVIS is set) check the path expected to raise ValueError.
        affinity = thisproc.cpu_affinity()
        self.execute(self.proc.cpu_affinity, affinity)
        if not TRAVIS:
            self.execute_w_exc(ValueError, self.proc.cpu_affinity, [-1])

    @skip_if_linux()
    def test_open_files(self):
        safe_rmpath(TESTFN)  # needed after UNIX socket test has run
        # TESTFN is held open for the whole duration of the measurement.
        with open(TESTFN, 'w'):
            self.execute(self.proc.open_files)

    # OSX implementation is unbelievably slow
    @unittest.skipIf(OSX, "too slow on OSX")
    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    @skip_if_linux()
    def test_memory_maps(self):
        self.execute(self.proc.memory_maps)

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit_get(self):
        self.execute(self.proc.rlimit, psutil.RLIMIT_NOFILE)

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit_set(self):
        # Re-apply the limit read from this process, then check the
        # path expected to raise OSError.
        limit = thisproc.rlimit(psutil.RLIMIT_NOFILE)
        self.execute(self.proc.rlimit, psutil.RLIMIT_NOFILE, limit)
        self.execute_w_exc(OSError, self.proc.rlimit, -1)

    @skip_if_linux()
    # Windows implementation is based on a single system-wide
    # function (tested later).
    @unittest.skipIf(WINDOWS, "worthless on WINDOWS")
    def test_connections(self):
        # TODO: UNIX sockets are temporarily implemented by parsing
        # 'pfiles' cmd output; we don't want that part of the code to
        # be executed.
        with create_sockets():
            kind = 'inet' if SUNOS else 'all'
            self.execute(self.proc.connections, kind)

    @unittest.skipIf(not HAS_ENVIRON, "not supported")
    def test_environ(self):
        self.execute(self.proc.environ)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_proc_info(self):
        self.execute(cext.proc_info, os.getpid())


class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
    """Repeat the tests above looking for leaks occurring when dealing
    with terminated processes raising NoSuchProcess exception.
    The C functions are still invoked but will follow different code
    paths. We'll check those code paths.
    """

    @classmethod
    def setUpClass(cls):
        super(TestTerminatedProcessLeaks, cls).setUpClass()
        # Start a subprocess, then kill it and wait for it, so `proc`
        # refers to a process which has already been terminated.
        p = get_test_subprocess()
        cls.proc = psutil.Process(p.pid)
        cls.proc.kill()
        cls.proc.wait()

    @classmethod
    def tearDownClass(cls):
        super(TestTerminatedProcessLeaks, cls).tearDownClass()
        reap_children()

    def _call(self, fun, *args, **kwargs):
        # Same as the base class hook, except psutil.NoSuchProcess is
        # swallowed.
        try:
            fun(*args, **kwargs)
        except psutil.NoSuchProcess:
            pass

    # These tests are only defined when WINDOWS is true.
    if WINDOWS:

        def test_kill(self):
            self.execute(self.proc.kill)

        def test_terminate(self):
            self.execute(self.proc.terminate)

        def test_suspend(self):
            self.execute(self.proc.suspend)

        def test_resume(self):
            self.execute(self.proc.resume)

        def test_wait(self):
            self.execute(self.proc.wait)

        def test_proc_info(self):
            # test dual implementation
            def call():
                # An OSError whose errno is ESRCH is swallowed (call()
                # then returns None); any other OSError is re-raised.
                try:
                    return cext.proc_info(self.proc.pid)
                except OSError as err:
                    if err.errno != errno.ESRCH:
                        raise

            self.execute(call)


# ===================================================================
# system APIs
# ===================================================================


class TestModuleFunctionsLeaks(TestMemLeak):
    """Test leaks of psutil module functions."""

    def test_coverage(self):
        # Every all-lowercase name in psutil.__all__, except the ones
        # listed in `skip`, must have a matching "test_<name>" attribute
        # on this class.
        skip = set((
            "version_info", "__version__", "process_iter", "wait_procs",
            "cpu_percent", "cpu_times_percent", "cpu_count"))
        for name in psutil.__all__:
            if not name.islower():
                continue
            if name in skip:
                continue
            self.assertTrue(hasattr(self, "test_" + name), msg=name)

    # --- cpu

    @skip_if_linux()
    def test_cpu_count_logical(self):
        self.execute(psutil.cpu_count, logical=True)

    @skip_if_linux()
    def test_cpu_count_physical(self):
        self.execute(psutil.cpu_count, logical=False)

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute(psutil.cpu_times)

    @skip_if_linux()
    def test_per_cpu_times(self):
        self.execute(psutil.cpu_times, percpu=True)

    def test_cpu_stats(self):
        self.execute(psutil.cpu_stats)

    @skip_if_linux()
    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
    def test_cpu_freq(self):
        self.execute(psutil.cpu_freq)

    # --- mem

    def test_virtual_memory(self):
        self.execute(psutil.virtual_memory)

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS,
                     "worthless on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute(psutil.swap_memory)

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "worthless on POSIX (pure python)")
    def test_pid_exists(self):
        self.execute(psutil.pid_exists, os.getpid())

    # --- disk

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "worthless on POSIX (pure python)")
    def test_disk_usage(self):
        self.execute(psutil.disk_usage, '.')

    def test_disk_partitions(self):
        self.execute(psutil.disk_partitions)

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this Linux version')
    @skip_if_linux()
    def test_disk_io_counters(self):
        self.execute(psutil.disk_io_counters, nowrap=False)

    # --- proc

    @skip_if_linux()
    def test_pids(self):
        self.execute(psutil.pids)

    # --- net

    @skip_if_linux()
    def test_net_io_counters(self):
        self.execute(psutil.net_io_counters, nowrap=False)

    @unittest.skipIf(LINUX,
                     "worthless on Linux (pure python)")
    @unittest.skipIf(OSX and os.getuid() != 0, "need root access")
    def test_net_connections(self):
        with create_sockets():
            self.execute(psutil.net_connections)

    def test_net_if_addrs(self):
        # Note: verified that on Windows this was a false positive.
        # `tolerance_` is consumed by execute(); None falls back to the
        # class-level tolerance.
        self.execute(psutil.net_if_addrs,
                     tolerance_=80 * 1024 if WINDOWS else None)

    @unittest.skipIf(TRAVIS, "EPERM on travis")
    def test_net_if_stats(self):
        self.execute(psutil.net_if_stats)

    # --- sensors

    @skip_if_linux()
    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    def test_sensors_battery(self):
        self.execute(psutil.sensors_battery)

    @skip_if_linux()
    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_sensors_temperatures(self):
        self.execute(psutil.sensors_temperatures)

    @skip_if_linux()
    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_sensors_fans(self):
        self.execute(psutil.sensors_fans)

    # --- others

    @skip_if_linux()
    def test_boot_time(self):
        self.execute(psutil.boot_time)

    # XXX - on Windows this produces a false positive
    @unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows")
    def test_users(self):
        self.execute(psutil.users)

    # These tests are only defined when WINDOWS is true.
    if WINDOWS:

        # --- win services

        def test_win_service_iter(self):
            self.execute(cext.winservice_enumerate)

        def test_win_service_get(self):
            # Intentionally empty; its presence satisfies test_coverage()
            # for the "win_service_get" name.
            pass

        def test_win_service_get_config(self):
            # Uses the name of the first service yielded by
            # psutil.win_service_iter().
            name = next(psutil.win_service_iter()).name()
            self.execute(cext.winservice_query_config, name)

        def test_win_service_get_status(self):
            name = next(psutil.win_service_iter()).name()
            self.execute(cext.winservice_query_status, name)

        def test_win_service_get_description(self):
            name = next(psutil.win_service_iter()).name()
            self.execute(cext.winservice_query_descr, name)


# NOTE(review): the guard body is elided ("...") in this listing; the
# text that followed it ("testutils.py") is the label of the next snippet.
if __name__ == '__main__': ...
Source:testutils.py  
...44                else:45                    return f(self)46            return skipIf__47        return skipIf_48    def skip(msg):49        return skipIf(True, msg)50    def skipTest(self, msg):51        with warnings.catch_warnings():52            warnings.simplefilter('always', UserWarning)53            warnings.warn(msg)54        return55    unittest.TestCase.skipTest = skipTest56# Silence warnings caused by the stubbornness of the Python unittest57# maintainers58# http://bugs.python.org/issue942459if (not hasattr(unittest.TestCase, 'assert_')60        or unittest.TestCase.assertTrue is not unittest.TestCase.assertTrue):61    # mavaff...62    unittest.TestCase.assertTrue = unittest.TestCase.assertTrue...skip_list.py
Source:skip_list.py  
"""
Based on "Skip Lists: A Probabilistic Alternative to Balanced Trees" by William Pugh
https://epaperpress.com/sortsearch/download/skiplist.pdf
"""

from __future__ import annotations

from random import random
from typing import Generic, Iterator, Optional, TypeVar

KT = TypeVar("KT")
VT = TypeVar("VT")


class Node(Generic[KT, VT]):
    """Single skip-list entry: a key, a value and its forward references."""

    def __init__(self, key: KT, value: VT):
        self.key = key
        self.value = value
        # forward[i] is the next node on level i.
        self.forward: list[Node[KT, VT]] = []

    def __repr__(self) -> str:
        """
        :return: Visual representation of Node

        >>> node = Node("Key", 2)
        >>> repr(node)
        'Node(Key: 2)'
        """
        return f"Node({self.key}: {self.value})"

    @property
    def level(self) -> int:
        """
        :return: Number of forward references

        >>> node = Node("Key", 2)
        >>> node.level
        0
        >>> node.forward.append(Node("Key2", 4))
        >>> node.level
        1
        >>> node.forward.append(Node("Key3", 6))
        >>> node.level
        2
        """
        return len(self.forward)


class SkipList(Generic[KT, VT]):
    """Sorted key/value mapping backed by a probabilistic skip list."""

    def __init__(self, p: float = 0.5, max_level: int = 16):
        """
        :param p: Probability of promoting a new node one more level.
        :param max_level: Upper bound for the level of any node.
        """
        # Sentinel node; its key is never compared against user keys.
        self.head = Node("root", None)
        self.level = 0
        self.p = p
        self.max_level = max_level

    def __str__(self) -> str:
        """
        :return: Visual representation of SkipList

        >>> skip_list = SkipList()
        >>> print(skip_list)
        SkipList(level=0)
        >>> skip_list.insert("Key1", "Value")
        >>> print(skip_list) # doctest: +ELLIPSIS
        SkipList(level=...
        [root]--...
        [Key1]--Key1...
        None    *...
        >>> skip_list.insert("Key2", "OtherValue")
        >>> print(skip_list) # doctest: +ELLIPSIS
        SkipList(level=...
        [root]--...
        [Key1]--Key1...
        [Key2]--Key2...
        None    *...
        """
        items = list(self)

        if not items:
            return f"SkipList(level={self.level})"

        label_size = max((len(str(item)) for item in items), default=4)
        label_size = max(label_size, 4) + 4

        node = self.head
        lines = []

        # One column per level; forwards[i] is the node the column
        # currently points at.
        forwards = node.forward.copy()
        lines.append(f"[{node.key}]".ljust(label_size, "-") + "* " * len(forwards))
        lines.append(" " * label_size + "| " * len(forwards))

        while len(node.forward) != 0:
            node = node.forward[0]

            lines.append(
                f"[{node.key}]".ljust(label_size, "-")
                + " ".join(str(n.key) if n.key == node.key else "|" for n in forwards)
            )
            lines.append(" " * label_size + "| " * len(forwards))
            forwards[: node.level] = node.forward

        lines.append("None".ljust(label_size) + "* " * len(forwards))
        return f"SkipList(level={self.level})\n" + "\n".join(lines)

    def __iter__(self) -> Iterator[KT]:
        """Yield the stored keys by walking level 0 from the head."""
        node = self.head

        while len(node.forward) != 0:
            yield node.forward[0].key
            node = node.forward[0]

    def random_level(self) -> int:
        """
        :return: Random level from [1, self.max_level] interval.
                 Higher values are less likely.
        """
        level = 1
        while random() < self.p and level < self.max_level:
            level += 1

        return level

    def _locate_node(
        self, key: KT
    ) -> tuple[Optional[Node[KT, VT]], list[Node[KT, VT]]]:
        """
        :param key: Searched key,
        :return: Tuple with searched node (or None if given key is not present)
                 and list of nodes that refer (if key is present) or should refer
                 to given node. The list is indexed by level (index 0 is level 0).
        """
        # Nodes which refer or should refer to output node
        update_vector = []

        node = self.head

        for i in reversed(range(self.level)):
            # i < node.level - When node level is lesser than `i` decrement `i`.
            # node.forward[i].key < key - Jumping to node with key value higher
            #                             or equal to searched key would result
            #                             in skipping searched key.
            while i < node.level and node.forward[i].key < key:
                node = node.forward[i]
            # Each leftmost node (relative to searched node) will potentially have to
            # be updated.
            update_vector.append(node)

        update_vector.reverse()  # Note that we were inserting values in reverse order.

        # len(node.forward) != 0 - If current node doesn't contain any further
        #                          references then searched key is not present.
        # node.forward[0].key == key - Next node key should be equal to search key
        #                              if key is present.
        if len(node.forward) != 0 and node.forward[0].key == key:
            return node.forward[0], update_vector
        return None, update_vector

    def delete(self, key: KT):
        """
        :param key: Key to remove from list. Missing keys are ignored.

        >>> skip_list = SkipList()
        >>> skip_list.insert(2, "Two")
        >>> skip_list.insert(1, "One")
        >>> skip_list.insert(3, "Three")
        >>> list(skip_list)
        [1, 2, 3]
        >>> skip_list.delete(2)
        >>> list(skip_list)
        [1, 3]
        """
        node, update_vector = self._locate_node(key)

        if node is None:
            return

        for i, update_node in enumerate(update_vector):
            # Remove or replace all references to removed node.
            if update_node.level > i and update_node.forward[i].key == key:
                if node.level > i:
                    update_node.forward[i] = node.forward[i]
                else:
                    # Removed node was the last one on this level.
                    update_node.forward = update_node.forward[:i]

    def insert(self, key: KT, value: VT):
        """
        :param key: Key to insert.
        :param value: Value associated with given key. An existing key has
                      its value replaced.

        >>> skip_list = SkipList()
        >>> skip_list.insert(2, "Two")
        >>> skip_list.find(2)
        'Two'
        >>> list(skip_list)
        [2]
        """
        node, update_vector = self._locate_node(key)
        if node is not None:
            node.value = value
            return

        level = self.random_level()

        if level > self.level:
            # After level increase the head is the predecessor on every new
            # level: pad with exactly one head entry per added level.
            update_vector.extend([self.head] * (level - self.level))
            self.level = level

        new_node = Node(key, value)

        for i, update_node in enumerate(update_vector[:level]):
            # Change references to pass through new node.
            if update_node.level > i:
                new_node.forward.append(update_node.forward[i])

            if update_node.level < i + 1:
                update_node.forward.append(new_node)
            else:
                update_node.forward[i] = new_node

    def find(self, key: KT) -> Optional[VT]:
        """
        :param key: Search key.
        :return: Value associated with given key or None if given key is not present.

        >>> skip_list = SkipList()
        >>> skip_list.find(2)
        >>> skip_list.insert(2, "Two")
        >>> skip_list.find(2)
        'Two'
        >>> skip_list.insert(2, "Three")
        >>> skip_list.find(2)
        'Three'
        """
        node, _ = self._locate_node(key)

        if node is not None:
            return node.value

        return None


def _level0_items(skip_list):
    """Collect {key: value} by following level-0 references from the head."""
    node = skip_list.head
    all_values = {}
    while node.level != 0:
        node = node.forward[0]
        all_values[node.key] = node.value
    return all_values


def test_insert():
    skip_list = SkipList()
    skip_list.insert("Key1", 3)
    skip_list.insert("Key2", 12)
    skip_list.insert("Key3", 41)
    skip_list.insert("Key4", -19)

    all_values = _level0_items(skip_list)

    assert len(all_values) == 4
    assert all_values["Key1"] == 3
    assert all_values["Key2"] == 12
    assert all_values["Key3"] == 41
    assert all_values["Key4"] == -19


def test_insert_overrides_existing_value():
    skip_list = SkipList()
    skip_list.insert("Key1", 10)
    skip_list.insert("Key1", 12)

    skip_list.insert("Key5", 7)
    skip_list.insert("Key7", 10)
    skip_list.insert("Key10", 5)

    skip_list.insert("Key7", 7)
    skip_list.insert("Key5", 5)
    skip_list.insert("Key10", 10)

    all_values = _level0_items(skip_list)

    assert len(all_values) == 4
    assert all_values["Key1"] == 12
    assert all_values["Key7"] == 7
    assert all_values["Key5"] == 5
    assert all_values["Key10"] == 10


def test_searching_empty_list_returns_none():
    skip_list = SkipList()
    assert skip_list.find("Some key") is None


def test_search():
    skip_list = SkipList()

    skip_list.insert("Key2", 20)
    assert skip_list.find("Key2") == 20

    skip_list.insert("Some Key", 10)
    skip_list.insert("Key2", 8)
    skip_list.insert("V", 13)

    assert skip_list.find("Y") is None
    assert skip_list.find("Key2") == 8
    assert skip_list.find("Some Key") == 10
    assert skip_list.find("V") == 13


def test_deleting_item_from_empty_list_do_nothing():
    skip_list = SkipList()
    skip_list.delete("Some key")

    assert len(skip_list.head.forward) == 0


def test_deleted_items_are_not_founded_by_find_method():
    skip_list = SkipList()

    skip_list.insert("Key1", 12)
    skip_list.insert("V", 13)
    skip_list.insert("X", 14)
    skip_list.insert("Key2", 15)

    skip_list.delete("V")
    skip_list.delete("Key2")

    assert skip_list.find("V") is None
    assert skip_list.find("Key2") is None


def test_delete_removes_only_given_key():
    skip_list = SkipList()

    skip_list.insert("Key1", 12)
    skip_list.insert("V", 13)
    skip_list.insert("X", 14)
    skip_list.insert("Key2", 15)

    skip_list.delete("V")
    assert skip_list.find("V") is None
    assert skip_list.find("X") == 14
    assert skip_list.find("Key1") == 12
    assert skip_list.find("Key2") == 15

    skip_list.delete("X")
    assert skip_list.find("V") is None
    assert skip_list.find("X") is None
    assert skip_list.find("Key1") == 12
    assert skip_list.find("Key2") == 15

    skip_list.delete("Key1")
    assert skip_list.find("V") is None
    assert skip_list.find("X") is None
    assert skip_list.find("Key1") is None
    assert skip_list.find("Key2") == 15

    skip_list.delete("Key2")
    assert skip_list.find("V") is None
    assert skip_list.find("X") is None
    assert skip_list.find("Key1") is None
    assert skip_list.find("Key2") is None


def test_delete_doesnt_leave_dead_nodes():
    skip_list = SkipList()

    skip_list.insert("Key1", 12)
    skip_list.insert("V", 13)
    skip_list.insert("X", 142)
    skip_list.insert("Key2", 15)

    skip_list.delete("X")

    def traverse_keys(node):
        yield node.key
        for forward_node in node.forward:
            yield from traverse_keys(forward_node)

    # The head ("root") plus the three remaining keys.
    assert len(set(traverse_keys(skip_list.head))) == 4


def test_iter_always_yields_sorted_values():
    def is_sorted(lst):
        return all(next_item >= item for item, next_item in zip(lst, lst[1:]))

    skip_list = SkipList()
    for i in range(10):
        skip_list.insert(i, i)
    assert is_sorted(list(skip_list))
    skip_list.delete(5)
    skip_list.delete(8)
    skip_list.delete(2)
    assert is_sorted(list(skip_list))
    skip_list.insert(-12, -12)
    skip_list.insert(77, 77)
    assert is_sorted(list(skip_list))


def pytests():
    for _ in range(100):
        # Repeat test 100 times due to the probabilistic nature of skip list
        # random values == random bugs
        test_insert()
        test_insert_overrides_existing_value()

        test_searching_empty_list_returns_none()
        test_search()

        test_deleting_item_from_empty_list_do_nothing()
        test_deleted_items_are_not_founded_by_find_method()
        test_delete_removes_only_given_key()
        test_delete_doesnt_leave_dead_nodes()

        test_iter_always_yields_sorted_values()


def main():
    """
    >>> pytests()
    """

    skip_list = SkipList()
    skip_list.insert(2, "2")
    skip_list.insert(4, "4")
    skip_list.insert(6, "4")
    skip_list.insert(4, "5")
    skip_list.insert(8, "4")
    skip_list.insert(9, "4")

    skip_list.delete(4)

    print(skip_list)


if __name__ == "__main__":
    # NOTE(review): the guard body is elided ("...") in this listing; running
    # the demo in main() is an assumption -- confirm against the full file.
    main()
Source:test_skipping.py  
...28                    (unittest.skipIf, True, False))29        for deco, do_skip, dont_skip in op_table:30            class Foo(unittest.TestCase):31                @deco(do_skip, "testing")32                def test_skip(self): pass33                @deco(dont_skip, "testing")34                def test_dont_skip(self): pass35            test_do_skip = Foo("test_skip")36            test_dont_skip = Foo("test_dont_skip")37            suite = unittest.TestSuite([test_do_skip, test_dont_skip])38            events = []39            result = LoggingResult(events)40            suite.run(result)41            self.assertEqual(len(result.skipped), 1)42            expected = ['startTest', 'addSkip', 'stopTest',43                        'startTest', 'addSuccess', 'stopTest']44            self.assertEqual(events, expected)45            self.assertEqual(result.testsRun, 2)46            self.assertEqual(result.skipped, [(test_do_skip, "testing")])47            self.assertTrue(result.wasSuccessful())48    def test_skip_class(self):49        @unittest.skip("testing")50        class Foo(unittest.TestCase):51            def test_1(self):52                record.append(1)53        record = []54        result = unittest.TestResult()55        test = Foo("test_1")56        suite = unittest.TestSuite([test])57        suite.run(result)58        self.assertEqual(result.skipped, [(test, "testing")])59        self.assertEqual(record, [])60    def test_skip_non_unittest_class(self):61        @unittest.skip("testing")62        class Mixin:63            def test_1(self):64                record.append(1)65        class Foo(Mixin, unittest.TestCase):66            pass67        record = []68        result = unittest.TestResult()69        test = Foo("test_1")70        suite = unittest.TestSuite([test])71        suite.run(result)72        self.assertEqual(result.skipped, [(test, "testing")])73        self.assertEqual(record, [])74    def test_expected_failure(self):75        class 
Foo(unittest.TestCase):76            @unittest.expectedFailure77            def test_die(self):78                self.fail("help me!")79        events = []80        result = LoggingResult(events)81        test = Foo("test_die")82        test.run(result)83        self.assertEqual(events,84                         ['startTest', 'addExpectedFailure', 'stopTest'])85        self.assertEqual(result.expectedFailures[0][0], test)86        self.assertTrue(result.wasSuccessful())87    def test_unexpected_success(self):88        class Foo(unittest.TestCase):89            @unittest.expectedFailure90            def test_die(self):91                pass92        events = []93        result = LoggingResult(events)94        test = Foo("test_die")95        test.run(result)96        self.assertEqual(events,97                         ['startTest', 'addUnexpectedSuccess', 'stopTest'])98        self.assertFalse(result.failures)99        self.assertEqual(result.unexpectedSuccesses, [test])100        self.assertTrue(result.wasSuccessful())101    def test_skip_doesnt_run_setup(self):102        class Foo(unittest.TestCase):103            wasSetUp = False104            wasTornDown = False105            def setUp(self):106                Foo.wasSetUp = True107            def tornDown(self):108                Foo.wasTornDown = True109            @unittest.skip('testing')110            def test_1(self):111                pass112        result = unittest.TestResult()113        test = Foo("test_1")114        suite = unittest.TestSuite([test])115        suite.run(result)116        self.assertEqual(result.skipped, [(test, "testing")])117        self.assertFalse(Foo.wasSetUp)118        self.assertFalse(Foo.wasTornDown)119    def test_decorated_skip(self):120        def decorator(func):121            def inner(*a):122                return func(*a)123            return inner124        class Foo(unittest.TestCase):125            @decorator126            @unittest.skip('testing')127            def 
test_1(self):128                pass129        result = unittest.TestResult()130        test = Foo("test_1")131        suite = unittest.TestSuite([test])132        suite.run(result)...Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
