Best Python code snippet using localstack_python
test_stepfunctions_cloudformation.py
Source:test_stepfunctions_cloudformation.py  
...39    }40    cf.create_stack(StackName="test_stack", TemplateBody=json.dumps(template))41    outputs_list = cf.Stack("test_stack").outputs42    output = {item["OutputKey"]: item["OutputValue"] for item in outputs_list}43    state_machine = sf.describe_state_machine(stateMachineArn=output["StateMachineArn"])44    state_machine["stateMachineArn"].should.equal(output["StateMachineArn"])45    state_machine["name"].should.equal(output["StateMachineName"])46    state_machine["roleArn"].should.equal(role_arn)47    state_machine["definition"].should.equal(definition)48    tags = sf.list_tags_for_resource(resourceArn=output["StateMachineArn"]).get("tags")49    for i, tag in enumerate(tags, 1):50        tag["key"].should.equal("key{}".format(i))51        tag["value"].should.equal("value{}".format(i))52    cf.Stack("test_stack").delete()53    with pytest.raises(ClientError) as ex:54        sf.describe_state_machine(stateMachineArn=output["StateMachineArn"])55    ex.value.response["Error"]["Code"].should.equal("StateMachineDoesNotExist")56    ex.value.response["Error"]["Message"].should.contain("Does Not Exist")57@mock_stepfunctions58@mock_cloudformation59def test_state_machine_cloudformation_update_with_replacement():60    sf = boto3.client("stepfunctions", region_name="us-east-1")61    cf = boto3.resource("cloudformation", region_name="us-east-1")62    definition = '{"StartAt": "HelloWorld", "States": {"HelloWorld": {"Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333;:function:HelloFunction", "End": true}}}'63    role_arn = (64        "arn:aws:iam::111122223333:role/service-role/StatesExecutionRole-us-east-1"65    )66    properties = {67        "StateMachineName": "HelloWorld-StateMachine",68        "DefinitionString": definition,69        "RoleArn": role_arn,70        "Tags": [71            {"Key": "key1", "Value": "value1"},72            {"Key": "key2", "Value": "value2"},73        ],74    }75    template = {76        "AWSTemplateFormatVersion": "2010-09-09",77        "Description": "An example template for a Step Functions state machine.",78        "Resources": {79            "MyStateMachine": {80                "Type": "AWS::StepFunctions::StateMachine",81                "Properties": {},82            }83        },84        "Outputs": {85            "StateMachineArn": {"Value": {"Ref": "MyStateMachine"}},86            "StateMachineName": {"Value": {"Fn::GetAtt": ["MyStateMachine", "Name"]}},87        },88    }89    template["Resources"]["MyStateMachine"]["Properties"] = properties90    cf.create_stack(StackName="test_stack", TemplateBody=json.dumps(template))91    outputs_list = cf.Stack("test_stack").outputs92    output = {item["OutputKey"]: item["OutputValue"] for item in outputs_list}93    state_machine = sf.describe_state_machine(stateMachineArn=output["StateMachineArn"])94    original_machine_arn = state_machine["stateMachineArn"]95    original_creation_date = state_machine["creationDate"]96    # Update State Machine, with replacement.97    updated_role = role_arn + "-updated"98    updated_definition = definition.replace("HelloWorld", "HelloWorld2")99    updated_properties = {100        "StateMachineName": "New-StateMachine-Name",101        "DefinitionString": updated_definition,102        "RoleArn": updated_role,103        "Tags": [104            {"Key": "key3", "Value": "value3"},105            {"Key": "key1", "Value": "updated_value"},106        ],107    }108    template["Resources"]["MyStateMachine"]["Properties"] = updated_properties109    cf.Stack("test_stack").update(TemplateBody=json.dumps(template))110    outputs_list = cf.Stack("test_stack").outputs111    output = {item["OutputKey"]: item["OutputValue"] for item in outputs_list}112    state_machine = sf.describe_state_machine(stateMachineArn=output["StateMachineArn"])113    state_machine["stateMachineArn"].should_not.equal(original_machine_arn)114    state_machine["name"].should.equal("New-StateMachine-Name")115    state_machine["creationDate"].should.be.greater_than(original_creation_date)116    state_machine["roleArn"].should.equal(updated_role)117    state_machine["definition"].should.equal(updated_definition)118    tags = sf.list_tags_for_resource(resourceArn=output["StateMachineArn"]).get("tags")119    tags.should.have.length_of(3)120    for tag in tags:121        if tag["key"] == "key1":122            tag["value"].should.equal("updated_value")123    with pytest.raises(ClientError) as ex:124        sf.describe_state_machine(stateMachineArn=original_machine_arn)125    ex.value.response["Error"]["Code"].should.equal("StateMachineDoesNotExist")126    ex.value.response["Error"]["Message"].should.contain("State Machine Does Not Exist")127@mock_stepfunctions128@mock_cloudformation129def test_state_machine_cloudformation_update_with_no_interruption():130    sf = boto3.client("stepfunctions", region_name="us-east-1")131    cf = boto3.resource("cloudformation", region_name="us-east-1")132    definition = '{"StartAt": "HelloWorld", "States": {"HelloWorld": {"Type": "Task", "Resource": "arn:aws:lambda:us-east-1:111122223333;:function:HelloFunction", "End": true}}}'133    role_arn = (134        "arn:aws:iam::111122223333:role/service-role/StatesExecutionRole-us-east-1"135    )136    properties = {137        "StateMachineName": "HelloWorld-StateMachine",138        "DefinitionString": definition,139        "RoleArn": role_arn,140        "Tags": [141            {"Key": "key1", "Value": "value1"},142            {"Key": "key2", "Value": "value2"},143        ],144    }145    template = {146        "AWSTemplateFormatVersion": "2010-09-09",147        "Description": "An example template for a Step Functions state machine.",148        "Resources": {149            "MyStateMachine": {150                "Type": "AWS::StepFunctions::StateMachine",151                "Properties": {},152            }153        },154        "Outputs": {155            "StateMachineArn": {"Value": {"Ref": "MyStateMachine"}},156            "StateMachineName": {"Value": {"Fn::GetAtt": ["MyStateMachine", "Name"]}},157        },158    }159    template["Resources"]["MyStateMachine"]["Properties"] = properties160    cf.create_stack(StackName="test_stack", TemplateBody=json.dumps(template))161    outputs_list = cf.Stack("test_stack").outputs162    output = {item["OutputKey"]: item["OutputValue"] for item in outputs_list}163    state_machine = sf.describe_state_machine(stateMachineArn=output["StateMachineArn"])164    machine_arn = state_machine["stateMachineArn"]165    creation_date = state_machine["creationDate"]166    # Update State Machine in-place, no replacement.167    updated_role = role_arn + "-updated"168    updated_definition = definition.replace("HelloWorld", "HelloWorldUpdated")169    updated_properties = {170        "DefinitionString": updated_definition,171        "RoleArn": updated_role,172        "Tags": [173            {"Key": "key3", "Value": "value3"},174            {"Key": "key1", "Value": "updated_value"},175        ],176    }177    template["Resources"]["MyStateMachine"]["Properties"] = updated_properties178    cf.Stack("test_stack").update(TemplateBody=json.dumps(template))179    state_machine = sf.describe_state_machine(stateMachineArn=machine_arn)180    state_machine["name"].should.equal("HelloWorld-StateMachine")181    state_machine["creationDate"].should.equal(creation_date)182    state_machine["roleArn"].should.equal(updated_role)183    state_machine["definition"].should.equal(updated_definition)184    tags = sf.list_tags_for_resource(resourceArn=machine_arn).get("tags")185    tags.should.have.length_of(3)186    for tag in tags:187        if tag["key"] == "key1":...responses.py
Source:responses.py  
...41        )42        response = {"stateMachines": list_all}43        return 200, {}, json.dumps(response)44    @amzn_request_id45    def describe_state_machine(self):46        arn = self._get_param("stateMachineArn")47        return self._describe_state_machine(arn)48    @amzn_request_id49    def _describe_state_machine(self, state_machine_arn):50        try:51            state_machine = self.stepfunction_backend.describe_state_machine(52                state_machine_arn53            )54            response = {55                "creationDate": state_machine.creation_date,56                "stateMachineArn": state_machine.arn,57                "definition": state_machine.definition,58                "name": state_machine.name,59                "roleArn": state_machine.roleArn,60                "status": "ACTIVE",61            }62            return 200, {}, json.dumps(response)63        except AWSError as err:64            return err.response()65    @amzn_request_id66    def delete_state_machine(self):67        arn = self._get_param("stateMachineArn")68        try:69            self.stepfunction_backend.delete_state_machine(arn)70            return 200, {}, json.dumps("{}")71        except AWSError as err:72            return err.response()73    @amzn_request_id74    def list_tags_for_resource(self):75        arn = self._get_param("resourceArn")76        try:77            state_machine = self.stepfunction_backend.describe_state_machine(arn)78            tags = state_machine.tags or []79        except AWSError:80            tags = []81        response = {"tags": tags}82        return 200, {}, json.dumps(response)83    @amzn_request_id84    def start_execution(self):85        arn = self._get_param("stateMachineArn")86        name = self._get_param("name")87        execution_input = self._get_param("input", if_none="{}")88        try:89            execution = self.stepfunction_backend.start_execution(90                arn, name, execution_input91            )92        except AWSError as err:93            return err.response()94        response = {95            "executionArn": execution.execution_arn,96            "startDate": execution.start_date,97        }98        return 200, {}, json.dumps(response)99    @amzn_request_id100    def list_executions(self):101        arn = self._get_param("stateMachineArn")102        state_machine = self.stepfunction_backend.describe_state_machine(arn)103        executions = self.stepfunction_backend.list_executions(arn)104        executions = [105            {106                "executionArn": execution.execution_arn,107                "name": execution.name,108                "startDate": execution.start_date,109                "stateMachineArn": state_machine.arn,110                "status": execution.status,111            }112            for execution in executions113        ]114        return 200, {}, json.dumps({"executions": executions})115    @amzn_request_id116    def describe_execution(self):117        arn = self._get_param("executionArn")118        try:119            execution = self.stepfunction_backend.describe_execution(arn)120            response = {121                "executionArn": arn,122                "input": execution.execution_input,123                "name": execution.name,124                "startDate": execution.start_date,125                "stateMachineArn": execution.state_machine_arn,126                "status": execution.status,127                "stopDate": execution.stop_date,128            }129            return 200, {}, json.dumps(response)130        except AWSError as err:131            return err.response()132    @amzn_request_id133    def describe_state_machine_for_execution(self):134        arn = self._get_param("executionArn")135        try:136            execution = self.stepfunction_backend.describe_execution(arn)137            return self._describe_state_machine(execution.state_machine_arn)138        except AWSError as err:139            return err.response()140    @amzn_request_id141    def stop_execution(self):142        arn = self._get_param("executionArn")143        execution = self.stepfunction_backend.stop_execution(arn)144        response = {"stopDate": execution.stop_date}...stepfunction.py
Source:stepfunction.py  
...8    aws = SessionHelper.remote_session(env.AwsAccountId)9    client = aws.client('stepfunctions', region_name=env.region)10    arn = f'arn:aws:states:{env.region}:{env.AwsAccountId}:stateMachine:{state_machine_name}'11    try:12        client.describe_state_machine(stateMachineArn=arn)13    except client.exceptions.StateMachineDoesNotExist:14        raise Exception(15            f'An error occurred (StateMachineNotFound) {arn} when calling the RUN PIPELINE operation'16        )17    response = client.start_execution(stateMachineArn=arn)18    print(response)19    return response['executionArn']20def list_executions(state_machine_name, env: models.Environment, stage='Test'):21    if not state_machine_name:22        raise Exception(23            'An error occurred (StackNotFound) when calling the RUN PIPELINE operation'24        )25    aws = SessionHelper.remote_session(env.AwsAccountId)26    client = aws.client('stepfunctions', region_name=env.region)27    arn = f'arn:aws:states:{env.region}:{env.AwsAccountId}:stateMachine:{state_machine_name}'28    try:29        client.describe_state_machine(stateMachineArn=arn)30    except client.exceptions.StateMachineDoesNotExist:31        print(32            f'An error occurred (StateMachineNotFound) {arn} when calling the RUN PIPELINE operation'33        )34        return []35    response = client.list_executions(stateMachineArn=arn, maxResults=100)36    executions = response.get('executions', [])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
