Best Python code snippet using localstack_python
test_stepfunctions.py
Source:test_stepfunctions.py  
...192        self.assertTrue(result.get("executionArn"))193        # define expected output194        test_output = {**input, "added": {"Hello": TEST_RESULT_VALUE}}195        def check_result():196            result = self._get_execution_results(sm_arn)197            self.assertEqual(test_output, result)198        # assert that the result is correct199        retry(check_result, sleep=2, retries=10)200        # clean up201        self.cleanup(sm_arn, state_machines_before)202    def test_create_run_map_state_machine(self):203        names = ["Bob", "Meg", "Joe"]204        test_input = [{"map": name} for name in names]205        test_output = [{"Hello": name} for name in names]206        state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]207        role_arn = aws_stack.role_arn("sfn_role")208        definition = clone(STATE_MACHINE_MAP)209        lambda_arn_3 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_3)210        definition["States"]["ExampleMapState"]["Iterator"]["States"]["CallLambda"][211            "Resource"212        ] = lambda_arn_3213        definition = json.dumps(definition)214        sm_name = "map-%s" % short_uid()215        _ = self.sfn_client.create_state_machine(216            name=sm_name, definition=definition, roleArn=role_arn217        )218        # assert that the SM has been created219        self.assert_machine_created(state_machines_before)220        # run state machine221        sm_arn = self.get_machine_arn(sm_name)222        lambda_api.LAMBDA_EXECUTOR.function_invoke_times.clear()223        result = self.sfn_client.start_execution(224            stateMachineArn=sm_arn, input=json.dumps(test_input)225        )226        self.assertTrue(result.get("executionArn"))227        def check_invocations():228            self.assertIn(lambda_arn_3, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)229            # assert that the result is correct230            result = self._get_execution_results(sm_arn)231           
 self.assertEqual(test_output, result)232        # assert that the lambda has been invoked by the SM execution233        retry(check_invocations, sleep=1, retries=10)234        # clean up235        self.cleanup(sm_arn, state_machines_before)236    def test_create_run_state_machine(self):237        state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]238        # create state machine239        role_arn = aws_stack.role_arn("sfn_role")240        definition = clone(STATE_MACHINE_BASIC)241        lambda_arn_1 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_1)242        lambda_arn_2 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_2)243        definition["States"]["step1"]["Resource"] = lambda_arn_1244        definition["States"]["step2"]["Resource"] = lambda_arn_2245        definition = json.dumps(definition)246        sm_name = "basic-%s" % short_uid()247        result = self.sfn_client.create_state_machine(248            name=sm_name, definition=definition, roleArn=role_arn249        )250        # assert that the SM has been created251        self.assert_machine_created(state_machines_before)252        # run state machine253        sm_arn = self.get_machine_arn(sm_name)254        lambda_api.LAMBDA_EXECUTOR.function_invoke_times.clear()255        result = self.sfn_client.start_execution(stateMachineArn=sm_arn)256        self.assertTrue(result.get("executionArn"))257        def check_invocations():258            self.assertIn(lambda_arn_1, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)259            self.assertIn(lambda_arn_2, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)260            # assert that the result is correct261            result = self._get_execution_results(sm_arn)262            self.assertEqual({"Hello": TEST_RESULT_VALUE}, result["result_value"])263        # assert that the lambda has been invoked by the SM execution264        retry(check_invocations, sleep=0.7, retries=25)265        # clean up266        
self.cleanup(sm_arn, state_machines_before)267    def test_try_catch_state_machine(self):268        state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]269        # create state machine270        role_arn = aws_stack.role_arn("sfn_role")271        definition = clone(STATE_MACHINE_CATCH)272        lambda_arn_1 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_1)273        lambda_arn_2 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_2)274        definition["States"]["Start"]["Parameters"]["FunctionName"] = lambda_arn_1275        definition["States"]["ErrorHandler"]["Resource"] = lambda_arn_2276        definition["States"]["Final"]["Resource"] = lambda_arn_2277        definition = json.dumps(definition)278        sm_name = "catch-%s" % short_uid()279        result = self.sfn_client.create_state_machine(280            name=sm_name, definition=definition, roleArn=role_arn281        )282        # run state machine283        sm_arn = self.get_machine_arn(sm_name)284        lambda_api.LAMBDA_EXECUTOR.function_invoke_times.clear()285        result = self.sfn_client.start_execution(stateMachineArn=sm_arn)286        self.assertTrue(result.get("executionArn"))287        def check_invocations():288            self.assertIn(lambda_arn_1, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)289            self.assertIn(lambda_arn_2, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)290            # assert that the result is correct291            result = self._get_execution_results(sm_arn)292            self.assertEqual({"Hello": TEST_RESULT_VALUE}, result.get("handled"))293        # assert that the lambda has been invoked by the SM execution294        retry(check_invocations, sleep=1, retries=10)295        # clean up296        self.cleanup(sm_arn, state_machines_before)297    def test_intrinsic_functions(self):298        state_machines_before = self.sfn_client.list_state_machines()["stateMachines"]299        # create state machine300        role_arn = 
aws_stack.role_arn("sfn_role")301        definition = clone(STATE_MACHINE_INTRINSIC_FUNCS)302        lambda_arn_1 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_5)303        lambda_arn_2 = aws_stack.lambda_function_arn(TEST_LAMBDA_NAME_5)304        if isinstance(definition["States"]["state1"].get("Parameters"), dict):305            definition["States"]["state1"]["Parameters"]["lambda_params"][306                "FunctionName"307            ] = lambda_arn_1308            definition["States"]["state3"]["Resource"] = lambda_arn_2309        definition = json.dumps(definition)310        sm_name = "intrinsic-%s" % short_uid()311        result = self.sfn_client.create_state_machine(312            name=sm_name, definition=definition, roleArn=role_arn313        )314        # run state machine315        sm_arn = self.get_machine_arn(sm_name)316        lambda_api.LAMBDA_EXECUTOR.function_invoke_times.clear()317        input = {}318        result = self.sfn_client.start_execution(stateMachineArn=sm_arn, input=json.dumps(input))319        self.assertTrue(result.get("executionArn"))320        def check_invocations():321            self.assertIn(lambda_arn_1, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)322            self.assertIn(lambda_arn_2, lambda_api.LAMBDA_EXECUTOR.function_invoke_times)323            # assert that the result is correct324            result = self._get_execution_results(sm_arn)325            self.assertEqual({"payload": {"values": [1, "v2"]}}, result.get("result_value"))326        # assert that the lambda has been invoked by the SM execution327        retry(check_invocations, sleep=1, retries=10)328        # clean up329        self.cleanup(sm_arn, state_machines_before)330    def get_machine_arn(self, sm_name):331        state_machines = self.sfn_client.list_state_machines()["stateMachines"]332        return [m["stateMachineArn"] for m in state_machines if m["name"] == sm_name][0]333    def assert_machine_created(self, state_machines_before):334      
  return self._assert_machine_instances(len(state_machines_before) + 1)335    def assert_machine_deleted(self, state_machines_before):336        return self._assert_machine_instances(len(state_machines_before))337    def cleanup(self, sm_arn, state_machines_before):338        self.sfn_client.delete_state_machine(stateMachineArn=sm_arn)339        self.assert_machine_deleted(state_machines_before)340    def _assert_machine_instances(self, expected_instances):341        def check():342            state_machines_after = self.sfn_client.list_state_machines()["stateMachines"]343            self.assertEqual(expected_instances, len(state_machines_after))344            return state_machines_after345        return retry(check, sleep=1, retries=4)346    def _get_execution_results(self, sm_arn):347        response = self.sfn_client.list_executions(stateMachineArn=sm_arn)348        executions = sorted(response["executions"], key=lambda x: x["startDate"])349        execution = executions[-1]350        result = self.sfn_client.get_execution_history(executionArn=execution["executionArn"])351        events = sorted(result["events"], key=lambda event: event["timestamp"])352        result = json.loads(events[-1]["executionSucceededEventDetails"]["output"])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
