Best Python code snippet using localstack_python
test_fulfillment_worker.py
Source:test_fulfillment_worker.py  
#!/usr/bin/python
import json
import unittest
from unittest.mock import Mock, MagicMock
from protocol.fulfillment_worker import FulfillmentWorker
from protocol.schema import StringParameter, StringResult
from protocol.fulfillment_exception import FulfillmentFatalException
from protocol.response import ActivityStatus

REGION = 'us-east-1'
SWF_DOMAIN = 'fulfillment_test'
ACTIVITY_NAME = 'test'
ACTIVITY_VERSION = '1'
TASK_LIST = {'name': ACTIVITY_NAME + ACTIVITY_VERSION}
TASK_TOKEN = 'SOMETASKTOKEN'
INPUT = {
    'stuff': 'things'
}
TASK = {
    'taskToken': TASK_TOKEN,
    'input': json.dumps(INPUT)
}
RESULT = 'some result'
ERROR_MESSAGE = 'loud noises!'


def _make_worker(handler, **extra_kwargs):
    """Build a FulfillmentWorker with the configuration shared by every test.

    Args:
        handler: Callable (a Mock in these tests) passed as the worker's
            ``handler``.
        **extra_kwargs: Additional keyword arguments forwarded unchanged to
            ``FulfillmentWorker`` (e.g. ``default_exception``).

    Returns:
        The newly constructed ``FulfillmentWorker``.
    """
    return FulfillmentWorker(
        description='This is a test worker',
        parameters={
            'stuff': StringParameter('some stuff')
        },
        result=StringResult('the result'),
        handler=handler,
        region=REGION,
        activity_name=ACTIVITY_NAME,
        activity_version=ACTIVITY_VERSION,
        swf_domain=SWF_DOMAIN,
        **extra_kwargs
    )


class TestFulfillmentWorker(unittest.TestCase):
    """Exercise ``FulfillmentWorker.run`` with the SWF client methods mocked."""

    def test_no_task(self):
        """An empty poll response yields no task token and no handler call."""
        worker = _make_worker(Mock())
        worker._swf.poll_for_activity_task = MagicMock(return_value={})
        task_token = worker.run()
        self.assertIsNone(task_token)
        worker._swf.poll_for_activity_task.assert_called_with(domain=SWF_DOMAIN, taskList=TASK_LIST)
        worker._handler.assert_not_called()

    def test_success(self):
        """A handled task is reported as completed with a SUCCESS payload.

        Also checks the schema document returned by ``handle`` when the
        event carries ``RETURN_SCHEMA``.
        """
        worker = _make_worker(Mock(return_value=RESULT))
        worker._swf.poll_for_activity_task = MagicMock(return_value=TASK)
        worker._swf.respond_activity_task_completed = MagicMock()
        task_token = worker.run()
        expected = {
            'taskToken': TASK_TOKEN,
            'result': {
                'status': ActivityStatus.SUCCESS,
                'notes': [],
                'reason': None,
                'result': RESULT,
                'trace': []
            }
        }
        self.assertEqual(task_token, TASK_TOKEN)
        worker._swf.poll_for_activity_task.assert_called_once_with(domain=SWF_DOMAIN, taskList=TASK_LIST)
        worker._handler.assert_called_once_with(stuff=INPUT['stuff'])
        worker._swf.respond_activity_task_completed.assert_called_once()
        call_args = worker._swf.respond_activity_task_completed.call_args[1]
        # The result kwarg is a JSON string; parse it so the comparison is
        # structural rather than textual.
        call_args['result'] = json.loads(call_args['result'])
        self.assertEqual(call_args, expected)
        expected = {'activity': {'name': 'test', 'version': '1'},
                    'description': 'This is a test worker',
                    'params': {'description': '',
                               'properties': {'stuff': {'description': 'some stuff',
                                                        'type': 'string'}},
                               'required': ['stuff'],
                               'type': 'object'},
                    'result': {'description': 'the result', 'type': 'string'}}
        self.assertEqual(worker.handle(None, {'RETURN_SCHEMA': True}), expected)

    def test_fatal_error(self):
        """A FulfillmentFatalException from the handler is reported as FATAL."""
        error = FulfillmentFatalException(message=ERROR_MESSAGE)
        worker = _make_worker(Mock(side_effect=error))
        worker._swf.poll_for_activity_task = MagicMock(return_value=TASK)
        worker._swf.respond_activity_task_failed = MagicMock()
        task_token = worker.run()
        expected = {
            'taskToken': TASK_TOKEN,
            'details': {
                'status': ActivityStatus.FATAL,
                'notes': [],
                'reason': ERROR_MESSAGE,
                'result': ERROR_MESSAGE,
                'trace': error.trace()
            },
            'reason': ERROR_MESSAGE
        }
        self.assertEqual(task_token, TASK_TOKEN)
        worker._swf.respond_activity_task_failed.assert_called_once()
        call_args = worker._swf.respond_activity_task_failed.call_args[1]
        call_args['details'] = json.loads(call_args['details'])
        self.assertEqual(call_args, expected)

    def test_default_error(self):
        """A plain Exception is passed to ``_fail`` as the default_exception type."""
        error = Exception(ERROR_MESSAGE)
        worker = _make_worker(
            Mock(side_effect=error),
            default_exception=FulfillmentFatalException
        )
        worker._swf.poll_for_activity_task = MagicMock(return_value=TASK)
        worker._fail = MagicMock()
        task_token = worker.run()
        self.assertEqual(task_token, TASK_TOKEN)
        worker._fail.assert_called_once()
        call_args = worker._fail.call_args[0]
        self.assertEqual(call_args[0], TASK_TOKEN)
        self.assertIsInstance(call_args[1], FulfillmentFatalException)

    def test_validation_error(self):
        """Input that fails schema validation is reported as INVALID."""
        error = Exception(ERROR_MESSAGE)
        worker = _make_worker(Mock(side_effect=error))
        worker._swf.poll_for_activity_task = MagicMock(return_value={
            'taskToken': TASK_TOKEN,
            'input':  '{"stuff": 1}'
        })
        worker._swf.respond_activity_task_failed = MagicMock()
        task_token = worker.run()
        expected_details = json.dumps({
            "status": ActivityStatus.INVALID,
            "notes": [],
            "trace": [],
            "reason": None,
            "validation_errors": [
                {
                    "cause": None,
                    "context": [],
                    "message": "1 is not of type 'string'",
                    "path": "stuff",
                    "relative_path": "stuff",
                    "absolute_path": "stuff",
                    "validator": "type",
                    "validator_value": "string"
                }
            ]
        })
        expected = {
            'taskToken': TASK_TOKEN,
            # Round-trip through JSON so both sides are compared as parsed
            # structures; comparing the raw strings would depend on key
            # order and separator formatting.
            'details': json.loads(expected_details),
            'reason': '1 validation error(s)'
        }
        self.assertEqual(task_token, TASK_TOKEN)
        call_args = worker._swf.respond_activity_task_failed.call_args[1]
        call_args['details'] = json.loads(call_args['details'])
        self.assertEqual(call_args, expected)


# NOTE(review): the source listing truncates the entry-point guard below.
if __name__ == '__main__':...
activity_task_test.py
Source:activity_task_test.py  
1# -*- coding: utf-8 -*-2from __future__ import absolute_import3from __future__ import unicode_literals4import mock5import pytest6from botocore.vendored.requests.exceptions import ReadTimeout7from py_swf.clients.activity_task import ActivityTask8from py_swf.clients.activity_task import ActivityTaskClient9from py_swf.errors import NoTaskFound10from testing.util import DictMock11@pytest.fixture12def activity_task_config():13    return mock.Mock()14@pytest.fixture15def activity_task_client(activity_task_config, boto_client):16    return ActivityTaskClient(17        activity_task_config,18        boto_client,19    )20class TestPolling:21    @pytest.fixture22    def boto_client_results(self):23        return DictMock()24    @pytest.fixture(autouse=True)25    def patch_poll_for_activity_task(self, boto_client_results, boto_client):26        boto_client.poll_for_activity_task.return_value = boto_client_results27    @pytest.fixture28    def expected_activity_task(self, boto_client_results):29        return ActivityTask(30            activity_id=boto_client_results['activityId'],31            type=boto_client_results['activityType']['name'],32            version=boto_client_results['activityType']['version'],33            input=boto_client_results['input'],34            task_token=boto_client_results['taskToken'],35            workflow_id=boto_client_results['workflowExecution']['workflowId'],36            workflow_run_id=boto_client_results['workflowExecution']['runId'],37        )38    def test_poll_with_identity(self, activity_task_config, expected_activity_task, activity_task_client, boto_client):39        result_activity_task = activity_task_client.poll(identity='meow')40        assert result_activity_task == expected_activity_task41        boto_client.poll_for_activity_task.assert_called_once_with(42            domain=activity_task_config.domain,43            identity=u'meow',44            taskList={45                u'name': activity_task_config.task_list,46          
  },47        )48    def test_poll_without_identity(self, activity_task_config, expected_activity_task, activity_task_client, boto_client):49        result_activity_task = activity_task_client.poll()50        assert result_activity_task == expected_activity_task51        boto_client.poll_for_activity_task.assert_called_once_with(52            domain=activity_task_config.domain,53            taskList={54                u'name': activity_task_config.task_list,55            },56        )57    def test_poll_timeout(self, activity_task_client, boto_client):58        boto_client.poll_for_activity_task.side_effect = ReadTimeout59        with pytest.raises(NoTaskFound):60            activity_task_client.poll()61class TestPollingWithBadResults:62    @pytest.fixture(autouse=True)63    def patch_poll_for_activity_task(self, boto_client_faulty_results, boto_client):64        boto_client.poll_for_activity_task.return_value = boto_client_faulty_results65    @pytest.fixture66    def boto_client_faulty_results(self):67        return {68            "startedEventId": 0,69            "previousStartedEventId": 0,70            "ResponseMetadata": {71                'HTTPStatusCode': 200,72                'RequestId': 'b1cde981-23a0-11e6-98d4-5316d1b97440'73            }74        }75    def test_receive_result_without_task_token(self, activity_task_client):76        with pytest.raises(NoTaskFound):77            activity_task_client.poll()78def test_finish(activity_task_client, boto_client):79    task_token = mock.Mock()80    result = mock.Mock()81    activity_task_client.finish(task_token, result)82    boto_client.respond_activity_task_completed.assert_called_once_with(83        result=result,84        taskToken=task_token,85    )86def test_fail_with_detail(activity_task_client, boto_client):87    task_token = mock.Mock()88    reason = "test_fail"89    detail = "detail"90    activity_task_client.fail(task_token, reason, detail)91    
boto_client.respond_activity_task_failed.assert_called_once_with(92        details=detail,93        reason=reason,94        taskToken=task_token,95    )96def test_fail_without_detail(activity_task_client, boto_client):97    task_token = mock.Mock()98    reason = "test_fail"99    activity_task_client.fail(task_token, reason)100    boto_client.respond_activity_task_failed.assert_called_once_with(101        reason=reason,102        taskToken=task_token,...activity_task.py
Source:activity_task.py  
...85            reason=reason,86        )87        if details is not None:88            kwargs["details"] = details89        self.boto_client.respond_activity_task_failed(90            taskToken=task_token,91            **kwargs...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
