Best Python code snippet using localstack_python
test_cloudformation_stepfunctions.py
Source:test_cloudformation_stepfunctions.py  
...57    state_machine_arn = deploy_result.outputs["statemachineOutput"]58    execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[59        "executionArn"60    ]61    def _sfn_finished_running():62        return (63            stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]64            != "RUNNING"65        )66    wait_until(_sfn_finished_running)67    execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)68    assert execution_result["status"] == "SUCCEEDED"69    assert "hello from stepfunctions" in execution_result["output"]70def test_apigateway_invoke_with_path(cfn_client, deploy_cfn_template, stepfunctions_client):71    deploy_result = deploy_cfn_template(72        template_path=os.path.join(73            os.path.dirname(__file__), "../templates/sfn_apigateway_two_integrations.yaml"74        )75    )76    state_machine_arn = deploy_result.outputs["statemachineOutput"]77    execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[78        "executionArn"79    ]80    def _sfn_finished_running():81        return (82            stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]83            != "RUNNING"84        )85    wait_until(_sfn_finished_running)86    execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)87    assert execution_result["status"] == "SUCCEEDED"88    assert "hello_with_path from stepfunctions" in execution_result["output"]89def test_apigateway_invoke_localhost(cfn_client, deploy_cfn_template, stepfunctions_client):90    """tests the same as above but with the "generic" localhost version of invoking the apigateway"""91    deploy_result = deploy_cfn_template(92        template_path=os.path.join(os.path.dirname(__file__), "../templates/sfn_apigateway.yaml")93    )94    state_machine_arn = deploy_result.outputs["statemachineOutput"]95    api_url = deploy_result.outputs["LsApiEndpointA06D37E8"]96    # instead of changing the template, we're just mapping the endpoint here to the more generic path-based version97    state_def = stepfunctions_client.describe_state_machine(stateMachineArn=state_machine_arn)[98        "definition"99    ]100    parsed = urllib.parse.urlparse(api_url)101    api_id = parsed.hostname.split(".")[0]102    state = json.loads(state_def)103    stage = state["States"]["LsCallApi"]["Parameters"]["Stage"]104    state["States"]["LsCallApi"]["Parameters"]["ApiEndpoint"] = "localhost:4566"105    state["States"]["LsCallApi"]["Parameters"][106        "Stage"107    ] = f"restapis/{api_id}/{stage}/{PATH_USER_REQUEST}"108    state["States"]["LsCallApi"]["Parameters"]["Path"] = "/"109    stepfunctions_client.update_state_machine(110        stateMachineArn=state_machine_arn, definition=json.dumps(state)111    )112    execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[113        "executionArn"114    ]115    def _sfn_finished_running():116        return (117            stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]118            != "RUNNING"119        )120    wait_until(_sfn_finished_running)121    execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)122    assert execution_result["status"] == "SUCCEEDED"123    assert "hello from stepfunctions" in execution_result["output"]124def test_apigateway_invoke_localhost_with_path(125    cfn_client, deploy_cfn_template, stepfunctions_client126):127    """tests the same as above but with the "generic" localhost version of invoking the apigateway"""128    deploy_result = deploy_cfn_template(129        template_path=os.path.join(130            os.path.dirname(__file__), "../templates/sfn_apigateway_two_integrations.yaml"131        )132    )133    state_machine_arn = deploy_result.outputs["statemachineOutput"]134    api_url = deploy_result.outputs["LsApiEndpointA06D37E8"]135    # instead of changing the template, we're just mapping the endpoint here to the more generic path-based version136    state_def = stepfunctions_client.describe_state_machine(stateMachineArn=state_machine_arn)[137        "definition"138    ]139    parsed = urllib.parse.urlparse(api_url)140    api_id = parsed.hostname.split(".")[0]141    state = json.loads(state_def)142    stage = state["States"]["LsCallApi"]["Parameters"]["Stage"]143    state["States"]["LsCallApi"]["Parameters"]["ApiEndpoint"] = "localhost:4566"144    state["States"]["LsCallApi"]["Parameters"][145        "Stage"146    ] = f"restapis/{api_id}/{stage}/{PATH_USER_REQUEST}"147    stepfunctions_client.update_state_machine(148        stateMachineArn=state_machine_arn, definition=json.dumps(state)149    )150    execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[151        "executionArn"152    ]153    def _sfn_finished_running():154        return (155            stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]156            != "RUNNING"157        )158    wait_until(_sfn_finished_running)159    execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)160    assert execution_result["status"] == "SUCCEEDED"161    assert "hello_with_path from stepfunctions" in execution_result["output"]162def test_retry_and_catch(deploy_cfn_template, stepfunctions_client, sqs_client):163    """164    Scenario:165    Lambda invoke (incl. 3 retries)166        => catch (Send SQS message with body "Fail")167        => next (Send SQS message with body "Success")168    The Lambda function simply raises an Exception, so it will always fail.169    It should fail all 4 attempts (1x invoke + 3x retries) which should then trigger the catch path170    and send a "Fail" message to the queue.171    """172    stack = deploy_cfn_template(173        template_path=os.path.join(os.path.dirname(__file__), "../templates/sfn_retry_catch.yaml")174    )175    queue_url = stack.outputs["queueUrlOutput"]176    statemachine_arn = stack.outputs["smArnOutput"]177    assert statemachine_arn178    execution = stepfunctions_client.start_execution(stateMachineArn=statemachine_arn)179    execution_arn = execution["executionArn"]180    def _sfn_finished_running():181        return (182            stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]183            != "RUNNING"184        )185    assert wait_until(_sfn_finished_running)186    execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)187    assert execution_result["status"] == "SUCCEEDED"188    # fetch the message189    receive_result = sqs_client.receive_message(QueueUrl=queue_url, WaitTimeSeconds=5)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
