How to use respond_activity_task_failed method in localstack

Best Python code snippet using localstack_python

test_fulfillment_worker.py

Source:test_fulfillment_worker.py Github

copy

Full Screen

1#!/usr/bin/python2import json3import unittest4from unittest.mock import Mock, MagicMock5from protocol.fulfillment_worker import FulfillmentWorker6from protocol.schema import StringParameter, StringResult7from protocol.fulfillment_exception import FulfillmentFatalException8from protocol.response import ActivityStatus9REGION = 'us-east-1'10SWF_DOMAIN = 'fulfillment_test'11ACTIVITY_NAME = 'test'12ACTIVITY_VERSION = '1'13TASK_LIST = {'name': ACTIVITY_NAME + ACTIVITY_VERSION}14TASK_TOKEN = 'SOMETASKTOKEN'15INPUT = {16 'stuff': 'things'17}18TASK = {19 'taskToken': TASK_TOKEN,20 'input': json.dumps(INPUT)21}22RESULT = 'some result'23ERROR_MESSAGE = 'loud noises!'24class TestFulfillmentWorker(unittest.TestCase):25 def test_no_task(self):26 worker = FulfillmentWorker(27 description='This is a test worker',28 parameters={29 'stuff': StringParameter('some stuff')30 },31 result=StringResult('the result'),32 handler=Mock(),33 region=REGION,34 activity_name=ACTIVITY_NAME,35 activity_version=ACTIVITY_VERSION,36 swf_domain=SWF_DOMAIN37 )38 worker._swf.poll_for_activity_task = MagicMock(return_value={})39 task_token = worker.run()40 self.assertIsNone(task_token)41 worker._swf.poll_for_activity_task.assert_called_with(domain=SWF_DOMAIN, taskList=TASK_LIST)42 worker._handler.assert_not_called()43 def test_success(self):44 worker = FulfillmentWorker(45 description='This is a test worker',46 parameters={47 'stuff': StringParameter('some stuff')48 },49 result=StringResult('the result'),50 handler=Mock(return_value=RESULT),51 region=REGION,52 activity_name=ACTIVITY_NAME,53 activity_version=ACTIVITY_VERSION,54 swf_domain=SWF_DOMAIN55 )56 worker._swf.poll_for_activity_task = MagicMock(return_value=TASK)57 worker._swf.respond_activity_task_completed = MagicMock()58 task_token = worker.run()59 expected = {60 'taskToken': TASK_TOKEN,61 'result': {62 'status': ActivityStatus.SUCCESS,63 'notes': [],64 'reason': None,65 'result': RESULT,66 'trace': []67 }68 }69 self.assertEqual(task_token, TASK_TOKEN)70 worker._swf.poll_for_activity_task.assert_called_once_with(domain=SWF_DOMAIN, taskList=TASK_LIST)71 worker._handler.assert_called_once_with(stuff=INPUT['stuff'])72 worker._swf.respond_activity_task_completed.assert_called_once()73 call_args = worker._swf.respond_activity_task_completed.call_args[1]74 call_args['result'] = json.loads(call_args['result'])75 self.assertEqual(call_args, expected)76 expected = {'activity': {'name': 'test', 'version': '1'},77 'description': 'This is a test worker',78 'params': {'description': '',79 'properties': {'stuff': {'description': 'some stuff',80 'type': 'string'}},81 'required': ['stuff'],82 'type': 'object'},83 'result': {'description': 'the result', 'type': 'string'}}84 self.assertEqual(worker.handle(None, {'RETURN_SCHEMA': True}), expected)85 def test_fatal_error(self):86 error = FulfillmentFatalException(message=ERROR_MESSAGE)87 worker = FulfillmentWorker(88 description='This is a test worker',89 parameters={90 'stuff': StringParameter('some stuff')91 },92 result=StringResult('the result'),93 handler=Mock(side_effect=error),94 region=REGION,95 activity_name=ACTIVITY_NAME,96 activity_version=ACTIVITY_VERSION,97 swf_domain=SWF_DOMAIN98 )99 worker._swf.poll_for_activity_task = MagicMock(return_value=TASK)100 worker._swf.respond_activity_task_failed = MagicMock()101 task_token = worker.run()102 expected = {103 'taskToken': TASK_TOKEN,104 'details': {105 'status': ActivityStatus.FATAL,106 'notes': [],107 'reason': ERROR_MESSAGE,108 'result': ERROR_MESSAGE,109 'trace': error.trace()110 },111 'reason': ERROR_MESSAGE112 }113 self.assertEqual(task_token, TASK_TOKEN)114 worker._swf.respond_activity_task_failed.assert_called_once()115 call_args = worker._swf.respond_activity_task_failed.call_args[1]116 call_args['details'] = json.loads(call_args['details'])117 self.assertEqual(call_args, expected)118 def test_default_error(self):119 error = Exception(ERROR_MESSAGE)120 worker = FulfillmentWorker(121 description='This is a test worker',122 parameters={123 'stuff': StringParameter('some stuff')124 },125 result=StringResult('the result'),126 handler=Mock(side_effect=error),127 region=REGION,128 activity_name=ACTIVITY_NAME,129 activity_version=ACTIVITY_VERSION,130 swf_domain=SWF_DOMAIN,131 default_exception=FulfillmentFatalException132 )133 worker._swf.poll_for_activity_task = MagicMock(return_value=TASK)134 worker._fail = MagicMock()135 task_token = worker.run()136 self.assertEqual(task_token, TASK_TOKEN)137 worker._fail.assert_called_once()138 call_args = worker._fail.call_args[0]139 self.assertEqual(call_args[0], TASK_TOKEN)140 self.assertIsInstance(call_args[1], FulfillmentFatalException)141 142 def test_validation_error(self):143 error = Exception(ERROR_MESSAGE)144 worker = FulfillmentWorker(145 description='This is a test worker',146 parameters={147 'stuff': StringParameter('some stuff')148 },149 result=StringResult('the result'),150 handler=Mock(side_effect=error),151 region=REGION,152 activity_name=ACTIVITY_NAME,153 activity_version=ACTIVITY_VERSION,154 swf_domain=SWF_DOMAIN155 )156 worker._swf.poll_for_activity_task = MagicMock(return_value={157 'taskToken': TASK_TOKEN,158 'input': '{"stuff": 1}'159 })160 worker._swf.respond_activity_task_failed = MagicMock()161 task_token = worker.run()162 expected = {163 'taskToken': TASK_TOKEN,164 'details': json.dumps({165 "status": ActivityStatus.INVALID,166 "notes": [],167 "trace": [],168 "reason": None,169 "validation_errors": [170 {171 "cause": None,172 "context": [],173 "message": "1 is not of type 'string'",174 "path": "stuff",175 "relative_path": "stuff",176 "absolute_path": "stuff",177 "validator": "type",178 "validator_value": "string"179 }180 ]181 }),182 'reason': '1 validation error(s)'183 }184 self.assertEqual(task_token, TASK_TOKEN)185 call_args = worker._swf.respond_activity_task_failed.call_args[1]186 self.assertEqual(call_args, expected)187if __name__ == '__main__':...

Full Screen

Full Screen

activity_task_test.py

Source:activity_task_test.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2from __future__ import absolute_import3from __future__ import unicode_literals4import mock5import pytest6from botocore.vendored.requests.exceptions import ReadTimeout7from py_swf.clients.activity_task import ActivityTask8from py_swf.clients.activity_task import ActivityTaskClient9from py_swf.errors import NoTaskFound10from testing.util import DictMock11@pytest.fixture12def activity_task_config():13 return mock.Mock()14@pytest.fixture15def activity_task_client(activity_task_config, boto_client):16 return ActivityTaskClient(17 activity_task_config,18 boto_client,19 )20class TestPolling:21 @pytest.fixture22 def boto_client_results(self):23 return DictMock()24 @pytest.fixture(autouse=True)25 def patch_poll_for_activity_task(self, boto_client_results, boto_client):26 boto_client.poll_for_activity_task.return_value = boto_client_results27 @pytest.fixture28 def expected_activity_task(self, boto_client_results):29 return ActivityTask(30 activity_id=boto_client_results['activityId'],31 type=boto_client_results['activityType']['name'],32 version=boto_client_results['activityType']['version'],33 input=boto_client_results['input'],34 task_token=boto_client_results['taskToken'],35 workflow_id=boto_client_results['workflowExecution']['workflowId'],36 workflow_run_id=boto_client_results['workflowExecution']['runId'],37 )38 def test_poll_with_identity(self, activity_task_config, expected_activity_task, activity_task_client, boto_client):39 result_activity_task = activity_task_client.poll(identity='meow')40 assert result_activity_task == expected_activity_task41 boto_client.poll_for_activity_task.assert_called_once_with(42 domain=activity_task_config.domain,43 identity=u'meow',44 taskList={45 u'name': activity_task_config.task_list,46 },47 )48 def test_poll_without_identity(self, activity_task_config, expected_activity_task, activity_task_client, boto_client):49 result_activity_task = activity_task_client.poll()50 assert result_activity_task == expected_activity_task51 boto_client.poll_for_activity_task.assert_called_once_with(52 domain=activity_task_config.domain,53 taskList={54 u'name': activity_task_config.task_list,55 },56 )57 def test_poll_timeout(self, activity_task_client, boto_client):58 boto_client.poll_for_activity_task.side_effect = ReadTimeout59 with pytest.raises(NoTaskFound):60 activity_task_client.poll()61class TestPollingWithBadResults:62 @pytest.fixture(autouse=True)63 def patch_poll_for_activity_task(self, boto_client_faulty_results, boto_client):64 boto_client.poll_for_activity_task.return_value = boto_client_faulty_results65 @pytest.fixture66 def boto_client_faulty_results(self):67 return {68 "startedEventId": 0,69 "previousStartedEventId": 0,70 "ResponseMetadata": {71 'HTTPStatusCode': 200,72 'RequestId': 'b1cde981-23a0-11e6-98d4-5316d1b97440'73 }74 }75 def test_receive_result_without_task_token(self, activity_task_client):76 with pytest.raises(NoTaskFound):77 activity_task_client.poll()78def test_finish(activity_task_client, boto_client):79 task_token = mock.Mock()80 result = mock.Mock()81 activity_task_client.finish(task_token, result)82 boto_client.respond_activity_task_completed.assert_called_once_with(83 result=result,84 taskToken=task_token,85 )86def test_fail_with_detail(activity_task_client, boto_client):87 task_token = mock.Mock()88 reason = "test_fail"89 detail = "detail"90 activity_task_client.fail(task_token, reason, detail)91 boto_client.respond_activity_task_failed.assert_called_once_with(92 details=detail,93 reason=reason,94 taskToken=task_token,95 )96def test_fail_without_detail(activity_task_client, boto_client):97 task_token = mock.Mock()98 reason = "test_fail"99 activity_task_client.fail(task_token, reason)100 boto_client.respond_activity_task_failed.assert_called_once_with(101 reason=reason,102 taskToken=task_token,...

Full Screen

Full Screen

activity_task.py

Source:activity_task.py Github

copy

Full Screen

...85 reason=reason,86 )87 if details is not None:88 kwargs["details"] = details89 self.boto_client.respond_activity_task_failed(90 taskToken=task_token,91 **kwargs...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful