Best Python code snippet using stestr_python
resources_test.py
Source:resources_test.py  
"""Tests for the meniscus worker status resources (resources_test.py)."""
import unittest

import falcon
import falcon.testing as testing
from mock import MagicMock, patch

from meniscus.api.status.resources import WorkerStatusResource
from meniscus.api.status.resources import WorkersStatusResource
from meniscus.data.model.worker import Worker
from meniscus.data.model.worker import SystemInfo
from meniscus.openstack.common import jsonutils


def suite():
    """Return a TestSuite containing every test case class in this module.

    The classes are loaded through a TestLoader so each ``test_*`` method
    becomes its own test; instantiating the classes without a method name
    would look for a ``runTest`` method that none of them define.
    """
    loader = unittest.TestLoader()
    all_tests = unittest.TestSuite()
    for case in (WhenTestingWorkerOnPut,
                 WhenTestingWorkersStatus,
                 WhenTestingWorkerStatus):
        all_tests.addTests(loader.loadTestsFromTestCase(case))
    return all_tests


class WhenTestingWorkerOnPut(testing.TestBase):
    """PUT requests against the single-worker status route."""

    def before(self):
        """Build the request payloads and register the resource route."""
        self.status = 'online'
        self.hostname = 'worker01'
        self.personality = 'worker'
        # Plain strings: a trailing comma here would create 1-tuples.
        self.ip4 = '192.168.100.101'
        self.ip6 = '::1'
        self.system_info = SystemInfo().format()
        self.bad_status = 'bad_status'
        self.bad_system_info = SystemInfo()
        self.bad_worker_status = self._worker_status_body(self.bad_status)
        self.worker = self._worker_status_body(self.status)
        self.returned_worker = Worker(
            hostname=self.hostname,
            ip_address_v4=self.ip4,
            ip_address_v6=self.ip6,
            personality=self.personality,
            status=self.status,
            system_info=self.system_info)
        self.req = MagicMock()
        self.resp = MagicMock()
        self.resource = WorkerStatusResource()
        self.test_route = '/worker/{hostname}/status'
        self.api.add_route(self.test_route, self.resource)

    def _worker_status_body(self, status):
        """Return a ``worker_status`` request body carrying ``status``."""
        return {
            'worker_status': {
                'hostname': self.hostname,
                'system_info': self.system_info,
                'status': status,
                'personality': self.personality,
                'ip_address_v4': self.ip4,
                'ip_address_v6': self.ip6,
            }
        }

    def _put_status(self, body):
        """PUT ``body`` (a dict, serialized to JSON) to the test route."""
        self.simulate_request(
            self.test_route,
            method='PUT',
            headers={'content-type': 'application/json'},
            body=jsonutils.dumps(body))

    def test_returns_400_body_validation(self):
        with patch('meniscus.data.model.worker_util.save_worker',
                   MagicMock()):
            self._put_status(self.bad_worker_status)
            self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_return_202_for_new_worker_when_worker_not_found(self):
        create_worker = MagicMock()
        with patch('meniscus.data.model.worker_util.find_worker',
                   MagicMock(return_value=None)), \
                patch('meniscus.data.model.worker_util.create_worker',
                      create_worker):
            self._put_status(self.worker)
            # TODO(review): the status assertion is disabled in the source,
            # so this test only checks that the request does not raise.
            # self.assertEqual(falcon.HTTP_202, self.srmock.status)

    def test_return_200_for_new_worker_when_worker_found(self):
        save_worker = MagicMock()
        with patch('meniscus.data.model.worker_util.find_worker',
                   MagicMock(return_value=self.returned_worker)), \
                patch('meniscus.data.model.worker_util.save_worker',
                      save_worker):
            self._put_status(self.worker)
            # TODO(review): the status assertion is disabled in the source,
            # so this test only checks that the request does not raise.
            # self.assertEqual(falcon.HTTP_200, self.srmock.status)

    def test_returns_400_bad_worker_status(self):
        with patch('meniscus.data.model.worker_util.find_worker',
                   MagicMock(return_value=self.worker)):
            self._put_status(self.bad_worker_status)
            self.assertEqual(falcon.HTTP_400, self.srmock.status)

    def test_when_load_average_is_negative_then_should_return_http_400(self):
        with patch('meniscus.data.model.worker_util.find_worker',
                   MagicMock(return_value=self.worker)):
            system_info = SystemInfo()
            system_info.load_average = {"1": -2, "15": -2, "5": -2}
            self._put_status({
                'worker_status': {
                    'hostname': self.hostname,
                    'system_info': system_info.format(),
                    'status': self.status
                }
            })
            self.assertEqual(falcon.HTTP_400, self.srmock.status)


class WhenTestingWorkersStatus(unittest.TestCase):
    """GET on the all-workers status resource."""

    def setUp(self):
        self.req = MagicMock()
        self.resp = MagicMock()
        self.hostname = 'worker01'
        self.resource = WorkersStatusResource()
        self.worker = Worker(_id='010101',
                             hostname=self.hostname,
                             ip_address_v4='172.23.1.100',
                             ip_address_v6='::1',
                             personality='worker01',
                             status='online',
                             system_info={})

    def test_returns_200_on_get(self):
        with patch('meniscus.data.model.worker_util.retrieve_all_workers',
                   MagicMock(return_value=[self.worker])):
            self.resource.on_get(self.req, self.resp)
            self.assertEqual(self.resp.status, falcon.HTTP_200)
            resp = jsonutils.loads(self.resp.body)
            status = resp['status'][0]
        # Check the keys of the returned worker status itself; looping over
        # the top-level response keys would never inspect the status payload.
        expected_keys = self.worker.get_status().keys()
        for key in status:
            self.assertIn(key, expected_keys)


class WhenTestingWorkerStatus(unittest.TestCase):
    """GET on the single-worker status resource."""

    def setUp(self):
        self.req = MagicMock()
        self.resp = MagicMock()
        self.hostname = 'worker01'
        self.resource = WorkerStatusResource()
        self.worker = Worker(_id='010101',
                             hostname=self.hostname,
                             ip_address_v4='172.23.1.100',
                             ip_address_v6='::1',
                             personality='worker01',
                             status='online',
                             system_info={})
        self.worker_not_found = None

    def test_raises_worker_not_found(self):
        with patch('meniscus.data.model.worker_util.find_worker',
                   MagicMock(return_value=None)):
            with self.assertRaises(falcon.HTTPError):
                self.resource.on_get(self.req, self.resp, self.hostname)

    def test_returns_200_on_get(self):
        with patch('meniscus.data.model.worker_util.find_worker',
                   MagicMock(return_value=self.worker)):
            self.resource.on_get(self.req, self.resp, self.hostname)
            self.assertEqual(self.resp.status, falcon.HTTP_200)
            resp = jsonutils.loads(self.resp.body)
            status = resp['status']
            # Check the keys of the returned worker status itself.
            expected_keys = self.worker.get_status().keys()
            for key in status:
                self.assertIn(key, expected_keys)


if __name__ == '__main__':
    # NOTE(review): the source page truncates the snippet right after the
    # guard; unittest.main() is the conventional body -- confirm against the
    # original file.
    unittest.main()

# Next snippet on the page: test_find_worker_view.py
Source:test_find_worker_view.py  
"""Tests for the ``/api/v1/find_worker`` view (test_find_worker_view.py)."""
import json
import random

import pytest

from halp_backend import user_converter
from halp_backend.models import Request
from halp_backend.tests.conftest import encode_auth, fake_request_creation, generate_users

pytestmark = pytest.mark.django_db


def _reset_request_state():
    """Reset the pairing state kept on ``Request.objects`` before a test."""
    Request.objects.paired_requests = {}
    Request.objects.pending_requests.clear()


def _create_request(customer=None):
    """Create a request from fake data, optionally owned by ``customer``.

    When ``customer`` is None the ``customer`` entry produced by
    ``fake_request_creation`` is left untouched.
    """
    request_dict = fake_request_creation(False)
    if customer is not None:
        request_dict['customer'] = customer
    return Request.objects.create_request(**request_dict)


def _get_find_worker(client, user, password, request_id):
    """GET the find_worker endpoint for ``request_id`` authenticated as ``user``."""
    return client.get(
        f'/api/v1/find_worker?request_id={request_id}',
        HTTP_AUTHORIZATION=encode_auth(user.email, password)
    )


def test_found(create_full_user, client):
    """A paired request returns the worker that took the job."""
    _reset_request_state()
    user, password = create_full_user
    worker, _ = random.choice(list(generate_users(3)))
    request = _create_request(customer=user)
    assert Request.objects.find_job(worker)
    resp = _get_find_worker(client, user, password, request.id)
    assert resp.status_code == 200
    resp_json = json.loads(resp.content)
    assert resp_json['worker'] == user_converter.to_dict(worker)


def test_not_found(create_full_user, client):
    """A request nobody has picked up returns ``worker: null``."""
    _reset_request_state()
    user, password = create_full_user
    request = _create_request(customer=user)
    resp = _get_find_worker(client, user, password, request.id)
    assert resp.status_code == 200
    resp_json = json.loads(resp.content)
    assert resp_json['worker'] is None


def test_fail_other_customer(create_full_user, client):
    """Asking about a request created without this user as customer is a 403."""
    _reset_request_state()
    user, password = create_full_user
    request = _create_request()
    resp = _get_find_worker(client, user, password, request.id)
    assert resp.status_code == 403


def test_fail_not_exist(create_full_user, client):
    """Asking about a request id one past the created request is a 404."""
    _reset_request_state()
    user, password = create_full_user
    request = _create_request()
    resp = _get_find_worker(client, user, password, request.id + 1)
    assert resp.status_code == 404


def test_bad_id(create_full_user, client):
    """Send a non-integer request id (``<id>.1``) to the endpoint."""
    _reset_request_state()
    user, password = create_full_user
    worker, _ = random.choice(list(generate_users(3)))
    request = _create_request(customer=user)
    assert Request.objects.find_job(worker)
    resp = _get_find_worker(client, user, password, f'{request.id}.1')
    # NOTE(review): the source page truncates this test here; the assertion
    # on ``resp`` is not visible and must be restored from the original file.

# Next snippet on the page: test_find.py
Source:test_find.py  
#
# Copyright 2010 University of Southern California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Unit tests for find module.
"""

import logging
import time
import unittest

from tagfiler.iobox import worker, find
import base

logger = logging.getLogger(__name__)


def all_tests():
    """Returns a TestSuite that includes all test cases in this module."""
    suite = unittest.TestSuite()
    suite.addTest(FindTest())
    return suite


class FindTest(base.OutboxBaseTestCase):
    """Runs the Find worker over ``self.rootdirs`` and counts its output."""

    def get_numroots(self):
        """Number of root directories used by this test."""
        return 2

    def get_numdirs(self):
        """Number of directories per root used by this test."""
        return 5

    def get_numfiles(self):
        """Number of files per directory used by this test."""
        return 10

    def runTest(self):
        """Simple test for the Find worker."""
        find_q = worker.WorkQueue()
        tag_q = worker.WorkQueue()

        for rootdir in self.rootdirs:
            find_q.put(rootdir)

        find_worker = find.Find(find_q, tag_q)
        find_worker.start()
        find_q.join()
        find_worker.terminate()

        # Expected queue size: one entry per root, per directory under each
        # root, and per file under each of those directories.
        roots = self.get_numroots()
        dirs = roots * self.get_numdirs()
        files = dirs * self.get_numfiles()
        self.assertEqual(
            tag_q.qsize(),
            roots + dirs + files,
            "Failed to find all directories and files in the temp dir")

        time.sleep(1)  # TODO(schuler): Hate to do it this way
        self.assertFalse(find_worker.is_alive(), "Find has not terminated")


if __name__ == "__main__":
    #import sys;sys.argv = ['', 'Test.testName']
    logging.basicConfig(level=logging.DEBUG)
    # NOTE(review): the source page truncates the snippet after the line
    # above; unittest.main() is the conventional next call -- confirm against
    # the original file.
    unittest.main()

# Learn to execute automation testing from scratch with the LambdaTest
# Learning Hub: from setting up the prerequisites and running your first
# automation test, to following best practices and diving deeper into
# advanced test scenarios. LambdaTest Learning Hubs compile step-by-step
# guides to help you become proficient with different test automation
# frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
