Best Python code snippet using yandex-tank
lock_tests.py
Source:lock_tests.py  
...58        lock = self.locktype()59        del lock60    def test_acquire_destroy(self):61        lock = self.locktype()62        lock.acquire()63        del lock64    def test_acquire_release(self):65        lock = self.locktype()66        lock.acquire()67        lock.release()68        del lock69    def test_try_acquire(self):70        lock = self.locktype()71        self.assertTrue(lock.acquire(False))72        lock.release()73    def test_try_acquire_contended(self):74        lock = self.locktype()75        lock.acquire()76        result = []77        def f():78            result.append(lock.acquire(False))79        Bunch(f, 1).wait_for_finished()80        self.assertFalse(result[0])81        lock.release()82    def test_acquire_contended(self):83        lock = self.locktype()84        lock.acquire()85        N = 586        def f():87            lock.acquire()88            lock.release()89        b = Bunch(f, N)90        b.wait_for_started()91        _wait()92        self.assertEqual(len(b.finished), 0)93        lock.release()94        b.wait_for_finished()95        self.assertEqual(len(b.finished), N)96    def test_with(self):97        lock = self.locktype()98        def f():99            lock.acquire()100            lock.release()101        def _with(err=None):102            with lock:103                if err is not None:104                    raise err105        _with()106        # Check the lock is unacquired107        Bunch(f, 1).wait_for_finished()108        self.assertRaises(TypeError, _with, TypeError)109        # Check the lock is unacquired110        Bunch(f, 1).wait_for_finished()111    def test_thread_leak(self):112        # The lock shouldn't leak a Thread instance when used from a foreign113        # (non-threading) thread.114        lock = self.locktype()115        def f():116            lock.acquire()117            lock.release()118        n = len(threading.enumerate())119        # We run many threads in the hope that existing threads ids 
won't120        # be recycled.121        Bunch(f, 15).wait_for_finished()122        self.assertEqual(n, len(threading.enumerate()))123class LockTests(BaseLockTests):124    """125    Tests for non-recursive, weak locks126    (which can be acquired and released from different threads).127    """128    def test_reacquire(self):129        # Lock needs to be released before re-acquiring.130        lock = self.locktype()131        phase = []132        def f():133            lock.acquire()134            phase.append(None)135            lock.acquire()136            phase.append(None)137        start_new_thread(f, ())138        while len(phase) == 0:139            _wait()140        _wait()141        self.assertEqual(len(phase), 1)142        lock.release()143        while len(phase) == 1:144            _wait()145        self.assertEqual(len(phase), 2)146    def test_different_thread(self):147        # Lock can be released from a different thread.148        lock = self.locktype()149        lock.acquire()150        def f():151            lock.release()152        b = Bunch(f, 1)153        b.wait_for_finished()154        lock.acquire()155        lock.release()156class RLockTests(BaseLockTests):157    """158    Tests for recursive locks.159    """160    def test_reacquire(self):161        lock = self.locktype()162        lock.acquire()163        lock.acquire()164        lock.release()165        lock.acquire()166        lock.release()167        lock.release()168    def test_release_unacquired(self):169        # Cannot release an unacquired lock170        lock = self.locktype()171        self.assertRaises(RuntimeError, lock.release)172        lock.acquire()173        lock.acquire()174        lock.release()175        lock.acquire()176        lock.release()177        lock.release()178        self.assertRaises(RuntimeError, lock.release)179    def test_different_thread(self):180        # Cannot release from a different thread181        lock = self.locktype()182        def f():183      
      lock.acquire()184        b = Bunch(f, 1, True)185        try:186            self.assertRaises(RuntimeError, lock.release)187        finally:188            b.do_finish()189    def test__is_owned(self):190        lock = self.locktype()191        self.assertFalse(lock._is_owned())192        lock.acquire()193        self.assertTrue(lock._is_owned())194        lock.acquire()195        self.assertTrue(lock._is_owned())196        result = []197        def f():198            result.append(lock._is_owned())199        Bunch(f, 1).wait_for_finished()200        self.assertFalse(result[0])201        lock.release()202        self.assertTrue(lock._is_owned())203        lock.release()204        self.assertFalse(lock._is_owned())205class EventTests(BaseTestCase):206    """207    Tests for Event objects.208    """209    def test_is_set(self):210        evt = self.eventtype()211        self.assertFalse(evt.is_set())212        evt.set()213        self.assertTrue(evt.is_set())214        evt.set()215        self.assertTrue(evt.is_set())216        evt.clear()217        self.assertFalse(evt.is_set())218        evt.clear()219        self.assertFalse(evt.is_set())220    def _check_notify(self, evt):221        # All threads get notified222        N = 5223        results1 = []224        results2 = []225        def f():226            results1.append(evt.wait())227            results2.append(evt.wait())228        b = Bunch(f, N)229        b.wait_for_started()230        _wait()231        self.assertEqual(len(results1), 0)232        evt.set()233        b.wait_for_finished()234        self.assertEqual(results1, [True] * N)235        self.assertEqual(results2, [True] * N)236    def test_notify(self):237        evt = self.eventtype()238        self._check_notify(evt)239        # Another time, after an explicit clear()240        evt.set()241        evt.clear()242        self._check_notify(evt)243    def test_timeout(self):244        evt = self.eventtype()245        results1 = []246        
results2 = []247        N = 5248        def f():249            results1.append(evt.wait(0.0))250            t1 = time.time()251            r = evt.wait(0.2)252            t2 = time.time()253            results2.append((r, t2 - t1))254        Bunch(f, N).wait_for_finished()255        self.assertEqual(results1, [False] * N)256        for r, dt in results2:257            self.assertFalse(r)258            self.assertTrue(dt >= 0.2, dt)259        # The event is set260        results1 = []261        results2 = []262        evt.set()263        Bunch(f, N).wait_for_finished()264        self.assertEqual(results1, [True] * N)265        for r, dt in results2:266            self.assertTrue(r)267class ConditionTests(BaseTestCase):268    """269    Tests for condition variables.270    """271    def test_acquire(self):272        cond = self.condtype()273        # Be default we have an RLock: the condition can be acquired multiple274        # times.275        cond.acquire()276        cond.acquire()277        cond.release()278        cond.release()279        lock = threading.Lock()280        cond = self.condtype(lock)281        cond.acquire()282        self.assertFalse(lock.acquire(False))283        cond.release()284        self.assertTrue(lock.acquire(False))285        self.assertFalse(cond.acquire(False))286        lock.release()287        with cond:288            self.assertFalse(lock.acquire(False))289    def test_unacquired_wait(self):290        cond = self.condtype()291        self.assertRaises(RuntimeError, cond.wait)292    def test_unacquired_notify(self):293        cond = self.condtype()294        self.assertRaises(RuntimeError, cond.notify)295    def _check_notify(self, cond):296        N = 5297        results1 = []298        results2 = []299        phase_num = 0300        def f():301            cond.acquire()302            cond.wait()303            cond.release()304            results1.append(phase_num)305            cond.acquire()306            cond.wait()307            
cond.release()308            results2.append(phase_num)309        b = Bunch(f, N)310        b.wait_for_started()311        _wait()312        self.assertEqual(results1, [])313        # Notify 3 threads at first314        cond.acquire()315        cond.notify(3)316        _wait()317        phase_num = 1318        cond.release()319        while len(results1) < 3:320            _wait()321        self.assertEqual(results1, [1] * 3)322        self.assertEqual(results2, [])323        # Notify 5 threads: they might be in their first or second wait324        cond.acquire()325        cond.notify(5)326        _wait()327        phase_num = 2328        cond.release()329        while len(results1) + len(results2) < 8:330            _wait()331        self.assertEqual(results1, [1] * 3 + [2] * 2)332        self.assertEqual(results2, [2] * 3)333        # Notify all threads: they are all in their second wait334        cond.acquire()335        cond.notify_all()336        _wait()337        phase_num = 3338        cond.release()339        while len(results2) < 5:340            _wait()341        self.assertEqual(results1, [1] * 3 + [2] * 2)342        self.assertEqual(results2, [2] * 3 + [3] * 2)343        b.wait_for_finished()344    def test_notify(self):345        cond = self.condtype()346        self._check_notify(cond)347        # A second time, to check internal state is still ok.348        self._check_notify(cond)349    def test_timeout(self):350        cond = self.condtype()351        results = []352        N = 5353        def f():354            cond.acquire()355            t1 = time.time()356            cond.wait(0.2)357            t2 = time.time()358            cond.release()359            results.append(t2 - t1)360        Bunch(f, N).wait_for_finished()361        self.assertEqual(len(results), 5)362        for dt in results:363            self.assertTrue(dt >= 0.2, dt)364class BaseSemaphoreTests(BaseTestCase):365    """366    Common tests for {bounded, unbounded} semaphore 
objects.367    """368    def test_constructor(self):369        self.assertRaises(ValueError, self.semtype, value = -1)370        self.assertRaises(ValueError, self.semtype, value = -sys.maxint)371    def test_acquire(self):372        sem = self.semtype(1)373        sem.acquire()374        sem.release()375        sem = self.semtype(2)376        sem.acquire()377        sem.acquire()378        sem.release()379        sem.release()380    def test_acquire_destroy(self):381        sem = self.semtype()382        sem.acquire()383        del sem384    def test_acquire_contended(self):385        sem = self.semtype(7)386        sem.acquire()387        N = 10388        results1 = []389        results2 = []390        phase_num = 0391        def f():392            sem.acquire()393            results1.append(phase_num)394            sem.acquire()395            results2.append(phase_num)396        b = Bunch(f, 10)397        b.wait_for_started()398        while len(results1) + len(results2) < 6:399            _wait()400        self.assertEqual(results1 + results2, [0] * 6)401        phase_num = 1402        for i in range(7):403            sem.release()404        while len(results1) + len(results2) < 13:405            _wait()406        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)407        phase_num = 2408        for i in range(6):409            sem.release()410        while len(results1) + len(results2) < 19:411            _wait()412        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)413        # The semaphore is still locked414        self.assertFalse(sem.acquire(False))415        # Final release, to let the last thread finish416        sem.release()417        b.wait_for_finished()418    def test_try_acquire(self):419        sem = self.semtype(2)420        self.assertTrue(sem.acquire(False))421        self.assertTrue(sem.acquire(False))422        self.assertFalse(sem.acquire(False))423        sem.release()424        
self.assertTrue(sem.acquire(False))425    def test_try_acquire_contended(self):426        sem = self.semtype(4)427        sem.acquire()428        results = []429        def f():430            results.append(sem.acquire(False))431            results.append(sem.acquire(False))432        Bunch(f, 5).wait_for_finished()433        # There can be a thread switch between acquiring the semaphore and434        # appending the result, therefore results will not necessarily be435        # ordered.436        self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )437    def test_default_value(self):438        # The default initial value is 1.439        sem = self.semtype()440        sem.acquire()441        def f():442            sem.acquire()443            sem.release()444        b = Bunch(f, 1)445        b.wait_for_started()446        _wait()447        self.assertFalse(b.finished)448        sem.release()449        b.wait_for_finished()450    def test_with(self):451        sem = self.semtype(2)452        def _with(err=None):453            with sem:454                self.assertTrue(sem.acquire(False))455                sem.release()456                with sem:457                    self.assertFalse(sem.acquire(False))458                    if err:459                        raise err460        _with()461        self.assertTrue(sem.acquire(False))462        sem.release()463        self.assertRaises(TypeError, _with, TypeError)464        self.assertTrue(sem.acquire(False))465        sem.release()466class SemaphoreTests(BaseSemaphoreTests):467    """468    Tests for unbounded semaphores.469    """470    def test_release_unacquired(self):471        # Unbounded releases are allowed and increment the semaphore's value472        sem = self.semtype(1)473        sem.release()474        sem.acquire()475        sem.acquire()476        sem.release()477class BoundedSemaphoreTests(BaseSemaphoreTests):478    """479    Tests for bounded semaphores.480    """481    def 
test_release_unacquired(self):482        # Cannot go past the initial value483        sem = self.semtype()484        self.assertRaises(ValueError, sem.release)485        sem.acquire()486        sem.release()...lockable_directory.py
Source:lockable_directory.py  
...20        self.temp_dir = tempfile.TemporaryDirectory(prefix="litani-test")21        self.temp_dir_p = pathlib.Path(self.temp_dir.name)22    def tearDown(self):23        self.temp_dir.cleanup()24    def test_cannot_initially_acquire(self):25        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)26        self.assertFalse(lock_dir.acquire())27    def test_multiproc_cannot_initially_acquire(self):28        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)29        self.assertFalse(lock_dir.acquire())30        lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)31        self.assertFalse(lock_dir.acquire())32    def test_can_acquire_after_release(self):33        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)34        lock_dir.release()35        self.assertTrue(lock_dir.acquire())36        lock_dir.release()37        self.assertTrue(lock_dir.acquire())38    def test_multiproc_can_acquire_after_release(self):39        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)40        lock_dir.release()41        lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)42        self.assertTrue(lock_dir_2.acquire())43    def test_no_double_acquire(self):44        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)45        lock_dir.release()46        self.assertTrue(lock_dir.acquire())47        self.assertFalse(lock_dir.acquire())48    def test_multiproc_no_double_acquire(self):49        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)50        lock_dir.release()51        lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)52        self.assertTrue(lock_dir.acquire())53        self.assertFalse(lock_dir_2.acquire())54    def test_repeat_no_double_acquire(self):55        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)56        lock_dir.release()57        self.assertTrue(lock_dir.acquire())58        self.assertFalse(lock_dir.acquire())59        lock_dir.release()60        self.assertTrue(lock_dir.acquire())61     
   self.assertFalse(lock_dir.acquire())62    def test_try_acquire(self):63        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)64        lock_dir.release()65        with lock_dir.try_acquire():66            pass # No exception67    def test_no_double_try_acquire(self):68        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)69        lock_dir.release()70        with lock_dir.try_acquire():71            with self.assertRaises(lib.litani.AcquisitionFailed):72                with lock_dir.try_acquire():73                    pass74    def test_multiproc_no_double_try_acquire(self):75        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)76        lock_dir.release()77        lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)78        with lock_dir.try_acquire():79            with self.assertRaises(lib.litani.AcquisitionFailed):80                with lock_dir_2.try_acquire():81                    pass82    def test_no_try_acquire_acquire(self):83        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)84        lock_dir.release()85        with lock_dir.try_acquire():86            self.assertFalse(lock_dir.acquire())87    def test_multiproc_no_try_acquire_acquire(self):88        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)89        lock_dir.release()90        lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)91        with lock_dir.try_acquire():92            self.assertFalse(lock_dir_2.acquire())93    def test_no_acquire_try_acquire(self):94        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)95        with self.assertRaises(lib.litani.AcquisitionFailed):96            with lock_dir.try_acquire():97                pass98    def test_multiproc_no_acquire_try_acquire(self):99        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)100        lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)101        with self.assertRaises(lib.litani.AcquisitionFailed):102            with 
lock_dir_2.try_acquire():103                pass104    def test_context_manager_releases(self):105        lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)106        lock_dir.release()107        with lock_dir.try_acquire():108            pass...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
