Best Python code snippet using yandex-tank
test_threading.py
Source:test_threading.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3from time import strftime, sleep4from threading import Lock, Thread, current_thread5from collections import deque6_log_lock = Lock()7def log(s):8    with _log_lock:9        print '[{}] [{}] {}'.format(10            strftime('%Y-%m-%d %H:%M:%S'),11            current_thread().name,12            s13        )14class WaitingQueue(object):15    def __init__(self, iterable=()):16        # only single thread can access its attrs at the same time17        self._lock = Lock()18        # the task queue19        self._q = deque(iterable)20        # yeah, it's the condition mechanism, better than the built-in one21        self._wait_lock_q = deque()22        # if `take` when `_q` is empty, wait it or raise IndexError23        self._waiting = True24    def put(self, x):25        with self._lock:26            # simply append it27            self._q.append(x)28            # notify a waiting take-thread29            try:30                wait_lock = self._wait_lock_q.popleft()31            except IndexError:32                pass33            else:34                wait_lock.release()35    def take(self):36        with self._lock:37            # pop38            # case 1: simply return it39            # case 2: wait for a `put`40            # case 3: raise IndexError41            try:42                return self._q.popleft()43            except IndexError:44                if not self._waiting:45                    raise46            # create an unique wait lock47            wait_lock = Lock()48            wait_lock.acquire()49            self._wait_lock_q.append(wait_lock)50        # acquire it secondly to wait and be notified51        wait_lock.acquire()52        return self.take()53    def stop_waiting(self):54        with self._lock:55            # let the futher take-thread raise56            self._waiting = False57            # notify all waiting take-threads58            while self._wait_lock_q:59                wait_lock = self._wait_lock_q.popleft()60                wait_lock.release()61if __name__ == '__main__':62    def consume():63        while 1:64            # try to take65            log('Trying to take ...')66            try:67                x = q.take()68            except IndexError:69                log('Failed to take. Stop.'.format(x))70                return71            else:72                log('Took {}.'.format(x))73            # consume74            log('Consuming {}...'.format(x))75            sleep(0.1)76            log('Consumed {}.'.format(x))77    q = WaitingQueue(range(4))78    for t in [Thread(target=consume) for i in range(3)]:79        t.start()80    # test to add tasks asyncly81    for x in 'ABC':82        log('Producing {}...'.format(x))83        q.put(x)84        log('Produced {}.'.format(x))85    # test to add tasks after queue empties86    sleep(0.8)87    for x in 'MNOP':88        log('Producing {}...'.format(x))89        q.put(x)90        log('Produced {}.'.format(x))91    # test to add tasks after stop waiting92    q.stop_waiting()93    sleep(0.5)94    for x in 'ST':95        log('Producing {}...'.format(x))96        q.put(x)97        log('Produced {}.'.format(x))...test_thread.py
Source:test_thread.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3from time import strftime, sleep4from thread import allocate_lock, start_new_thread5from collections import deque6_log_lock = allocate_lock()7def log(s):8    with _log_lock:9        print '[{}] {}'.format(10            strftime('%Y-%m-%d %H:%M:%S'),11            s12        )13class WaitingQueue(object):14    def __init__(self, iterable=()):15        # only single thread can access its attrs at the same time16        self._lock = allocate_lock()17        # the task queue18        self._q = deque(iterable)19        # yeah, it's the condition mechanism, better than the built-in one20        self._wait_lock_q = deque()21        # if `take` when `_q` is empty, wait it or raise IndexError22        self._waiting = True23    def put(self, x):24        with self._lock:25            # simply append it26            self._q.append(x)27            # notify a waiting take-thread28            try:29                wait_lock = self._wait_lock_q.popleft()30            except IndexError:31                pass32            else:33                wait_lock.release()34    def take(self):35        with self._lock:36            # pop37            # case 1: simply return it38            # case 2: wait for a `put`39            # case 3: raise IndexError40            try:41                return self._q.popleft()42            except IndexError:43                if not self._waiting:44                    raise45            # create an unique wait lock46            wait_lock = allocate_lock()47            wait_lock.acquire()48            self._wait_lock_q.append(wait_lock)49        # acquire it secondly to wait and be notified50        wait_lock.acquire()51        return self.take()52    def stop_waiting(self):53        with self._lock:54            # let the futher take-thread raise55            self._waiting = False56            # notify all waiting take-threads57            while self._wait_lock_q:58                wait_lock = self._wait_lock_q.popleft()59                wait_lock.release()60if __name__ == '__main__':61    def consume():62        while 1:63            # try to take64            log('Trying to take ...')65            try:66                x = q.take()67            except IndexError:68                log('Failed to take. Stop.'.format(x))69                return70            else:71                log('Took {}.'.format(x))72            # consume73            log('Consuming {}...'.format(x))74            sleep(0.1)75            log('Consumed {}.'.format(x))76    q = WaitingQueue(range(4))77    for i in range(3):78        start_new_thread(consume, tuple())79    # test to add tasks asyncly80    for x in 'ABC':81        log('Producing {}...'.format(x))82        q.put(x)83        log('Produced {}.'.format(x))84    # test to add tasks after queue empties85    sleep(0.8)86    for x in 'MNOP':87        log('Producing {}...'.format(x))88        q.put(x)89        log('Produced {}.'.format(x))90    # test to add tasks after stop waiting91    q.stop_waiting()92    sleep(0.5)93    for x in 'ST':94        log('Producing {}...'.format(x))95        q.put(x)96        log('Produced {}.'.format(x))...yaml_io.py
Source:yaml_io.py  
1import yaml2from os import remove3from os.path import isfile4from datetime import datetime5from utils.shell_io import byte_style6def _wait(lfile):7    if isfile(lfile):8        msg = 'Wait for '9        msg += byte_style(f"'{lfile}'", '3')10        msg += '\n  (Locked at '11        with open(lfile) as fr:12            for line in fr:13                msg += line.split('.')[0]14        msg += ') Unlock? [Y or any key to exit] '15        if input(msg) != 'Y':16            # print('\nexited')17            exit()18        remove(lfile)19def _block(lfile):20    with open(lfile, 'w') as fw:21        fw.write(str(datetime.now()))22def _unblock(lfile):23    if isfile(lfile):24        remove(lfile)25        return True26    return False27def save_yaml(status, mfile, lfile, wait_lock = True):28    if lfile:29        if wait_lock:30            _wait(lfile)31            _block(lfile)32        finished = False33        do_exit = None34        while not finished:35            try:36                with open(mfile, 'w') as fw:37                    fw.write(f'# {datetime.now()}\n')38                    yaml.dump(status, fw, default_flow_style = False)39                finished = True40            except KeyboardInterrupt as e:41                do_exit = e42                print('suppress', e, 'for saving', mfile)43        # release44        if wait_lock:45            _unblock(lfile)46            if do_exit is not None:47                raise do_exit48    else:49        with open(mfile, 'a+') as fw:50            fw.write(f'# {datetime.now()}\n')51            yaml.dump(status, fw, default_flow_style = False)52    return True53def load_yaml(mfile, lfile, block = False, wait_lock = True):54    if isfile(mfile):55        if wait_lock:56            _wait(lfile)57            if block:58                _block(lfile)59    else:60        save_yaml({}, mfile, lfile)61    with open(mfile, 'r') as fr:62        status = yaml.load(fr, Loader = yaml.FullLoader)63    if not status:64        status = {}65    if wait_lock and block:66        return status, lambda : _unblock(lfile)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
