Best Python code snippet using localstack_python
test_activity_worker.py
Source:test_activity_worker.py  
1import pytest2import json3from inspect import signature4from unittest.mock import PropertyMock, Mock5import floto.api6@pytest.fixture7def activity_task():8    return {'activityId': 'my_activity_id',9            'activityType': {'name': 'my_activity_type', 'version': 'v1', 'domain': 'd'},10            'startedEventId': 7,11            'taskToken': 'the_task_token',12            'workflowExecution': {'runId': 'the_run_id', 'workflowId': 'the_workflow_id'}}13@pytest.fixture14def worker(mocker):15    mocked_functions = {'respond_activity_task_completed':Mock(),16                        'respond_activity_task_failed':Mock()}17    client_mock = type("ClientMock", (object,), mocked_functions)18    mocker.patch('floto.api.Swf.client', new_callable=PropertyMock, return_value=client_mock())19    worker = floto.ActivityWorker(domain='d', task_list='tl')20    worker.result = 'result'21    worker.task_token = 'tt'22    return worker23class TestActivityWorker:24    def test_init(self):25        worker = floto.ActivityWorker(domain='d', task_list='tl', task_heartbeat_in_seconds=10)26        assert worker.task_heartbeat_in_seconds == 1027    def test_init_with_defaults(self):28        worker = floto.ActivityWorker(domain='d', task_list='tl')29        assert worker.task_heartbeat_in_seconds == 9030    def test_terminate_activity_worker(self):31        worker = floto.ActivityWorker(domain='d', task_list='tl')32        worker.terminate_worker()33        assert worker._terminate_activity_worker == True34    def test_start_heartbeat(self, mocker):35        mocker.patch('floto.HeartbeatSender.send_heartbeats')36        worker = floto.ActivityWorker(domain='d', task_list='tl')37        worker.task_heartbeat_in_seconds = 138        worker.task_token = 't'39        worker.start_heartbeat()40        worker.heartbeat_sender.send_heartbeats.assert_called_once_with(timeout=1, task_token='t')41    def test_stop_heartbeat(self, mocker):42        mocker.patch('floto.HeartbeatSender.stop_heartbeats')43        worker = floto.ActivityWorker(domain='d', task_list='tl')44        worker.stop_heartbeat()45        worker.heartbeat_sender.stop_heartbeats.assert_called_once_with()46@floto.activity(domain='d', name='my_activity_type', version='v1')47def do_work():48    return 'success'49@floto.activity(domain='d', name='my_activity_type', version='v2')50def do_work_and_return_context(context):51    return context52@floto.activity(domain='d', name='activity_fails', version='v1')53def fail():54    raise ValueError('Activity failed')55class MockedActivityWorker(floto.ActivityWorker):56    def task_failed(self, error):57        self._error = error58class TestActivityWorkerAPICalls:59    def test_args(self):60        sig = signature(floto.ACTIVITY_FUNCTIONS['my_activity_type:v2:d'])61        assert ('context' in sig.parameters)62    def test_poll(self, mocker, activity_task):63        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=activity_task)64        worker = floto.ActivityWorker(domain='d', task_list='task_list', identity='test machine')65        worker.poll()66        worker.swf.poll_for_activity_task.assert_called_once_with(task_list='task_list', domain='d',67                                                                  identity='test machine')68        assert worker.task_token == 'the_task_token'69        assert worker.last_response['activityId'] == 'my_activity_id'70    def test_run_with_response(self, mocker, activity_task):71        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=activity_task)72        mocker.patch('floto.ActivityWorker.complete')73        worker = floto.ActivityWorker(domain='d', task_list='tl')74        worker.max_polls = 175        worker.run()76        worker.complete.assert_called_once_with()77    def test_run_with_context(self, mocker):78        activity_task_with_input = {'activityId': 'my_activity_id',79                                    'activityType': {'name': 'my_activity_type', 'version': 'v2'},80                                    'taskToken': 'the_task_token',81                                    'input': {'foo': 'bar'}}82        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=activity_task_with_input)83        mocker.patch('floto.ActivityWorker.complete')84        worker = floto.ActivityWorker(domain='d', task_list='tl')85        worker.max_polls = 186        worker.run()87        assert worker.result == {'foo': 'bar'}88    def test_run_with_context_wo_input(self, mocker):89        activity_task_with_input = {'activityId': 'my_activity_id',90                                    'activityType': {'name': 'my_activity_type', 'version': 'v2'},91                                    'taskToken': 'the_task_token'}92        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=activity_task_with_input)93        mocker.patch('floto.ActivityWorker.complete')94        worker = floto.ActivityWorker(domain='d', task_list='tl')95        worker.max_polls = 196        worker.run()97        assert worker.result == {}98    def test_run_without_response(self, mocker):99        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value={})100        mocker.patch('floto.ActivityWorker.complete')101        worker = floto.ActivityWorker(domain='d', task_list='tl')102        worker.max_polls = 1103        worker.run()104        worker.complete.assert_not_called()105    def test_run_with_failing_activity(self, mocker):106        failing_task = {'activityType': {'name': 'activity_fails', 'version': 'v1'},107                        'taskToken': 'the_task_token'}108        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=failing_task)109        mocker.patch('floto.ActivityWorker.complete')110        worker = MockedActivityWorker(domain='d', task_list='tl')111        worker.max_polls = 1112        worker.run()113        assert str(worker._error) == 'Activity failed'114    def test_run_with_activity_not_found(self, mocker):115        failing_task = {'activityType': {'name': 'activity_unknown', 'version': 'v1'},116                        'taskToken': 'the_task_token'}117        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=failing_task)118        mocker.patch('floto.ActivityWorker.complete')119        worker = MockedActivityWorker(domain='d', task_list='tl')120        worker.max_polls = 1121        worker.run()122        message = 'No activity with id activity_unknown:v1:d registered'123        assert str(worker._error) == message124    def test_task_failed(self, mocker):125        client_mock = type('ClientMock', (object,), {'respond_activity_task_failed': Mock()})126        mocker.patch('floto.api.Swf.client', new_callable=PropertyMock, return_value=client_mock())127        worker = floto.ActivityWorker(domain='d', task_list='tl')128        worker.task_token = 'abc'129        worker.task_failed(Exception('some error'))130        expected_args = {'taskToken': 'abc', 'details': 'some error'}131        worker.swf.client.respond_activity_task_failed.assert_called_once_with(**expected_args)132    def test_terminate_worker(self):133        worker = floto.ActivityWorker(domain='d', task_list='tl')134        worker.terminate_worker()135        assert worker.get_terminate_activity_worker()136    def test_complete_result_string(self, mocker, worker):137        worker.result = 'result'138        worker.complete()139        assertion = {'taskToken':worker.task_token, 'result':'result'}140        worker.swf.client.respond_activity_task_completed.assert_called_once_with(**assertion)141    def test_complete_result_object(self, mocker, worker):142        worker.result = {'foo':'bar'}143        worker.complete()144        assertion = {'taskToken':worker.task_token, 'result':json.dumps({'foo':'bar'})}145        worker.swf.client.respond_activity_task_completed.assert_called_once_with(**assertion)146    def test_run_start_stop_heartbeats_with_successful_activity(self, mocker, activity_task):147        mocker.patch('floto.ActivityWorker.complete')148        mocker.patch('floto.ActivityWorker.start_heartbeat')149        mocker.patch('floto.ActivityWorker.stop_heartbeat')150        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=activity_task)151        worker = floto.ActivityWorker(domain='d', task_list='tl')152        worker.max_polls = 1153        worker.run()154        worker.start_heartbeat.assert_called_once_with()155        worker.stop_heartbeat.assert_called_once_with()156    def test_run_start_stop_heartbeats_with_failing_activity(self, mocker):157        failing_task = {'activityType': {'name': 'activity_fails', 'version': 'v1'},158                        'taskToken': 'the_task_token'}159        mocker.patch('floto.ActivityWorker.task_failed')160        mocker.patch('floto.ActivityWorker.start_heartbeat')161        mocker.patch('floto.ActivityWorker.stop_heartbeat')162        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=failing_task)163        worker = floto.ActivityWorker(domain='d', task_list='tl')164        worker.max_polls = 1165        worker.run()166        worker.start_heartbeat.assert_called_once_with()167        worker.stop_heartbeat.assert_called_once_with()168    def test_run_start_stop_heartbeats_with_non_existing_activity(self, mocker):169        failing_task = {'activityType': {'name': 'activity_unknown', 'version': 'v1'},170                        'taskToken': 'the_task_token'}171        mocker.patch('floto.ActivityWorker.task_failed')172        mocker.patch('floto.ActivityWorker.start_heartbeat')173        mocker.patch('floto.ActivityWorker.stop_heartbeat')174        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=failing_task)175        worker = floto.ActivityWorker(domain='d', task_list='tl')176        worker.max_polls = 1177        worker.run()178        worker.start_heartbeat.assert_not_called()179    def test_start_heartbeat_deactivation(self):180        worker = floto.ActivityWorker(domain='d', task_list='tl', task_heartbeat_in_seconds=0)181        worker.heartbeat_sender.send_heartbeats = Mock()182        worker.start_heartbeat()...test_activity_tasks.py
Source:test_activity_tasks.py  
...11    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]12    conn.respond_decision_task_completed(13        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]14    )15    resp = conn.poll_for_activity_task(16        "test-domain", "activity-task-list", identity="surprise"17    )18    resp["activityId"].should.equal("my-activity-001")19    resp["taskToken"].should_not.be.none20    resp = conn.get_workflow_execution_history(21        "test-domain", conn.run_id, "uid-abcd1234"22    )23    resp["events"][-1]["eventType"].should.equal("ActivityTaskStarted")24    resp["events"][-1]["activityTaskStartedEventAttributes"].should.equal(25        {"identity": "surprise", "scheduledEventId": 5}26    )27@mock_swf_deprecated28def test_poll_for_activity_task_when_none():29    conn = setup_workflow()30    resp = conn.poll_for_activity_task("test-domain", "activity-task-list")31    resp.should.equal({"startedEventId": 0})32@mock_swf_deprecated33def test_poll_for_activity_task_on_non_existent_queue():34    conn = setup_workflow()35    resp = conn.poll_for_activity_task("test-domain", "non-existent-queue")36    resp.should.equal({"startedEventId": 0})37# CountPendingActivityTasks endpoint38@mock_swf_deprecated39def test_count_pending_activity_tasks():40    conn = setup_workflow()41    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]42    conn.respond_decision_task_completed(43        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]44    )45    resp = conn.count_pending_activity_tasks("test-domain", "activity-task-list")46    resp.should.equal({"count": 1, "truncated": False})47@mock_swf_deprecated48def test_count_pending_decision_tasks_on_non_existent_task_list():49    conn = setup_workflow()50    resp = conn.count_pending_activity_tasks("test-domain", "non-existent")51    resp.should.equal({"count": 0, "truncated": False})52# RespondActivityTaskCompleted endpoint53@mock_swf_deprecated54def test_respond_activity_task_completed():55    conn = setup_workflow()56    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]57    conn.respond_decision_task_completed(58        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]59    )60    activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")[61        "taskToken"62    ]63    resp = conn.respond_activity_task_completed(64        activity_token, result="result of the task"65    )66    resp.should.be.none67    resp = conn.get_workflow_execution_history(68        "test-domain", conn.run_id, "uid-abcd1234"69    )70    resp["events"][-2]["eventType"].should.equal("ActivityTaskCompleted")71    resp["events"][-2]["activityTaskCompletedEventAttributes"].should.equal(72        {"result": "result of the task", "scheduledEventId": 5, "startedEventId": 6}73    )74@mock_swf_deprecated75def test_respond_activity_task_completed_on_closed_workflow_execution():76    conn = setup_workflow()77    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]78    conn.respond_decision_task_completed(79        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]80    )81    activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")[82        "taskToken"83    ]84    # bad: we're closing workflow execution manually, but endpoints are not85    # coded for now..86    wfe = swf_backend.domains[0].workflow_executions[-1]87    wfe.execution_status = "CLOSED"88    # /bad89    conn.respond_activity_task_completed.when.called_with(activity_token).should.throw(90        SWFResponseError, "WorkflowExecution="91    )92@mock_swf_deprecated93def test_respond_activity_task_completed_with_task_already_completed():94    conn = setup_workflow()95    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]96    conn.respond_decision_task_completed(97        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]98    )99    activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")[100        "taskToken"101    ]102    conn.respond_activity_task_completed(activity_token)103    conn.respond_activity_task_completed.when.called_with(activity_token).should.throw(104        SWFResponseError, "Unknown activity, scheduledEventId = 5"105    )106# RespondActivityTaskFailed endpoint107@mock_swf_deprecated108def test_respond_activity_task_failed():109    conn = setup_workflow()110    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]111    conn.respond_decision_task_completed(112        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]113    )114    activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")[115        "taskToken"116    ]117    resp = conn.respond_activity_task_failed(118        activity_token, reason="short reason", details="long details"119    )120    resp.should.be.none121    resp = conn.get_workflow_execution_history(122        "test-domain", conn.run_id, "uid-abcd1234"123    )124    resp["events"][-2]["eventType"].should.equal("ActivityTaskFailed")125    resp["events"][-2]["activityTaskFailedEventAttributes"].should.equal(126        {127            "reason": "short reason",128            "details": "long details",129            "scheduledEventId": 5,130            "startedEventId": 6,131        }132    )133@mock_swf_deprecated134def test_respond_activity_task_completed_with_wrong_token():135    # NB: we just test ONE failure case for RespondActivityTaskFailed136    # because the safeguards are shared with RespondActivityTaskCompleted, so137    # no need to retest everything end-to-end.138    conn = setup_workflow()139    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]140    conn.respond_decision_task_completed(141        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]142    )143    conn.poll_for_activity_task("test-domain", "activity-task-list")144    conn.respond_activity_task_failed.when.called_with(145        "not-a-correct-token"146    ).should.throw(SWFResponseError, "Invalid token")147# RecordActivityTaskHeartbeat endpoint148@mock_swf_deprecated149def test_record_activity_task_heartbeat():150    conn = setup_workflow()151    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]152    conn.respond_decision_task_completed(153        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]154    )155    activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")[156        "taskToken"157    ]158    resp = conn.record_activity_task_heartbeat(activity_token)159    resp.should.equal({"cancelRequested": False})160@mock_swf_deprecated161def test_record_activity_task_heartbeat_with_wrong_token():162    conn = setup_workflow()163    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]164    conn.respond_decision_task_completed(165        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]166    )167    conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]168    conn.record_activity_task_heartbeat.when.called_with(169        "bad-token", details="some progress details"170    ).should.throw(SWFResponseError)171@mock_swf_deprecated172def test_record_activity_task_heartbeat_sets_details_in_case_of_timeout():173    conn = setup_workflow()174    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]175    conn.respond_decision_task_completed(176        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]177    )178    with freeze_time("2015-01-01 12:00:00"):179        activity_token = conn.poll_for_activity_task(180            "test-domain", "activity-task-list"181        )["taskToken"]182        conn.record_activity_task_heartbeat(183            activity_token, details="some progress details"184        )185    with freeze_time("2015-01-01 12:05:30"):186        # => Activity Task Heartbeat timeout reached!!187        resp = conn.get_workflow_execution_history(188            "test-domain", conn.run_id, "uid-abcd1234"189        )190        resp["events"][-2]["eventType"].should.equal("ActivityTaskTimedOut")191        attrs = resp["events"][-2]["activityTaskTimedOutEventAttributes"]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
