Best Python code snippet using locust
test_worker.py
Source:test_worker.py  
1import os2import logging3import zipfile4import time5import threading6import pytest7try:8    # Python 39    import queue10    from urllib.error import HTTPError, URLError11except ImportError:12    # Python 213    from urllib2 import HTTPError, URLError14    import Queue as queue15import gmaltcli.worker as worker16class TestSafeCounter(object):17    def test_increment_one(self):18        counter = worker.SafeCounter()19        counter.max = 10020        return_value = counter.increment()21        assert return_value[0] == 122        assert return_value[1] == 10023        return_value = counter.increment()24        assert return_value[0] == 225        assert return_value[1] == 10026    def test_increment_five_start_ten(self):27        counter = worker.SafeCounter(start=10, incr=5)28        counter.max = 10029        return_value = counter.increment()30        assert return_value[0] == 1531        assert return_value[1] == 10032        return_value = counter.increment()33        assert return_value[0] == 2034        assert return_value[1] == 10035    def test_get(self):36        counter = worker.SafeCounter()37        counter.max = 10038        counter.increment()39        counter.increment()40        counter.increment()41        assert counter.get() == 342class FalseWorker(worker.Worker):43    def __init__(self, id_, queue_obj, counter, stop_event, processed, sleep=0):44        super(FalseWorker, self).__init__(id_, queue_obj, counter, stop_event)45        processed[id_] = []46        self.processed = processed47        self.sleep = sleep48    def process(self, queue_item, counter_info):49        self.processed[self.id].append(queue_item)50        if self.sleep:51            time.sleep(self.sleep)52    def _on_end(self):53        setattr(self, 'on_end_called', True)54class ErrorWorker(FalseWorker):55    def process(self, queue_item, counter_info):56        if queue_item == 80 or counter_info[0] == 80:57            raise Exception(58                'Exception on 80th item')59        else:60            super(ErrorWorker, self).process(queue_item, counter_info)61class TestWorkerPool(object):62    def setup_method(self, func_method):63        self.processed = {}64        self.pool = worker.WorkerPool(FalseWorker, 5, self.processed, sleep=0.001)65        self.pool.fill(['item' + str(item) for item in range(1, 100)])66    def test__init__(self):67        assert len(self.pool.workers) == 568        assert all([isinstance(item, FalseWorker) for item in self.pool.workers])69        assert all([id(self.processed) == id(item.processed) for item in self.pool.workers])70    def test_fill(self):71        assert self.pool.queue.qsize() == 9972        assert self.pool.counter.max == 9973    def test_start(self):74        self.pool.start()75        assert len(self.processed[1] + self.processed[2] + self.processed[3] +76                   self.processed[4] + self.processed[5]) == 9977        assert all([len(self.processed[key]) > 10 for key in self.processed])78    def test_start_and_error(self):79        pool = worker.WorkerPool(ErrorWorker, 5, {}, sleep=0.1)80        pool.fill(['item' + str(item) for item in range(1, 100)])81        with pytest.raises(worker.WorkerPoolException):82            pool.start()83class TestWorker(object):84    def setup_method(self, func_method):85        test_stop_event = threading.Event()86        error_stop_event = threading.Event()87        test_counter = worker.SafeCounter()88        error_counter = worker.SafeCounter()89        test_worker_queue = queue.Queue()90        error_worker_queue = queue.Queue()91        for item in range(1, 100):92            test_worker_queue.put(item)93            error_worker_queue.put(item)94        self.processed = {}95        self.test_worker = FalseWorker(1, test_worker_queue,96                                       test_counter, test_stop_event,97                                       self.processed)98        self.error_worker = ErrorWorker(2, error_worker_queue,99                                        error_counter, error_stop_event,100                                        self.processed)101    def test_run(self):102        self.test_worker.run()103        assert len(self.processed[1]) == 99104        assert hasattr(self.test_worker, 'on_end_called')105        assert self.test_worker.queue.empty()106        assert not self.test_worker.stop_event.isSet()107        self.error_worker.run()108        assert len(self.processed[2]) == 79109        assert hasattr(self.error_worker, 'on_end_called')110        assert self.error_worker.queue.qsize() == 19111        assert self.error_worker.stop_event.isSet()112    def test__get_queue(self):113        self.test_worker._get_queue()114        assert len(self.processed[1]) == 1115        assert not hasattr(self.test_worker, 'on_end_called')116        assert self.test_worker.queue.qsize() == 98117        assert not self.test_worker.stop_event.isSet()118        for i in range(1, 80):119            self.error_worker._get_queue()120        assert len(self.processed[2]) == 79121        assert not hasattr(self.error_worker, 'on_end_called')122        assert self.error_worker.queue.qsize() == 20123        assert not self.error_worker.stop_event.isSet()124        # 80th event125        self.error_worker._get_queue()126        assert len(self.processed[2]) == 79127        assert not hasattr(self.error_worker, 'on_end_called')128        assert self.error_worker.queue.qsize() == 19129        assert self.error_worker.stop_event.isSet()130class TestDownloadWorker(object):131    def setup_method(self, func_method):132        stop_event = threading.Event()133        counter = worker.SafeCounter()134        worker_queue = queue.Queue()135        folder = None136        self.download_worker = worker.DownloadWorker(1, worker_queue, counter,137                                                     stop_event, folder)138    def test__secured_download_file_connection_error(self, monkeypatch):139        def raise_url_error(url, filename, md5sum=None):140            raise URLError('message')141        monkeypatch.setattr(self.download_worker, '_download_file', raise_url_error)142        monkeypatch.setattr(logging, 'error', lambda x: x)143        with pytest.raises(URLError):144            self.download_worker._secured_download_file('url', 'filename')145    def test__secured_download_file_wrong_url(self, monkeypatch):146        def raise_url_error(url, filename, md5sum=None):147            raise HTTPError('message', None, None, None, None)148        monkeypatch.setattr(self.download_worker, '_download_file', raise_url_error)149        monkeypatch.setattr(logging, 'error', lambda x: x)150        with pytest.raises(HTTPError):151            self.download_worker._secured_download_file('url', 'filename')152    def test__secured_download_file_wrong_checksum(self, monkeypatch):153        def raise_url_error(url, filename, md5sum=None):154            raise worker.InvalidCheckSumException('checksum exception')155        monkeypatch.setattr(self.download_worker, '_download_file', raise_url_error)156        monkeypatch.setattr(logging, 'error', lambda x: x)157        with pytest.raises(worker.InvalidCheckSumException) as e:158            self.download_worker._secured_download_file('url', 'filename', 'checkcsum')159        assert str(e.value) == 'checksum exception'160    def test__secured_download_file_wrong_zip_crc(self, monkeypatch):161        def raise_zip_error(url, filename, md5sum=None):162            raise zipfile.BadZipfile('bad crc')163        monkeypatch.setattr(self.download_worker, '_download_file', raise_zip_error)164        monkeypatch.setattr(logging, 'error', lambda x: x)165        with pytest.raises(zipfile.BadZipfile) as e:166            self.download_worker._secured_download_file('url', 'filename', 'checkcsum')167        assert str(e.value) == 'bad crc'168    def test__download_file(self, tmpdir):169        tmp_folder = str(tmpdir.mkdir('gmaltcli'))170        self.download_worker.folder = tmp_folder171        self.download_worker._download_file(172            'http://dds.cr.usgs.gov/srtm/version2_1/SRTM3/Africa/N00E010.hgt.zip',173            'N00E010.hgt.zip')174        downloaded_path = os.path.join(self.download_worker.folder, 'N00E010.hgt.zip')175        assert os.path.exists(downloaded_path)176        assert os.path.getsize(downloaded_path) == 1743694177    def test__download_file_with_md5sum_check(self, tmpdir):178        tmp_folder = str(tmpdir.mkdir('gmaltcli'))179        self.download_worker.folder = tmp_folder180        self.download_worker._download_file(181            'http://dds.cr.usgs.gov/srtm/version2_1/SRTM3/Africa/N00E010.hgt.zip',182            'N00E010.hgt.zip',183            'dfb52a9b9eae6de945bd2cfbbacdbc7f')184        downloaded_path = os.path.join(self.download_worker.folder, 'N00E010.hgt.zip')185        assert os.path.exists(downloaded_path)186        assert os.path.getsize(downloaded_path) == 1743694187    def test__download_file_with_wrong_md5sum_check(self, tmpdir):188        tmp_folder = str(tmpdir.mkdir('gmaltcli'))189        self.download_worker.folder = tmp_folder190        with pytest.raises(worker.InvalidCheckSumException):191            self.download_worker._download_file(192                'http://dds.cr.usgs.gov/srtm/version2_1/SRTM3/Africa/N00E010.hgt.zip',193                'N00E010.hgt.zip',194                'abcdefgh')195    def test__download_file_with_wrong_crc_check(self, tmpdir, monkeypatch):196        monkeypatch.setattr(zipfile.ZipFile, 'testzip', lambda x: 'bad crc filename')197        tmp_folder = str(tmpdir.mkdir('gmaltcli'))198        self.download_worker.folder = tmp_folder199        with pytest.raises(zipfile.BadZipfile):200            self.download_worker._download_file(201                'http://dds.cr.usgs.gov/srtm/version2_1/SRTM3/Africa/N00E010.hgt.zip',202                'N00E010.hgt.zip',203                'dfb52a9b9eae6de945bd2cfbbacdbc7f')204class TestExtractWorker(object):205    def setup_method(self, func_method):206        stop_event = threading.Event()207        counter = worker.SafeCounter()208        worker_queue = queue.Queue()209        folder = None210        self.extract_worker = worker.ExtractWorker(1, worker_queue, counter,211                                                   stop_event, folder)212    def test__extract_file(self, tmpdir):213        tmp_folder = str(tmpdir.mkdir('gmaltcli'))214        self.extract_worker.folder = tmp_folder215        zip_file = os.path.realpath(os.path.join(os.path.dirname(__file__), 'srtm3', 'N00E010.hgt.zip'))216        self.extract_worker._extract_file(zip_file)217        extracted_file = os.path.join(tmp_folder, 'N00E010.hgt')218        assert os.path.exists(extracted_file)...stop_test.py
Source:stop_test.py  
...10        """Setup test case"""11        self.sio = socketio.Client()12        self.mock = Mock()13        self.sio.connect('http://localhost:8080')14    def test_stop_event(self):15        """Test stopping movement with event"""16        self.sio.emit('register_manipulator', 1)17        self.sio.emit('register_manipulator', 2)18        self.sio.emit('bypass_calibration', 1)19        self.sio.emit('bypass_calibration', 2)20        self.sio.emit('goto_pos', {'manipulator_id': 1, 'pos': [0, 0, 0, 0],21                                   'speed': self.DRIVE_SPEED},22                      callback=self.mock)23        self.sio.emit('goto_pos', {'manipulator_id': 2, 'pos': [0, 0, 0, 0],24                                   'speed': self.DRIVE_SPEED},25                      callback=self.mock)26        self.sio.emit('goto_pos',27                      {'manipulator_id': 1,28                       'pos': [10000, 10000, 10000, 10000],...test_node.py
Source:test_node.py  
1import pytest2from valens.structures.node import Node3from torch.multiprocessing import Event4import time5def test_stop_event():6    class TestNode(Node):7        def process(self):8            pass9    t = [TestNode("t1"), TestNode("t2")]10    stop_event = Event()11    for ti in t: ti.set_stop_event(stop_event)12    for ti in t: ti.start()13    time.sleep(10)14    stop_event.set()15    for ti in t: ti.join()16def test_terminate():17    class TestNode(Node):18        def process(self):19            pass...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
