Best Python code snippet using locust
test_ddot_taskrunner.py
Source:test_ddot_taskrunner.py  
...232        finally:233            shutil.rmtree(temp_dir)234    def test_filebasedsubmittedtaskfactory_get_next_task_taskdirnone(self):235        fac = FileBasedSubmittedTaskFactory(None)236        self.assertEqual(fac.get_next_task(), None)237    def test_filebasedsubmittedtaskfactory_get_next_task(self):238        temp_dir = tempfile.mkdtemp()239        try:240            # no submit dir241            fac = FileBasedSubmittedTaskFactory(temp_dir)242            self.assertEqual(fac.get_next_task(), None)243            # empty submit dir244            sdir = os.path.join(temp_dir, ddot_rest_server.SUBMITTED_STATUS)245            os.makedirs(sdir, mode=0o755)246            self.assertEqual(fac.get_next_task(), None)247            # submit dir with file in it248            sdirfile = os.path.join(sdir, 'somefile')249            open(sdirfile, 'a').close()250            self.assertEqual(fac.get_next_task(), None)251            # submit dir with 1 subdir, but that is empty too252            ipsubdir = os.path.join(sdir, '1.2.3.4')253            os.makedirs(ipsubdir, mode=0o755)254            self.assertEqual(fac.get_next_task(), None)255            # submit dir with 1 subdir, with a file in it for some reason256            afile = os.path.join(ipsubdir, 'hithere')257            open(afile, 'a').close()258            self.assertEqual(fac.get_next_task(), None)259            # empty task dir260            taskdir = os.path.join(ipsubdir, 'sometask')261            os.makedirs(taskdir, mode=0o755)262            self.assertEqual(fac.get_next_task(), None)263            # empty json file264            taskjsonfile = os.path.join(taskdir, ddot_rest_server.TASK_JSON)265            open(taskjsonfile, 'a').close()266            self.assertEqual(fac.get_next_task(), None)267            self.assertEqual(fac.get_size_of_problem_list(), 1)268            plist = fac.get_problem_list()269            self.assertEqual(plist[0], taskdir)270            # try invalid json file271            # try with another task this time valid272            fac = FileBasedSubmittedTaskFactory(temp_dir)273            anothertask = os.path.join(sdir, '4.5.6.7', 'goodtask')274            os.makedirs(anothertask, mode=0o755)275            goodjson = os.path.join(anothertask, ddot_rest_server.TASK_JSON)276            with open(goodjson, 'w') as f:277                json.dump({'hi': 'there'}, f)278            res = fac.get_next_task()279            self.assertEqual(res.get_taskdict(), {'hi': 'there'})280            self.assertEqual(fac.get_size_of_problem_list(), 0)281            # try again since we didn't move it282            res = fac.get_next_task()283            self.assertEqual(res.get_taskdict(), {'hi': 'there'})284            self.assertEqual(fac.get_size_of_problem_list(), 0)285        finally:286            shutil.rmtree(temp_dir)287    def test_nbgwastaskrunner_run_tasks_no_work(self):288        mocktaskfac = MagicMock()289        mocktaskfac.get_next_task = MagicMock(side_effect=[None, None])290        runner = DDotTaskRunner(wait_time=0, taskfactory=mocktaskfac)291        loop = MagicMock()292        loop.side_effect = [True, True, False]293        runner.run_tasks(keep_looping=loop)294        self.assertEqual(loop.call_count, 3)295        self.assertEqual(mocktaskfac.get_next_task.call_count, 2)296    def test_nbgwastaskrunner_run_tasks_task_raises_exception(self):297        temp_dir = tempfile.mkdtemp()298        try:299            mocktaskfac = MagicMock()300            mocktask = MagicMock()301            mocktask.get_taskdir = MagicMock(return_value=temp_dir)302            mocktask.move_task = MagicMock()303            mocktaskfac.get_next_task.side_effect = [None, mocktask]304            mock_net_fac = MagicMock()305            mock_net_fac. \306                get_networkx_object = MagicMock(side_effect=Exception('foo'))307            runner = DDotTaskRunner(wait_time=0,308                                      taskfactory=mocktaskfac)309            loop = MagicMock()310            loop.side_effect = [True, True, False]311            runner.run_tasks(keep_looping=loop)312            self.assertEqual(loop.call_count, 3)313            self.assertEqual(mocktaskfac.get_next_task.call_count, 2)314        finally:315            shutil.rmtree(temp_dir)316    def test_nbgwastaskrunner_remove_deleted_task(self):317        temp_dir = tempfile.mkdtemp()318        try:319            # try where delete task factory is none320            runner = DDotTaskRunner(wait_time=0)321            self.assertEqual(runner._remove_deleted_task(), False)322            # try where no task is returned323            mockfac = MagicMock()324            mockfac.get_next_task = MagicMock(return_value=None)325            runner = DDotTaskRunner(wait_time=0,326                                      deletetaskfactory=mockfac)327            res = runner._remove_deleted_task()328            self.assertEqual(res, False)329            mockfac.get_next_task.assert_called()330            # try where task.get_taskdir() is None331            task = MagicMock()332            task.get_taskdir = MagicMock(return_value=None)333            mockfac.get_next_task = MagicMock(return_value=task)334            runner = DDotTaskRunner(wait_time=0,335                                      deletetaskfactory=mockfac)336            res = runner._remove_deleted_task()337            self.assertEqual(res, True)338            mockfac.get_next_task.assert_called()339            task.get_taskdir.assert_called()340            # try where task.delete_task_files() raises Exception341            task = MagicMock()342            task.get_taskdir = MagicMock(return_value='/foo')343            task.delete_task_files = MagicMock(side_effect=Exception('some '344                                                                     'error'))345            mockfac.get_next_task = MagicMock(return_value=task)346            runner = DDotTaskRunner(wait_time=0,347                                      deletetaskfactory=mockfac)348            res = runner._remove_deleted_task()349            self.assertEqual(res, False)350            mockfac.get_next_task.assert_called()351            task.get_taskdir.assert_called()352            task.delete_task_files.assert_called()353            # try with valid task to delete, but delete returns message354            task = MagicMock()355            task.get_taskdir = MagicMock(return_value='/foo')356            task.delete_task_files = MagicMock(return_value='a error')357            mockfac.get_next_task = MagicMock(return_value=task)358            runner = DDotTaskRunner(wait_time=0,359                                      deletetaskfactory=mockfac)360            res = runner._remove_deleted_task()361            self.assertEqual(res, True)362            mockfac.get_next_task.assert_called()363            task.get_taskdir.assert_called()364            task.delete_task_files.assert_called()365            # try with valid task to delete366            task = MagicMock()367            task.get_taskdir = MagicMock(return_value='/foo')368            task.delete_task_files = MagicMock(return_value=None)369            mockfac.get_next_task = MagicMock(return_value=task)370            runner = DDotTaskRunner(wait_time=0,371                                      deletetaskfactory=mockfac)372            res = runner._remove_deleted_task()373            self.assertEqual(res, True)374            mockfac.get_next_task.assert_called()375            task.get_taskdir.assert_called()376            task.delete_task_files.assert_called()377        finally:378            shutil.rmtree(temp_dir)379    def test_deletefilebasedtaskfactory_get_task_with_id(self):380        temp_dir = tempfile.mkdtemp()381        try:382            # try where taskdir is not set383            tfac = DeletedFileBasedTaskFactory(None)384            res = tfac._get_task_with_id('foo')385            self.assertEqual(res, None)386            # try with valid taskdir387            tfac = DeletedFileBasedTaskFactory(temp_dir)388            res = tfac._get_task_with_id('foo')389            self.assertEqual(res, None)390            # try where we match in submit dir, but match is not391            # a directory392            submitfile = os.path.join(temp_dir, ddot_rest_server.SUBMITTED_STATUS,393                                      '1.2.3.4', 'foo')394            os.makedirs(os.path.dirname(submitfile), mode=0o755)395            open(submitfile, 'a').close()396            tfac = DeletedFileBasedTaskFactory(temp_dir)397            res = tfac._get_task_with_id('foo')398            self.assertEqual(res, None)399            os.unlink(submitfile)400            # try where we match in submit dir, but no json file401            submitdir = os.path.join(temp_dir, ddot_rest_server.SUBMITTED_STATUS,402                                     '1.2.3.4', 'foo')403            os.makedirs(submitdir, mode=0o755)404            tfac = DeletedFileBasedTaskFactory(temp_dir)405            res = tfac._get_task_with_id('foo')406            self.assertEqual(res.get_taskdir(), submitdir)407            # try where we match in submit dir and there is a json file408            taskfile = os.path.join(submitdir,409                                    ddot_rest_server.TASK_JSON)410            with open(taskfile, 'w') as f:411                json.dump({ddot_rest_server.REMOTEIP_PARAM: '1.2.3.4'}, f)412                f.flush()413            tfac = DeletedFileBasedTaskFactory(temp_dir)414            res = tfac._get_task_with_id('foo')415            self.assertEqual(res.get_taskdir(), submitdir)416            self.assertEqual(res.get_ipaddress(), '1.2.3.4')417            # try where loading json file raises exception418            os.unlink(taskfile)419            open(taskfile, 'a').close()420            res = tfac._get_task_with_id('foo')421            self.assertEqual(res.get_taskdir(), submitdir)422            self.assertEqual(res.get_taskdict(), {})423            shutil.rmtree(submitdir)424            # try where we match in processing dir425            procdir = os.path.join(temp_dir, ddot_rest_server.PROCESSING_STATUS,426                                   '4.5.5.5',427                                   '02e487ef-79df-4d99-8f22-1ff1d6d52a2a')428            os.makedirs(procdir, mode=0o755)429            res = tfac._get_task_with_id('02e487ef-79df-4d99-8f22-'430                                         '1ff1d6d52a2a')431            self.assertEqual(res.get_taskdir(), procdir)432            shutil.rmtree(procdir)433            # try where we match in done dir434            donedir = os.path.join(temp_dir, ddot_rest_server.DONE_STATUS,435                                   '192.168.5.5',436                                   '02e487ef-79df-4d99-8f22-1ff1d6d52a2a')437            os.makedirs(donedir, mode=0o755)438            res = tfac._get_task_with_id('02e487ef-79df-4d99-8f22-'439                                         '1ff1d6d52a2a')440            self.assertEqual(res.get_taskdir(), donedir)441        finally:442            shutil.rmtree(temp_dir)443    def test_deletefilebasedtaskfactory_get_next_task(self):444        temp_dir = tempfile.mkdtemp()445        try:446            # test where delete request dir is None447            tfac = DeletedFileBasedTaskFactory(None)448            res = tfac.get_next_task()449            self.assertEqual(res, None)450            # test where delete request dir is not a directory451            tfac = DeletedFileBasedTaskFactory(temp_dir)452            res = tfac.get_next_task()453            self.assertEqual(res, None)454            # no delete requests found455            del_req_dir = os.path.join(temp_dir, ddot_rest_server.DELETE_REQUESTS)456            os.makedirs(del_req_dir, mode=0o755)457            tfac = DeletedFileBasedTaskFactory(temp_dir)458            res = tfac.get_next_task()459            self.assertEqual(res, None)460            # directory in delete requests dir461            dir_in_del_dir = os.path.join(del_req_dir, 'uhohadir')462            os.makedirs(dir_in_del_dir, mode=0o755)463            tfac = DeletedFileBasedTaskFactory(temp_dir)464            res = tfac.get_next_task()465            self.assertEqual(res, None)466            # Found a delete request, but no task found in system467            a_request = os.path.join(del_req_dir,468                                     '02e487ef-79df-4d99-8f22-1ff1d6d52a2a')469            with open(a_request, 'w') as f:470                f.write('1.2.3.4')471                f.flush()472            tfac = DeletedFileBasedTaskFactory(temp_dir)473            res = tfac.get_next_task()474            self.assertEqual(res, None)475            self.assertTrue(not os.path.isfile(a_request))476            # Found a valid request in system477            a_request = os.path.join(del_req_dir,478                                     '02e487ef-79df-4d99-8f22-1ff1d6d52a2a')479            with open(a_request, 'w') as f:480                f.write('1.2.3.4')481                f.flush()482            done_dir = os.path.join(temp_dir, ddot_rest_server.DONE_STATUS,483                                    '1.2.3.4',484                                    '02e487ef-79df-4d99-8f22-1ff1d6d52a2a')485            os.makedirs(done_dir, mode=0o755)486            tfac = DeletedFileBasedTaskFactory(temp_dir)487            res = tfac.get_next_task()488            self.assertEqual(res.get_taskdir(), done_dir)489            self.assertEqual(res.get_taskdict(), {})490            self.assertTrue(not os.path.isfile(a_request))491        finally:492            shutil.rmtree(temp_dir)493    def test_main(self):494        temp_dir = tempfile.mkdtemp()495        try:496            # test no work and disable delete true497            loop = MagicMock()498            loop.side_effect = [True, True, False]499            dt.main(['foo.py', '--wait_time', '0',500                     '--nodaemon',501                     temp_dir],...fabrication_task.py
Source:fabrication_task.py  
...52        while self.tasks_available():53            if stop_thread():54                self.log("FABRICATION: ---FORCED STOP---")55                break56            if self.get_next_task() is not None:57                if self.current_task is None:58                    get_next_task = True59                elif self.parallelize:60                    if (self.current_task.is_running61                            and self.current_task.parallelizable62                            and len(self.running_tasks) < self.max_parallel_tasks):63                        get_next_task = True64                elif not self.parallelize:65                    if (self.current_task.is_completed66                            and not self.current_task.is_running):67                        get_next_task = True68            69            if get_next_task:70                print("Getting new task")71                self.current_task = self.get_next_task()72                get_next_task = False         73            if len(self.running_tasks) > 0:74                for task in self.running_tasks:75                    task.perform(stop_thread, human_close)76                    self.log(task.log_messages)77                    if task.is_completed and not task.is_running:78                        self.running_tasks.remove(task)79            if (not self.current_task.is_completed80                  and not self.current_task.is_running81                  and self.current_task not in self.running_tasks):82                print("Initializing task")83                self.current_task.perform(stop_thread, human_close)84                self.running_tasks.append(self.current_task)85                self.log(self.current_task.log_messages)...test_cluster.py
Source:test_cluster.py  
...20def test_connector_get_next_task_prefetch():21    con = connector.Connector()22    con._base_request_uri = api_url23    con.prefetch = True24    assert con.get_next_task() == task125    assert con._tasks_in_progress == {task1['msg_id']: task1}26    assert con._tasks == [task2, task3]27    assert con.get_next_task() == task228    assert con._tasks_in_progress == {task1['msg_id']: task1, task2['msg_id']: task2}29    assert con._tasks == [task3]30    assert con.get_next_task() == task331    assert con._tasks_in_progress == tasks_in_progress32def test_connector_get_next_task_no_prefetch():33    con = connector.Connector()34    con._base_request_uri = api_url35    con.prefetch = False36    assert con.get_next_task() == task137    assert len(con._tasks) == 038    assert con._tasks_in_progress == {task1['msg_id']: task1}39    assert con.get_next_task() == task240    assert len(con._tasks) == 041    assert con._tasks_in_progress == {task1['msg_id']: task1, task2['msg_id']: task2}42    assert con.get_next_task() == task343    assert len(con._tasks) == 044    assert con._tasks_in_progress == tasks_in_progress45@pytest.mark.parametrize("i", indices)46def test_connector_reply(i):47    con = connector.Connector()48    con._base_request_uri = api_url49    con.get_next_task()50    assert con.reply(replies[i]) == expected_responses[i]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
