How to use poll_for_activity_task method in localstack

Best Python code snippet using localstack_python

test_activity_worker.py

Source:test_activity_worker.py Github

copy

Full Screen

"""Unit tests for ``floto.ActivityWorker``.

The SWF client (``floto.api.Swf``) is patched out in every test, so nothing
here talks to AWS.  The module registers three small activities with
``@floto.activity`` so that ``ActivityWorker.run`` has something to dispatch to.
"""
import json
from inspect import signature
from unittest.mock import Mock, PropertyMock

import pytest

import floto.api


@pytest.fixture
def activity_task():
    """Return a poll response describing activity ``my_activity_type`` version ``v1``."""
    return {
        'activityId': 'my_activity_id',
        'activityType': {'name': 'my_activity_type', 'version': 'v1', 'domain': 'd'},
        'startedEventId': 7,
        'taskToken': 'the_task_token',
        'workflowExecution': {'runId': 'the_run_id', 'workflowId': 'the_workflow_id'},
    }


@pytest.fixture
def worker(mocker):
    """Return an ``ActivityWorker`` whose SWF client is replaced by a mock.

    The mocked client exposes ``respond_activity_task_completed`` and
    ``respond_activity_task_failed``; the worker is pre-seeded with a result
    and a task token so ``complete()`` can be called directly.
    """
    mocked_functions = {
        'respond_activity_task_completed': Mock(),
        'respond_activity_task_failed': Mock(),
    }
    client_mock = type("ClientMock", (object,), mocked_functions)
    # ``client`` is patched with a PropertyMock so attribute access on the
    # Swf object yields the fake client instance.
    mocker.patch('floto.api.Swf.client', new_callable=PropertyMock,
                 return_value=client_mock())
    activity_worker = floto.ActivityWorker(domain='d', task_list='tl')
    activity_worker.result = 'result'
    activity_worker.task_token = 'tt'
    return activity_worker


class TestActivityWorker:
    """Construction, termination flag and heartbeat wiring of ``ActivityWorker``."""

    def test_init(self):
        """An explicit ``task_heartbeat_in_seconds`` is stored on the worker."""
        worker = floto.ActivityWorker(domain='d', task_list='tl',
                                      task_heartbeat_in_seconds=10)
        assert worker.task_heartbeat_in_seconds == 10

    def test_init_with_defaults(self):
        """Without an explicit value the heartbeat interval is 90."""
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        assert worker.task_heartbeat_in_seconds == 90

    def test_terminate_activity_worker(self):
        """``terminate_worker`` sets the private termination flag."""
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.terminate_worker()
        assert worker._terminate_activity_worker == True

    def test_start_heartbeat(self, mocker):
        """``start_heartbeat`` forwards the interval and task token to the sender."""
        mocker.patch('floto.HeartbeatSender.send_heartbeats')
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.task_heartbeat_in_seconds = 1
        worker.task_token = 't'
        worker.start_heartbeat()
        worker.heartbeat_sender.send_heartbeats.assert_called_once_with(
            timeout=1, task_token='t')

    def test_stop_heartbeat(self, mocker):
        """``stop_heartbeat`` delegates to the sender without arguments."""
        mocker.patch('floto.HeartbeatSender.stop_heartbeats')
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.stop_heartbeat()
        worker.heartbeat_sender.stop_heartbeats.assert_called_once_with()


@floto.activity(domain='d', name='my_activity_type', version='v1')
def do_work():
    """Activity without parameters; always returns ``'success'``."""
    return 'success'


@floto.activity(domain='d', name='my_activity_type', version='v2')
def do_work_and_return_context(context):
    """Activity that echoes back the ``context`` it was called with."""
    return context


@floto.activity(domain='d', name='activity_fails', version='v1')
def fail():
    """Activity that always raises ``ValueError('Activity failed')``."""
    raise ValueError('Activity failed')


class MockedActivityWorker(floto.ActivityWorker):
    """Worker whose ``task_failed`` records the error instead of reporting it."""

    def task_failed(self, error):
        """Store ``error`` on ``self._error`` so tests can inspect it."""
        self._error = error


class TestActivityWorkerAPICalls:
    """Polling, running, completing and failing tasks against a patched Swf API."""

    def test_args(self):
        """The registered v2 activity exposes a ``context`` parameter."""
        sig = signature(floto.ACTIVITY_FUNCTIONS['my_activity_type:v2:d'])
        assert 'context' in sig.parameters

    def test_poll(self, mocker, activity_task):
        """``poll`` calls the API once and stores the token and the response."""
        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=activity_task)
        worker = floto.ActivityWorker(domain='d', task_list='task_list',
                                      identity='test machine')
        worker.poll()
        worker.swf.poll_for_activity_task.assert_called_once_with(
            task_list='task_list', domain='d', identity='test machine')
        assert worker.task_token == 'the_task_token'
        assert worker.last_response['activityId'] == 'my_activity_id'

    def test_run_with_response(self, mocker, activity_task):
        """A polled task leads to exactly one ``complete()`` call."""
        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=activity_task)
        mocker.patch('floto.ActivityWorker.complete')
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.max_polls = 1
        worker.run()
        worker.complete.assert_called_once_with()

    def test_run_with_context(self, mocker):
        """The task's ``input`` is handed to the activity as its context."""
        activity_task_with_input = {
            'activityId': 'my_activity_id',
            'activityType': {'name': 'my_activity_type', 'version': 'v2'},
            'taskToken': 'the_task_token',
            'input': {'foo': 'bar'},
        }
        mocker.patch('floto.api.Swf.poll_for_activity_task',
                     return_value=activity_task_with_input)
        mocker.patch('floto.ActivityWorker.complete')
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.max_polls = 1
        worker.run()
        assert worker.result == {'foo': 'bar'}

    def test_run_with_context_wo_input(self, mocker):
        """Without an ``input`` key the context-taking activity yields ``{}``."""
        activity_task_with_input = {
            'activityId': 'my_activity_id',
            'activityType': {'name': 'my_activity_type', 'version': 'v2'},
            'taskToken': 'the_task_token',
        }
        mocker.patch('floto.api.Swf.poll_for_activity_task',
                     return_value=activity_task_with_input)
        mocker.patch('floto.ActivityWorker.complete')
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.max_polls = 1
        worker.run()
        assert worker.result == {}

    def test_run_without_response(self, mocker):
        """An empty poll response must not trigger ``complete()``."""
        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value={})
        mocker.patch('floto.ActivityWorker.complete')
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.max_polls = 1
        worker.run()
        worker.complete.assert_not_called()

    def test_run_with_failing_activity(self, mocker):
        """An exception raised by the activity reaches ``task_failed``."""
        failing_task = {
            'activityType': {'name': 'activity_fails', 'version': 'v1'},
            'taskToken': 'the_task_token',
        }
        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=failing_task)
        mocker.patch('floto.ActivityWorker.complete')
        worker = MockedActivityWorker(domain='d', task_list='tl')
        worker.max_polls = 1
        worker.run()
        assert str(worker._error) == 'Activity failed'

    def test_run_with_activity_not_found(self, mocker):
        """An unregistered activity id is reported through ``task_failed``."""
        failing_task = {
            'activityType': {'name': 'activity_unknown', 'version': 'v1'},
            'taskToken': 'the_task_token',
        }
        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=failing_task)
        mocker.patch('floto.ActivityWorker.complete')
        worker = MockedActivityWorker(domain='d', task_list='tl')
        worker.max_polls = 1
        worker.run()
        message = 'No activity with id activity_unknown:v1:d registered'
        assert str(worker._error) == message

    def test_task_failed(self, mocker):
        """``task_failed`` sends the task token and the error text to SWF."""
        client_mock = type('ClientMock', (object,),
                           {'respond_activity_task_failed': Mock()})
        mocker.patch('floto.api.Swf.client', new_callable=PropertyMock,
                     return_value=client_mock())
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.task_token = 'abc'
        worker.task_failed(Exception('some error'))
        expected_args = {'taskToken': 'abc', 'details': 'some error'}
        worker.swf.client.respond_activity_task_failed.assert_called_once_with(
            **expected_args)

    def test_terminate_worker(self):
        """After ``terminate_worker`` the public getter reports a truthy flag."""
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.terminate_worker()
        assert worker.get_terminate_activity_worker()

    def test_complete_result_string(self, mocker, worker):
        """A string result is passed to SWF unchanged."""
        worker.result = 'result'
        worker.complete()
        assertion = {'taskToken': worker.task_token, 'result': 'result'}
        worker.swf.client.respond_activity_task_completed.assert_called_once_with(
            **assertion)

    def test_complete_result_object(self, mocker, worker):
        """A dict result is JSON-encoded before it is sent to SWF."""
        worker.result = {'foo': 'bar'}
        worker.complete()
        assertion = {'taskToken': worker.task_token,
                     'result': json.dumps({'foo': 'bar'})}
        worker.swf.client.respond_activity_task_completed.assert_called_once_with(
            **assertion)

    def test_run_start_stop_heartbeats_with_successful_activity(self, mocker, activity_task):
        """Heartbeats are started and stopped once around a successful activity."""
        mocker.patch('floto.ActivityWorker.complete')
        mocker.patch('floto.ActivityWorker.start_heartbeat')
        mocker.patch('floto.ActivityWorker.stop_heartbeat')
        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=activity_task)
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.max_polls = 1
        worker.run()
        worker.start_heartbeat.assert_called_once_with()
        worker.stop_heartbeat.assert_called_once_with()

    def test_run_start_stop_heartbeats_with_failing_activity(self, mocker):
        """Heartbeats are started and stopped once even when the activity raises."""
        failing_task = {
            'activityType': {'name': 'activity_fails', 'version': 'v1'},
            'taskToken': 'the_task_token',
        }
        mocker.patch('floto.ActivityWorker.task_failed')
        mocker.patch('floto.ActivityWorker.start_heartbeat')
        mocker.patch('floto.ActivityWorker.stop_heartbeat')
        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=failing_task)
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.max_polls = 1
        worker.run()
        worker.start_heartbeat.assert_called_once_with()
        worker.stop_heartbeat.assert_called_once_with()

    def test_run_start_stop_heartbeats_with_non_existing_activity(self, mocker):
        """No heartbeat is started when the activity is not registered."""
        failing_task = {
            'activityType': {'name': 'activity_unknown', 'version': 'v1'},
            'taskToken': 'the_task_token',
        }
        mocker.patch('floto.ActivityWorker.task_failed')
        mocker.patch('floto.ActivityWorker.start_heartbeat')
        mocker.patch('floto.ActivityWorker.stop_heartbeat')
        mocker.patch('floto.api.Swf.poll_for_activity_task', return_value=failing_task)
        worker = floto.ActivityWorker(domain='d', task_list='tl')
        worker.max_polls = 1
        worker.run()
        worker.start_heartbeat.assert_not_called()

    def test_start_heartbeat_deactivation(self):
        """Start heartbeats on a worker configured with a heartbeat interval of 0."""
        worker = floto.ActivityWorker(domain='d', task_list='tl',
                                      task_heartbeat_in_seconds=0)
        worker.heartbeat_sender.send_heartbeats = Mock()
        worker.start_heartbeat()
        # ... the listing is truncated here in the source; the remaining
        # statements of this test (presumably the assertion) are not shown.

Full Screen

Full Screen

test_activity_tasks.py

Source:test_activity_tasks.py Github

copy

Full Screen

# Excerpt of test_activity_tasks.py: SWF activity-task endpoint tests.
#
# NOTE: the listing starts mid-function. Lines 1-10 of the original file
# (imports, helper definitions and the header of the first test) are elided
# upstream, so `setup_workflow`, `mock_swf_deprecated`,
# `SCHEDULE_ACTIVITY_TASK_DECISION`, `swf_backend`, `SWFResponseError` and
# `freeze_time` are defined outside this excerpt.
#
# The orphaned body of that first test is kept verbatim below, commented out
# so that the rest of the excerpt parses:
#
#     decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
#     conn.respond_decision_task_completed(
#         decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]
#     )
#     resp = conn.poll_for_activity_task(
#         "test-domain", "activity-task-list", identity="surprise"
#     )
#     resp["activityId"].should.equal("my-activity-001")
#     resp["taskToken"].should_not.be.none
#     resp = conn.get_workflow_execution_history(
#         "test-domain", conn.run_id, "uid-abcd1234"
#     )
#     resp["events"][-1]["eventType"].should.equal("ActivityTaskStarted")
#     resp["events"][-1]["activityTaskStartedEventAttributes"].should.equal(
#         {"identity": "surprise", "scheduledEventId": 5}
#     )


@mock_swf_deprecated
def test_poll_for_activity_task_when_none():
    """Polling with no scheduled activity returns only ``startedEventId`` 0."""
    conn = setup_workflow()
    resp = conn.poll_for_activity_task("test-domain", "activity-task-list")
    resp.should.equal({"startedEventId": 0})


@mock_swf_deprecated
def test_poll_for_activity_task_on_non_existent_queue():
    """Polling an unknown task list also returns ``startedEventId`` 0."""
    conn = setup_workflow()
    resp = conn.poll_for_activity_task("test-domain", "non-existent-queue")
    resp.should.equal({"startedEventId": 0})


# CountPendingActivityTasks endpoint
@mock_swf_deprecated
def test_count_pending_activity_tasks():
    """One scheduled activity is reported as one pending task."""
    conn = setup_workflow()
    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
    conn.respond_decision_task_completed(
        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]
    )
    resp = conn.count_pending_activity_tasks("test-domain", "activity-task-list")
    resp.should.equal({"count": 1, "truncated": False})


# NOTE(review): the name says "decision_tasks" but the body calls
# count_pending_activity_tasks; name kept as in the source.
@mock_swf_deprecated
def test_count_pending_decision_tasks_on_non_existent_task_list():
    """An unknown task list reports zero pending activity tasks."""
    conn = setup_workflow()
    resp = conn.count_pending_activity_tasks("test-domain", "non-existent")
    resp.should.equal({"count": 0, "truncated": False})


# RespondActivityTaskCompleted endpoint
@mock_swf_deprecated
def test_respond_activity_task_completed():
    """Completing a task records an ``ActivityTaskCompleted`` history event."""
    conn = setup_workflow()
    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
    conn.respond_decision_task_completed(
        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]
    )
    activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")[
        "taskToken"
    ]
    resp = conn.respond_activity_task_completed(
        activity_token, result="result of the task"
    )
    resp.should.be.none
    resp = conn.get_workflow_execution_history(
        "test-domain", conn.run_id, "uid-abcd1234"
    )
    resp["events"][-2]["eventType"].should.equal("ActivityTaskCompleted")
    resp["events"][-2]["activityTaskCompletedEventAttributes"].should.equal(
        {"result": "result of the task", "scheduledEventId": 5, "startedEventId": 6}
    )


@mock_swf_deprecated
def test_respond_activity_task_completed_on_closed_workflow_execution():
    """Completing a task of a closed workflow execution raises ``SWFResponseError``."""
    conn = setup_workflow()
    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
    conn.respond_decision_task_completed(
        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]
    )
    activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")[
        "taskToken"
    ]
    # bad: we're closing workflow execution manually, but endpoints are not
    # coded for now..
    wfe = swf_backend.domains[0].workflow_executions[-1]
    wfe.execution_status = "CLOSED"
    # /bad
    conn.respond_activity_task_completed.when.called_with(activity_token).should.throw(
        SWFResponseError, "WorkflowExecution="
    )


@mock_swf_deprecated
def test_respond_activity_task_completed_with_task_already_completed():
    """Completing the same task twice raises ``SWFResponseError`` on the second call."""
    conn = setup_workflow()
    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
    conn.respond_decision_task_completed(
        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]
    )
    activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")[
        "taskToken"
    ]
    conn.respond_activity_task_completed(activity_token)
    conn.respond_activity_task_completed.when.called_with(activity_token).should.throw(
        SWFResponseError, "Unknown activity, scheduledEventId = 5"
    )


# RespondActivityTaskFailed endpoint
@mock_swf_deprecated
def test_respond_activity_task_failed():
    """Failing a task records an ``ActivityTaskFailed`` event with reason and details."""
    conn = setup_workflow()
    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
    conn.respond_decision_task_completed(
        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]
    )
    activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")[
        "taskToken"
    ]
    resp = conn.respond_activity_task_failed(
        activity_token, reason="short reason", details="long details"
    )
    resp.should.be.none
    resp = conn.get_workflow_execution_history(
        "test-domain", conn.run_id, "uid-abcd1234"
    )
    resp["events"][-2]["eventType"].should.equal("ActivityTaskFailed")
    resp["events"][-2]["activityTaskFailedEventAttributes"].should.equal(
        {
            "reason": "short reason",
            "details": "long details",
            "scheduledEventId": 5,
            "startedEventId": 6,
        }
    )


# NOTE(review): despite "completed" in the name, this test exercises
# respond_activity_task_failed; name kept as in the source.
@mock_swf_deprecated
def test_respond_activity_task_completed_with_wrong_token():
    """Failing a task with an invalid token raises ``SWFResponseError``."""
    # NB: we just test ONE failure case for RespondActivityTaskFailed
    # because the safeguards are shared with RespondActivityTaskCompleted, so
    # no need to retest everything end-to-end.
    conn = setup_workflow()
    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
    conn.respond_decision_task_completed(
        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]
    )
    conn.poll_for_activity_task("test-domain", "activity-task-list")
    conn.respond_activity_task_failed.when.called_with(
        "not-a-correct-token"
    ).should.throw(SWFResponseError, "Invalid token")


# RecordActivityTaskHeartbeat endpoint
@mock_swf_deprecated
def test_record_activity_task_heartbeat():
    """A heartbeat on a running task reports ``cancelRequested`` as False."""
    conn = setup_workflow()
    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
    conn.respond_decision_task_completed(
        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]
    )
    activity_token = conn.poll_for_activity_task("test-domain", "activity-task-list")[
        "taskToken"
    ]
    resp = conn.record_activity_task_heartbeat(activity_token)
    resp.should.equal({"cancelRequested": False})


@mock_swf_deprecated
def test_record_activity_task_heartbeat_with_wrong_token():
    """A heartbeat with an invalid token raises ``SWFResponseError``."""
    conn = setup_workflow()
    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
    conn.respond_decision_task_completed(
        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]
    )
    conn.poll_for_activity_task("test-domain", "activity-task-list")["taskToken"]
    conn.record_activity_task_heartbeat.when.called_with(
        "bad-token", details="some progress details"
    ).should.throw(SWFResponseError)


@mock_swf_deprecated
def test_record_activity_task_heartbeat_sets_details_in_case_of_timeout():
    """Heartbeat details are read back from the ``ActivityTaskTimedOut`` event."""
    conn = setup_workflow()
    decision_token = conn.poll_for_decision_task("test-domain", "queue")["taskToken"]
    conn.respond_decision_task_completed(
        decision_token, decisions=[SCHEDULE_ACTIVITY_TASK_DECISION]
    )
    # NOTE(review): indentation was lost in the source; the nesting under the
    # two `with freeze_time(...)` blocks is reconstructed -- confirm upstream.
    with freeze_time("2015-01-01 12:00:00"):
        activity_token = conn.poll_for_activity_task(
            "test-domain", "activity-task-list"
        )["taskToken"]
        conn.record_activity_task_heartbeat(
            activity_token, details="some progress details"
        )
    with freeze_time("2015-01-01 12:05:30"):
        # => Activity Task Heartbeat timeout reached!!
        resp = conn.get_workflow_execution_history(
            "test-domain", conn.run_id, "uid-abcd1234"
        )
        resp["events"][-2]["eventType"].should.equal("ActivityTaskTimedOut")
        attrs = resp["events"][-2]["activityTaskTimedOutEventAttributes"]
        # ... the listing is truncated here in the source; the remaining
        # statements of this test are not shown.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest now!

Get 100 minutes of automation testing for free!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful