Best Python code snippet using localstack_python
test_cloudformation_stepfunctions.py
Source:test_cloudformation_stepfunctions.py  
...15    # execute statemachine16    ex_result = stepfunctions_client.start_execution(stateMachineArn=statemachine_arn)17    def _is_executed():18        return (19            stepfunctions_client.describe_execution(executionArn=ex_result["executionArn"])[20                "status"21            ]22            != "RUNNING"23        )24    wait_until(_is_executed)25    execution_desc = stepfunctions_client.describe_execution(executionArn=ex_result["executionArn"])26    assert execution_desc["status"] == "SUCCEEDED"27    # sync execution is currently not supported since botocore adds a "sync-" prefix28    # ex_result = stepfunctions_client.start_sync_execution(stateMachineArn=statemachine_arn)29    assert "hello from statemachine" in execution_desc["output"]30def test_nested_statemachine_with_sync2(31    lambda_client, stepfunctions_client, s3_client, deploy_cfn_template32):33    stack = deploy_cfn_template(34        template_path=os.path.join(os.path.dirname(__file__), "../templates/sfn_nested_sync2.json")35    )36    parent_arn = stack.outputs["ParentStateMachineArnOutput"]37    assert parent_arn38    ex_result = stepfunctions_client.start_execution(39        stateMachineArn=parent_arn, input='{"Value": 1}'40    )41    def _is_executed():42        return (43            stepfunctions_client.describe_execution(executionArn=ex_result["executionArn"])[44                "status"45            ]46            != "RUNNING"47        )48    wait_until(_is_executed)49    execution_desc = stepfunctions_client.describe_execution(executionArn=ex_result["executionArn"])50    assert execution_desc["status"] == "SUCCEEDED"51    output = json.loads(execution_desc["output"])52    assert output["Value"] == 353def test_apigateway_invoke(cfn_client, deploy_cfn_template, stepfunctions_client):54    deploy_result = deploy_cfn_template(55        template_path=os.path.join(os.path.dirname(__file__), "../templates/sfn_apigateway.yaml")56    )57    state_machine_arn = deploy_result.outputs["statemachineOutput"]58    execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[59        "executionArn"60    ]61    def _sfn_finished_running():62        return (63            stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]64            != "RUNNING"65        )66    wait_until(_sfn_finished_running)67    execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)68    assert execution_result["status"] == "SUCCEEDED"69    assert "hello from stepfunctions" in execution_result["output"]70def test_apigateway_invoke_with_path(cfn_client, deploy_cfn_template, stepfunctions_client):71    deploy_result = deploy_cfn_template(72        template_path=os.path.join(73            os.path.dirname(__file__), "../templates/sfn_apigateway_two_integrations.yaml"74        )75    )76    state_machine_arn = deploy_result.outputs["statemachineOutput"]77    execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[78        "executionArn"79    ]80    def _sfn_finished_running():81        return (82            stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]83            != "RUNNING"84        )85    wait_until(_sfn_finished_running)86    execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)87    assert execution_result["status"] == "SUCCEEDED"88    assert "hello_with_path from stepfunctions" in execution_result["output"]89def test_apigateway_invoke_localhost(cfn_client, deploy_cfn_template, stepfunctions_client):90    """tests the same as above but with the "generic" localhost version of invoking the apigateway"""91    deploy_result = deploy_cfn_template(92        template_path=os.path.join(os.path.dirname(__file__), "../templates/sfn_apigateway.yaml")93    )94    state_machine_arn = deploy_result.outputs["statemachineOutput"]95    api_url = deploy_result.outputs["LsApiEndpointA06D37E8"]96    # instead of changing the template, we're just mapping the endpoint here to the more generic path-based version97    state_def = stepfunctions_client.describe_state_machine(stateMachineArn=state_machine_arn)[98        "definition"99    ]100    parsed = urllib.parse.urlparse(api_url)101    api_id = parsed.hostname.split(".")[0]102    state = json.loads(state_def)103    stage = state["States"]["LsCallApi"]["Parameters"]["Stage"]104    state["States"]["LsCallApi"]["Parameters"]["ApiEndpoint"] = "localhost:4566"105    state["States"]["LsCallApi"]["Parameters"][106        "Stage"107    ] = f"restapis/{api_id}/{stage}/{PATH_USER_REQUEST}"108    state["States"]["LsCallApi"]["Parameters"]["Path"] = "/"109    stepfunctions_client.update_state_machine(110        stateMachineArn=state_machine_arn, definition=json.dumps(state)111    )112    execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[113        "executionArn"114    ]115    def _sfn_finished_running():116        return (117            stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]118            != "RUNNING"119        )120    wait_until(_sfn_finished_running)121    execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)122    assert execution_result["status"] == "SUCCEEDED"123    assert "hello from stepfunctions" in execution_result["output"]124def test_apigateway_invoke_localhost_with_path(125    cfn_client, deploy_cfn_template, stepfunctions_client126):127    """tests the same as above but with the "generic" localhost version of invoking the apigateway"""128    deploy_result = deploy_cfn_template(129        template_path=os.path.join(130            os.path.dirname(__file__), "../templates/sfn_apigateway_two_integrations.yaml"131        )132    )133    state_machine_arn = deploy_result.outputs["statemachineOutput"]134    api_url = deploy_result.outputs["LsApiEndpointA06D37E8"]135    # instead of changing the template, we're just mapping the endpoint here to the more generic path-based version136    state_def = stepfunctions_client.describe_state_machine(stateMachineArn=state_machine_arn)[137        "definition"138    ]139    parsed = urllib.parse.urlparse(api_url)140    api_id = parsed.hostname.split(".")[0]141    state = json.loads(state_def)142    stage = state["States"]["LsCallApi"]["Parameters"]["Stage"]143    state["States"]["LsCallApi"]["Parameters"]["ApiEndpoint"] = "localhost:4566"144    state["States"]["LsCallApi"]["Parameters"][145        "Stage"146    ] = f"restapis/{api_id}/{stage}/{PATH_USER_REQUEST}"147    stepfunctions_client.update_state_machine(148        stateMachineArn=state_machine_arn, definition=json.dumps(state)149    )150    execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[151        "executionArn"152    ]153    def _sfn_finished_running():154        return (155            stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]156            != "RUNNING"157        )158    wait_until(_sfn_finished_running)159    execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)160    assert execution_result["status"] == "SUCCEEDED"...test_step_functions.py
Source:test_step_functions.py  
1# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.2# SPDX-License-Identifier: MIT-03# pylint: skip-file4import os5import boto36from pytest import fixture, raises7from stubs import stub_step_functions8from mock import Mock9from stepfunctions import StepFunctions10@fixture11def cls():12    cls = StepFunctions(13        role=boto3,14        deployment_account_id='11111111111',15        deployment_account_region='eu-central-1',16        regions=['region-1', 'region-2'],17        account_ids='99999999999',18        full_path='banking/testing',19        update_pipelines_only=020    )21    cls.client = Mock()22    return cls23def test_statemachine_start(cls):24    cls.client.start_execution.return_value = stub_step_functions.start_execution25    cls._start_statemachine()26    assert cls.execution_arn == 'some_execution_arn'27def test_statemachine_get_status(cls):28    cls.client.describe_execution.return_value = stub_step_functions.describe_execution29    cls._start_statemachine()30    cls._fetch_statemachine_status()31    cls._execution_status == 'RUNNING'32def test_wait_failed_state_machine_execution(cls):33    stub_step_functions.describe_execution["status"] = "FAILED"34    cls.client.describe_execution.return_value = stub_step_functions.describe_execution35    cls._start_statemachine()36    cls._fetch_statemachine_status()37    assert cls._execution_status == 'FAILED'38    with raises(Exception):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
