How to use the pause_execution method in Robot Framework

Best Python code snippets using Robot Framework

test_runnerpause.py

Source:test_runnerpause.py Github

copy

Full Screen

...125 # In test case, we are running126 self.assertTrue(self.tr.is_running)127 self.assertFalse(self.tr.is_paused)128 self.assertFalse(self.tr.is_stopped)129 self.tr.pause_execution()130 # Pausing the execution is not enough, test case still running131 self.assertTrue(self.tr.is_running)132 self.assertFalse(self.tr.is_paused)133 self.assertFalse(self.tr.is_stopped)134 # We have to wait for current test case to finish before is_paused.135 self.assertTrue(self.tr.is_paused)136 self.assertFalse(self.tr.is_running)137 self.assertFalse(self.tr.is_stopped)138 self.assertEqual(len(self.tr.run_history), 1)139 finally:140 self.tr.resume_execution()141 # second test case142 with self.during_testcase():143 pass144 self.run_finish()145 self.assertFalse(self.tr.is_paused)146 self.assertFalse(self.tr.is_running)147 self.assertTrue(self.tr.is_stopped)148 self.assertEqual(len(self.run_queue), 0)149 self.assertEqual(len(self.tr.run_history), 2)150 def test_pause_and_resume_execution_can_be_called_multiple_times_in_a_row(self):151 """152 Pause and resume execution can be called multiple times in row without locking.153 Additional calls to pause_execution has no effect if already paused.154 A single call to resume execution is enough for resuming, despite155 multiple pause.156 """157 self.add_testcase()158 self.add_testcase()159 try:160 self.run_start()161 with self.during_testcase():162 # Multiple pause in a row is allowed163 self.tr.pause_execution()164 self.tr.pause_execution()165 self.assertTrue(self.tr.is_paused)166 finally:167 self.tr.resume_execution()168 # First resume is enough to start execution again169 self.assertFalse(self.tr.is_paused)170 # Multiple resume is allowed171 self.tr.resume_execution()172 # second test case173 with self.during_testcase():174 pass175 self.run_finish()176 self.assertTrue(self.tr.is_stopped)177 def test_pause_and_resume_execution_can_be_called_multiple_cicles(self):178 """179 Supports multiple cicles of pause and resume execution.180 Pause and 
resume can be done again after first cicle of pause-resume.181 """182 self.add_testcase()183 self.add_testcase()184 self.add_testcase()185 try:186 self.run_start()187 with self.during_testcase():188 self.tr.pause_execution()189 self.assertTrue(self.tr.is_paused)190 self.tr.resume_execution()191 with self.during_testcase():192 self.tr.pause_execution()193 self.assertTrue(self.tr.is_paused)194 finally:195 self.tr.resume_execution()196 self.assertFalse(self.tr.is_paused)197 # third test case198 with self.during_testcase():199 pass200 self.run_finish()201 self.assertTrue(self.tr.is_stopped)202 def test_raises_exception_if_wait_for_resume_is_too_long(self):203 """204 Raises exception if wait for resume is too long.205 The runner raises an ExecutionPausedTooLong if the execution206 is paused and the command loop is stalled for more than,207 EXECUTION_PAUSED_TIMEOUT_SECONDS.208 """209 self.add_testcase()210 self.run_start()211 self.tr.EXECUTION_PAUSED_TIMEOUT_SECONDS = 0212 with self.during_testcase():213 self.tr.pause_execution()214 with self.assertRaises(ExecutionPausedTooLong):215 self.run_finish()216 def test_abort_during_testcase_is_delayed_until_execution_resumes(self):217 """218 Abort, during test case in progress, is delayed until execution resumes.219 If aborted (non-critical), while being paused, the execution is not220 automatically resumed. 
Instead the abort is delayed until the runner221 resumes.222 """223 self.add_testcase()224 self.add_testcase()225 self.run_start()226 with self.during_testcase():227 self.tr.pause_execution()228 self.tr.abort_run_after_current_test_cases_have_completed()229 # We are still paused even though abort requested.230 self.assertTrue(self.tr.is_paused)231 self.tr.resume_execution()232 self.message_queue.get(timeout=self.TIMEOUT) # skipped test case233 self.run_finish()234 self.assertEqual(len(self.run_queue), 0)235 self.assertEqual(len(self.tr.run_history), 2)236 self.assertEqual(self.tr.run_history[0].verdict, Verdict.PASSED)237 self.assertEqual(self.tr.run_history[1].verdict, Verdict.SKIPPED)238 def test_abort_inbetween_testcase_is_delayed_until_execution_resumes(self):239 """240 Abort, inbetween test case in progress, is delayed until execution resumes.241 If aborted (non-critical), while being paused, the execution is not242 automatically resumed. Instead the abort is delayed until the runner243 resumes.244 """245 self.add_testcase()246 self.add_testcase()247 self.run_start()248 with self.during_testcase():249 self.tr.pause_execution()250 self.tr.abort_run_after_current_test_cases_have_completed()251 # We are still paused even though abort requested.252 self.assertTrue(self.tr.is_paused)253 self.tr.resume_execution()254 self.message_queue.get(timeout=self.TIMEOUT) # skipped test case255 self.run_finish()256 self.assertEqual(len(self.run_queue), 0)257 self.assertEqual(len(self.tr.run_history), 2)258 self.assertEqual(self.tr.run_history[0].verdict, Verdict.PASSED)259 self.assertEqual(self.tr.run_history[1].verdict, Verdict.SKIPPED)260 def test_immediate_abort_during_testcase_resumes_execution(self):261 """262 Immediate abort aborts right away by resuming execution.263 If immediate aborted (critical), test case in progress, while being paused,264 the execution is automatically resumed. 
The abort is handled directly265 and the runner is stopped immediately.266 """267 self.add_testcase()268 self.add_testcase()269 self.run_start()270 with self.during_testcase():271 self.tr.pause_execution()272 self.tr.abort_run_immediately()273 # We are not paused any more after immediate abort.274 self.assertFalse(self.tr.is_paused)275 self.message_queue.get(timeout=self.TIMEOUT) # skipped test case276 self.run_finish()277 self.assertEqual(len(self.run_queue), 0)278 self.assertEqual(len(self.tr.run_history), 2)279 # Race condition if abort will actually have time to raise the abort280 # exception, and we therefore don't know the verdict of the first281 # test case, it can be either pass or error. Hence no check for first282 # test case.283 self.assertEqual(self.tr.run_history[1].verdict, Verdict.SKIPPED)284 def test_immediate_abort_inbetween_testcase_resumes_execution(self):285 """286 Immediate abort aborts right away by resuming execution.287 If immediate aborted (critical), inbetween test cases, while being paused,288 the execution is automatically resumed. The abort is handled directly289 and the runner is stopped immediately.290 """291 self.add_testcase()292 self.add_testcase()293 self.run_start()294 with self.during_testcase():295 self.tr.pause_execution()296 self.assertTrue(self.tr.is_paused)297 # We are not paused any more after immediate abort.298 self.tr.abort_run_immediately()299 self.assertFalse(self.tr.is_paused)300 self.message_queue.get(timeout=self.TIMEOUT) # skipped test case301 self.run_finish()302 self.assertEqual(len(self.run_queue), 0)303 self.assertEqual(len(self.tr.run_history), 2)...

Full Screen

Full Screen

test_manager.py

Source:test_manager.py Github

copy

Full Screen

1from unittest import TestCase2from unittest.mock import MagicMock3from zaf.builtin.unittest.harness import ExtensionTestHarness, SyncMock4from zaf.messages.dispatchers import CallbackDispatcher, LocalMessageQueue5from k2 import ABORT, CRITICAL_ABORT6from k2.cmd.run import RUN_COMMAND_ENDPOINT, TEST_RUN7from k2.runner import ABORT_TEST_CASE_REQUEST, RUNNER_ENDPOINT, TEST_RUN_FINISHED, TEST_RUN_STARTED8from k2.scheduler import SCHEDULE_NEXT_TEST, SCHEDULER_ENDPOINT9from k2.sut import SUT_RESET_DONE, SUT_RESET_STARTED10from ..manager import TestRunnerManager11MOCK_ENDPOINT = MagicMock()12TIMEOUT = 513class TestTestRunnerExtension(TestCase):14 @staticmethod15 def create_harness():16 harness = ExtensionTestHarness(17 TestRunnerManager,18 endpoints_and_messages={19 MOCK_ENDPOINT: [SUT_RESET_STARTED, SUT_RESET_DONE],20 RUN_COMMAND_ENDPOINT: [ABORT, CRITICAL_ABORT, TEST_RUN],21 SCHEDULER_ENDPOINT: [SCHEDULE_NEXT_TEST],22 })23 harness.schedule_test_case = None24 def schedule_test_case(message):25 harness.schedule_test_case = message26 return None27 scheduler_dispatcher = CallbackDispatcher(harness.messagebus, schedule_test_case)28 scheduler_dispatcher.register([SCHEDULE_NEXT_TEST], [SCHEDULER_ENDPOINT])29 return harness30 def test_trigger_test_run_started_and_finished_on_test_run(self):31 """Trigger events TEST_RUN_STARTED and TEST_RUN_FINISHED, on message TEST_RUN."""32 with self.create_harness() as harness:33 events = [TEST_RUN_STARTED, TEST_RUN_FINISHED]34 with LocalMessageQueue(harness.messagebus, events) as queue:35 harness.messagebus.trigger_event(TEST_RUN, RUN_COMMAND_ENDPOINT, data=MagicMock())36 self.assertEqual(queue.get(timeout=TIMEOUT).message_id, TEST_RUN_STARTED)37 self.assertEqual(queue.get(timeout=TIMEOUT).message_id, TEST_RUN_FINISHED)38 def test_abort_after_current_test_case_on_abort(self):39 """Abort after the current test case, on message ABORT."""40 with self.create_harness() as harness:41 runner_mock = SyncMock()42 
runner_mock.abort_run_after_current_test_cases_have_completed = SyncMock()43 harness.extension._runner = runner_mock44 harness.trigger_event(ABORT, RUN_COMMAND_ENDPOINT)45 runner_mock.abort_run_after_current_test_cases_have_completed.wait_for_call(46 timeout=TIMEOUT)47 def test_abort_immediately_on_critical_abort(self):48 """Abort immediately, on message CRITICAL_ABORT."""49 with self.create_harness() as harness:50 runner_mock = SyncMock()51 runner_mock.abort_run_immediately = SyncMock()52 harness.extension._runner = runner_mock53 harness.trigger_event(CRITICAL_ABORT, RUN_COMMAND_ENDPOINT)54 runner_mock.abort_run_immediately.wait_for_call(timeout=TIMEOUT)55 def test_request_abort_test_case(self):56 with self.create_harness() as harness:57 runner_mock = MagicMock()58 harness.extension._runner = runner_mock59 harness.send_request(60 ABORT_TEST_CASE_REQUEST, RUNNER_ENDPOINT, data=MagicMock()).wait()[0].result()61 self.assertEqual(runner_mock.abort_test_case.call_count, 1)62 def test_pausing_execution_on_sut_reset_started(self):63 """Pause execution, on message SUT_RESET_STARTED."""64 with self.create_harness() as harness:65 runner_mock = SyncMock()66 runner_mock.pause_execution = SyncMock()67 entity = MagicMock()68 harness.extension._runner = runner_mock69 harness.trigger_event(SUT_RESET_STARTED, MOCK_ENDPOINT, entity=entity, data=False)70 runner_mock.pause_execution.wait_for_call(timeout=TIMEOUT)71 def test_resume_execution_on_sut_reset_done(self):72 """Resume execution, on message SUT_RESET_DONE."""73 with self.create_harness() as harness:74 runner_mock = SyncMock()75 runner_mock.resume_execution = SyncMock()76 entity = MagicMock()77 harness.extension._runner = runner_mock78 harness.extension._resetting_entities.add(entity)79 harness.trigger_event(SUT_RESET_DONE, MOCK_ENDPOINT, entity=entity, data=False)80 runner_mock.resume_execution.wait_for_call(timeout=TIMEOUT)81 def test_multiple_sut_reset_started_before_single_reset_done(self):82 """Receiving multiple 
SUT_RESET_STARTED before a single SUT_RESET_DONE is okay."""83 with self.create_harness() as harness:84 runner_mock = SyncMock()85 runner_mock.resume_execution = SyncMock()86 runner_mock.pause_execution = SyncMock()87 entity = MagicMock()88 harness.extension._runner = runner_mock89 harness.trigger_event(SUT_RESET_STARTED, MOCK_ENDPOINT, entity=entity, data=False)90 harness.trigger_event(SUT_RESET_STARTED, MOCK_ENDPOINT, entity=entity, data=False)91 harness.trigger_event(SUT_RESET_DONE, MOCK_ENDPOINT, entity=entity, data=False)92 runner_mock.resume_execution.wait_for_call(timeout=TIMEOUT)93 self.assertEqual(runner_mock.pause_execution.call_count, 2)94 def test_not_resuming_if_other_sut_is_being_reset(self):95 """Execution is not resumed if another SUT is being reset."""96 with self.create_harness() as harness:97 runner_mock = SyncMock()98 runner_mock.pause_execution = SyncMock()99 runner_mock.resume_execution = SyncMock()100 entity1 = MagicMock()101 entity2 = MagicMock()102 harness.extension._runner = runner_mock103 harness.trigger_event(SUT_RESET_STARTED, MOCK_ENDPOINT, entity=entity1, data=False)104 harness.trigger_event(SUT_RESET_STARTED, MOCK_ENDPOINT, entity=entity2, data=False)105 harness.trigger_event(SUT_RESET_DONE, MOCK_ENDPOINT, entity=entity1, data=False)106 runner_mock.pause_execution.wait_for_call(timeout=TIMEOUT)107 runner_mock.pause_execution.wait_for_call(timeout=TIMEOUT)108 self.assertEqual(runner_mock.resume_execution.call_count, 0)109 harness.trigger_event(SUT_RESET_DONE, MOCK_ENDPOINT, entity=entity2, data=False)110 runner_mock.resume_execution.wait_for_call(timeout=TIMEOUT)111 def test_reset_done_without_reset_started_is_allowed(self):112 """Receiving SUT_RESET_DONE without prior SUT_RESET_STARTED is okay."""113 with self.create_harness() as harness:114 runner_mock = SyncMock()115 runner_mock.resume_execution = SyncMock()116 entity = MagicMock()117 harness.extension._runner = runner_mock118 harness.trigger_event(SUT_RESET_DONE, MOCK_ENDPOINT, 
entity=entity, data=False)...

Full Screen

Full Screen

function_exec_manager.py

Source:function_exec_manager.py Github

copy

Full Screen

...115 self.late_functions = []116 # sleep to reduce computational load117 time.sleep(0.001)118 # check if user wants to pause execution119 while self.pause_execution():120 time.sleep(0.1)121 # execute custom function after having finished the loop122 self.exec_after_each_loop()123 # wait for threads to finish124 for thread in self.late_threads:...

Full Screen

Full Screen

pause.py

Source:pause.py Github

copy

Full Screen

...14 print("\nResuming execution...\n")15 t.log("Resuming execution...\n")16 del signal_number #unused hence deleting to improve pylint score17 del frame #unused hence deleting to improve pylint score18def pause_execution():19 '''20 pause_execution21 '''22 console.write('\nToby process [ %s ] sending signal to group PID [ %s ] ' % (str(os.getpid()), os.getpgid(os.getpid())))23 t.log('Toby process [ %s ] sending signal to parent process [ %s ]\n' % (str(os.getpid()), os.getpgid(os.getpid())))24 os.kill(os.getpgid(os.getpid()), signal.SIGCONT)25 signal.signal(signal.SIGINT, resume_execution)26 print("\nPress Ctrl+C to resume...")27 signal.pause()28#########################################################29#30# Pause Listener31#32#########################################################33class pause(object):34 '''35 Pause listener36 This listener is attached when --pause option is enabled from toby wrapper script with required YAML input file.37 Based on input YAML file, it will Pause the excecution and wait for SIGCONT to resume execution.38 '''39 ROBOT_LISTENER_API_VERSION = 240 def __init__(self):41 self.pause_file_content = None42 console.write('Running with Pause listener attached...\n')43 def load_pause_yaml(self):44 '''45 load_pause_yaml46 '''47 if self.pause_file_content is None:48 val = BuiltIn().get_variable_value('${pause_file}')49 if val:50 try:51 self.pause_file_content = yaml.safe_load(open(val))52 except Exception as error:53 raise error54 def check_for_pause(self, where, when, name):55 '''56 check_for_pause57 '''58 try:59 input_list = self.pause_file_content[where][when]60 input_list = [element.lower() for element in input_list]61 if name.lower() in input_list or 'any' in input_list:62 return True63 except Exception:64 pass65 return False66 def end_keyword(self, name, attrs):67 """68 This listener method is called when keyword ends69 :param name:70 **REQUIRED** The shortname of test suite71 :param attrs:72 **REQUIRED** The attribute dictionary 
containing type, kwname, librname,73 starttime, endtime, elapsedtime and status.74 :return:75 None76 """77 self.load_pause_yaml()78 if self.check_for_pause('keywords', 'after', name) or (attrs['status'] == "FAIL" and self.check_for_pause('keywords', 'fail', name)):79 print("\nPaused after Keyword: " + name)80 t.log("Paused after Keyword: " + name)81 pause_execution()82 def start_keyword(self, name, attrs):83 """84 This listener method is called when keyword starts85 :param name:86 **REQUIRED** The shortname of test suite87 :param attrs:88 **REQUIRED** The attribute dictionary containing type, kwname, librname,89 starttime, endtime, elapsedtime and status.90 :return:91 None92 """93 self.load_pause_yaml()94 if self.check_for_pause('keywords', 'before', name):95 print("\nPaused before Keyword: " + name)96 t.log("Paused before Keyword: " + name)97 pause_execution()98 del attrs #unused hence deleting to improve pylint score99 def start_test(self, name, attrs):100 """101 This listener method is called when test starts102 :param name:103 **REQUIRED** The shortname of test suite104 :param attrs:105 **REQUIRED** The attribute dictionary containing id, longname, starttime,106 :return:107 None108 """109 self.load_pause_yaml()110 if self.check_for_pause('testcases', 'before', name):111 print("\nPaused before Test-case: " + name)112 t.log("Paused before Test-case: " + name)113 pause_execution()114 del attrs #unused hence deleting to improve pylint score115 def end_test(self, name, attrs):116 """117 This listener method is called when test ends118 :param name:119 **REQUIRED** The shortname of test suite120 :param attrs:121 **REQUIRED** The attribute dictionary containing id, longname, starttime,122 :return:123 None124 """125 self.load_pause_yaml()126 if self.check_for_pause('testcases', 'after', name) or (attrs['status'] == "FAIL" and self.check_for_pause('testcases', 'fail', name)):127 print("\nPaused after Test-case: " + name)128 t.log("Paused after Test-case: " + name)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Robotframework automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful