How to use _sfn_finished_running method in localstack

Best Python code snippet using localstack_python

test_cloudformation_stepfunctions.py

Source:test_cloudformation_stepfunctions.py Github

copy

Full Screen

...57 state_machine_arn = deploy_result.outputs["statemachineOutput"]58 execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[59 "executionArn"60 ]61 def _sfn_finished_running():62 return (63 stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]64 != "RUNNING"65 )66 wait_until(_sfn_finished_running)67 execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)68 assert execution_result["status"] == "SUCCEEDED"69 assert "hello from stepfunctions" in execution_result["output"]70def test_apigateway_invoke_with_path(cfn_client, deploy_cfn_template, stepfunctions_client):71 deploy_result = deploy_cfn_template(72 template_path=os.path.join(73 os.path.dirname(__file__), "../templates/sfn_apigateway_two_integrations.yaml"74 )75 )76 state_machine_arn = deploy_result.outputs["statemachineOutput"]77 execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[78 "executionArn"79 ]80 def _sfn_finished_running():81 return (82 stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]83 != "RUNNING"84 )85 wait_until(_sfn_finished_running)86 execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)87 assert execution_result["status"] == "SUCCEEDED"88 assert "hello_with_path from stepfunctions" in execution_result["output"]89def test_apigateway_invoke_localhost(cfn_client, deploy_cfn_template, stepfunctions_client):90 """tests the same as above but with the "generic" localhost version of invoking the apigateway"""91 deploy_result = deploy_cfn_template(92 template_path=os.path.join(os.path.dirname(__file__), "../templates/sfn_apigateway.yaml")93 )94 state_machine_arn = deploy_result.outputs["statemachineOutput"]95 api_url = deploy_result.outputs["LsApiEndpointA06D37E8"]96 # instead of changing the template, we're just mapping the endpoint here to the more generic path-based version97 state_def = stepfunctions_client.describe_state_machine(stateMachineArn=state_machine_arn)[98 "definition"99 ]100 parsed = urllib.parse.urlparse(api_url)101 api_id = parsed.hostname.split(".")[0]102 state = json.loads(state_def)103 stage = state["States"]["LsCallApi"]["Parameters"]["Stage"]104 state["States"]["LsCallApi"]["Parameters"]["ApiEndpoint"] = "localhost:4566"105 state["States"]["LsCallApi"]["Parameters"][106 "Stage"107 ] = f"restapis/{api_id}/{stage}/{PATH_USER_REQUEST}"108 state["States"]["LsCallApi"]["Parameters"]["Path"] = "/"109 stepfunctions_client.update_state_machine(110 stateMachineArn=state_machine_arn, definition=json.dumps(state)111 )112 execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[113 "executionArn"114 ]115 def _sfn_finished_running():116 return (117 stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]118 != "RUNNING"119 )120 wait_until(_sfn_finished_running)121 execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)122 assert execution_result["status"] == "SUCCEEDED"123 assert "hello from stepfunctions" in execution_result["output"]124def test_apigateway_invoke_localhost_with_path(125 cfn_client, deploy_cfn_template, stepfunctions_client126):127 """tests the same as above but with the "generic" localhost version of invoking the apigateway"""128 deploy_result = deploy_cfn_template(129 template_path=os.path.join(130 os.path.dirname(__file__), "../templates/sfn_apigateway_two_integrations.yaml"131 )132 )133 state_machine_arn = deploy_result.outputs["statemachineOutput"]134 api_url = deploy_result.outputs["LsApiEndpointA06D37E8"]135 # instead of changing the template, we're just mapping the endpoint here to the more generic path-based version136 state_def = stepfunctions_client.describe_state_machine(stateMachineArn=state_machine_arn)[137 "definition"138 ]139 parsed = urllib.parse.urlparse(api_url)140 api_id = parsed.hostname.split(".")[0]141 state = json.loads(state_def)142 stage = state["States"]["LsCallApi"]["Parameters"]["Stage"]143 state["States"]["LsCallApi"]["Parameters"]["ApiEndpoint"] = "localhost:4566"144 state["States"]["LsCallApi"]["Parameters"][145 "Stage"146 ] = f"restapis/{api_id}/{stage}/{PATH_USER_REQUEST}"147 stepfunctions_client.update_state_machine(148 stateMachineArn=state_machine_arn, definition=json.dumps(state)149 )150 execution_arn = stepfunctions_client.start_execution(stateMachineArn=state_machine_arn)[151 "executionArn"152 ]153 def _sfn_finished_running():154 return (155 stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]156 != "RUNNING"157 )158 wait_until(_sfn_finished_running)159 execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)160 assert execution_result["status"] == "SUCCEEDED"161 assert "hello_with_path from stepfunctions" in execution_result["output"]162def test_retry_and_catch(deploy_cfn_template, stepfunctions_client, sqs_client):163 """164 Scenario:165 Lambda invoke (incl. 3 retries)166 => catch (Send SQS message with body "Fail")167 => next (Send SQS message with body "Success")168 The Lambda function simply raises an Exception, so it will always fail.169 It should fail all 4 attempts (1x invoke + 3x retries) which should then trigger the catch path170 and send a "Fail" message to the queue.171 """172 stack = deploy_cfn_template(173 template_path=os.path.join(os.path.dirname(__file__), "../templates/sfn_retry_catch.yaml")174 )175 queue_url = stack.outputs["queueUrlOutput"]176 statemachine_arn = stack.outputs["smArnOutput"]177 assert statemachine_arn178 execution = stepfunctions_client.start_execution(stateMachineArn=statemachine_arn)179 execution_arn = execution["executionArn"]180 def _sfn_finished_running():181 return (182 stepfunctions_client.describe_execution(executionArn=execution_arn)["status"]183 != "RUNNING"184 )185 assert wait_until(_sfn_finished_running)186 execution_result = stepfunctions_client.describe_execution(executionArn=execution_arn)187 assert execution_result["status"] == "SUCCEEDED"188 # fetch the message189 receive_result = sqs_client.receive_message(QueueUrl=queue_url, WaitTimeSeconds=5)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful