Best Python code snippet using localstack_python
test_statemachine.py
Source:test_statemachine.py  
...32        }33        machine = StateMachine('name')34        self.assertEqual(machine.arn, 'XXX')35        calls = [36            mock.call.list_state_machines()37        ]38        self.assertEqual(client.mock_calls, calls)39    @mock.patch('heaviside.compile', autospec=True)40    @mock.patch('heaviside.create_session', autospec=True)41    def test_build(self, mCreateSession, mCompile):42        iSession = MockSession()43        iSession.region_name = 'region'44        mCreateSession.return_value = (iSession, '123456')45        client = iSession.client('stepfunctions')46        client.list_state_machines.return_value = {47            'stateMachines':[{48                'name': 'name',49                'stateMachineArn': 'XXX'50            }]51        }52        mCompile.return_value = {}53        machine = StateMachine('name')54        sfn = "Pass()"55        actual = machine.build(sfn)56        expected = {}57        self.assertEqual(actual, expected)58        59        calls = [60            mock.call(sfn, region=machine.region, account_id=machine.account_id, visitors=[])61        ]62        self.assertEqual(mCompile.mock_calls, calls)63    @mock.patch('heaviside.create_session', autospec=True)64    def test_resolve_role(self, mCreateSession):65        iSession = MockSession()66        mCreateSession.return_value = (iSession, '123456')67        client = iSession.client('stepfunctions')68        client.list_state_machines.return_value = {69            'stateMachines':[{70                'name': 'name',71                'stateMachineArn': 'XXX'72            }]73        }74        iam = iSession.client('iam')75        iam.get_role.return_value = {76            'Role': {77                'Arn': 'YYY'78            }79        }80        machine = StateMachine('name')81        arn = machine._resolve_role('role')82        self.assertEqual(arn, 'YYY')83        calls = [84            mock.call.get_role(RoleName = 'role')85        ]86        
self.assertEqual(iam.mock_calls, calls)87    @mock.patch('heaviside.create_session', autospec=True)88    def test_create(self, mCreateSession):89        iSession = MockSession()90        mCreateSession.return_value = (iSession, '123456')91        client = iSession.client('stepfunctions')92        client.list_state_machines.return_value = {93            'stateMachines':[]94        }95        client.create_state_machine.return_value = {96            'stateMachineArn': 'XXX'97        }98        _resolve_role = mock.MagicMock()99        _resolve_role.return_value = 'arn'100        build = mock.MagicMock()101        build.return_value = {}102        machine = StateMachine('name')103        machine._resolve_role = _resolve_role104        machine.build = build105        machine.create('source', 'role')106        self.assertEqual(machine.arn, 'XXX')107        calls = [mock.call('role')]108        self.assertEqual(_resolve_role.mock_calls, calls)109        calls = [mock.call('source')]110        self.assertEqual(build.mock_calls, calls)111        calls = [112            mock.call.list_state_machines(),113            mock.call.create_state_machine(name = 'name',114                                           definition = {},115                                           roleArn = 'arn')116        ]117        self.assertEqual(client.mock_calls, calls)118    @mock.patch('heaviside.create_session', autospec=True)119    def test_create_exists(self, mCreateSession):120        iSession = MockSession()121        mCreateSession.return_value = (iSession, '123456')122        client = iSession.client('stepfunctions')123        client.list_state_machines.return_value = {124            'stateMachines':[{125                'name': 'name',126                'stateMachineArn': 'XXX'127            }]128        }129        machine = StateMachine('name')130        with self.assertRaises(Exception):131            machine.create('source', 'role')132        self.assertEqual(machine.arn, 'XXX')133    
    calls = [134            mock.call.list_state_machines(),135        ]136        self.assertEqual(client.mock_calls, calls)137    @mock.patch('heaviside.create_session', autospec=True)138    def test_delete(self, mCreateSession):139        iSession = MockSession()140        mCreateSession.return_value = (iSession, '123456')141        client = iSession.client('stepfunctions')142        client.list_state_machines.return_value = {143            'stateMachines':[{144                'name': 'name',145                'stateMachineArn': 'XXX'146            }]147        }148        machine = StateMachine('name')149        machine.delete()150        self.assertEqual(machine.arn, None)151        calls = [152            mock.call.list_state_machines(),153            mock.call.delete_state_machine(stateMachineArn = 'XXX')154        ]155        self.assertEqual(client.mock_calls, calls)156    @mock.patch('heaviside.create_session', autospec=True)157    def test_delete_exception(self, mCreateSession):158        iSession = MockSession()159        mCreateSession.return_value = (iSession, '123456')160        client = iSession.client('stepfunctions')161        client.list_state_machines.return_value = {162            'stateMachines':[]163        }164        machine = StateMachine('name')165        with self.assertRaises(Exception):166            machine.delete(True)167        self.assertEqual(machine.arn, None)168        calls = [169            mock.call.list_state_machines(),170        ]171        self.assertEqual(client.mock_calls, calls)172    @mock.patch('heaviside.create_session', autospec=True)173    def test_start(self, mCreateSession):174        iSession = MockSession()175        mCreateSession.return_value = (iSession, '123456')176        client = iSession.client('stepfunctions')177        client.list_state_machines.return_value = {178            'stateMachines':[{179                'name': 'name',180                'stateMachineArn': 'XXX'181            }]182        }183 
       client.start_execution.return_value = {184            'executionArn': 'YYY'185        }186        machine = StateMachine('name')187        arn = machine.start({}, 'run')188        self.assertEqual(arn, 'YYY')189        calls = [190            mock.call.list_state_machines(),191            mock.call.start_execution(stateMachineArn = 'XXX',192                                      name = 'run',193                                      input = '{}')194        ]195        self.assertEqual(client.mock_calls, calls)196    @mock.patch('heaviside.create_session', autospec=True)197    def test_start_doesnt_exist(self, mCreateSession):198        iSession = MockSession()199        mCreateSession.return_value = (iSession, '123456')200        client = iSession.client('stepfunctions')201        client.list_state_machines.return_value = {202            'stateMachines':[]203        }204        machine = StateMachine('name')205        with self.assertRaises(Exception):206            machine.start({}, 'run')207        calls = [208            mock.call.list_state_machines(),209        ]210        self.assertEqual(client.mock_calls, calls)211    @mock.patch('heaviside.create_session', autospec=True)212    def test_stop(self, mCreateSession):213        iSession = MockSession()214        mCreateSession.return_value = (iSession, '123456')215        client = iSession.client('stepfunctions')216        client.list_state_machines.return_value = {217            'stateMachines':[{218                'name': 'name',219                'stateMachineArn': 'XXX'220            }]221        }222        machine = StateMachine('name')223        machine.stop('arn', 'error', 'cause')224        calls = [225            mock.call.list_state_machines(),226            mock.call.stop_execution(executionArn = 'arn',227                                     error = 'error',228                                     cause = 'cause')229        ]230        self.assertEqual(client.mock_calls, calls)231    
@mock.patch('heaviside.create_session', autospec=True)232    def test_status(self, mCreateSession):233        iSession = MockSession()234        mCreateSession.return_value = (iSession, '123456')235        client = iSession.client('stepfunctions')236        client.list_state_machines.return_value = {237            'stateMachines':[{238                'name': 'name',239                'stateMachineArn': 'XXX'240            }]241        }242        client.describe_execution.return_value = {243            'status': 'status'244        }245        machine = StateMachine('name')246        status = machine.status('arn')247        self.assertEqual(status, 'status')248        calls = [249            mock.call.list_state_machines(),250            mock.call.describe_execution(executionArn = 'arn')251        ]252        self.assertEqual(client.mock_calls, calls)253    @mock.patch('heaviside.time.sleep', autospec=True)254    @mock.patch('heaviside.create_session', autospec=True)255    def test_wait_success(self, mCreateSession, mSleep):256        iSession = MockSession()257        mCreateSession.return_value = (iSession, '123456')258        client = iSession.client('stepfunctions')259        client.list_state_machines.return_value = {260            'stateMachines':[{261                'name': 'name',262                'stateMachineArn': 'XXX'263            }]264        }265        client.describe_execution.side_effect = [266            {'status': 'RUNNING'},267            {'status': 'SUCCESS', 'output': '{}'}268        ]269        machine = StateMachine('name')270        output = machine.wait('arn')271        self.assertEqual(output, {})272        calls = [273            mock.call.list_state_machines(),274            mock.call.describe_execution(executionArn = 'arn'),275            mock.call.describe_execution(executionArn = 'arn')276        ]277        self.assertEqual(client.mock_calls, calls)278        calls = [279            mock.call(10)280        ]281        
self.assertEqual(mSleep.mock_calls, calls)282    @mock.patch('heaviside.time.sleep', autospec=True)283    @mock.patch('heaviside.create_session', autospec=True)284    def _test_wait_xxx(self, error, mCreateSession, mSleep):285        iSession = MockSession()286        mCreateSession.return_value = (iSession, '123456')287        client = iSession.client('stepfunctions')288        client.list_state_machines.return_value = {289            'stateMachines':[{290                'name': 'name',291                'stateMachineArn': 'XXX'292            }]293        }294        client.describe_execution.side_effect = [295            {'status': 'RUNNING'},296            {'status': error}297        ]298        client.get_execution_history.return_value = {299            'events':[{300                'execution{}EventDetails'.format(error): {}301            }]302        }303        machine = StateMachine('name')304        if error is None:305            with self.assertRaises(Exception):306                machine.wait('arn')307        else:308            output = machine.wait('arn')309            self.assertEqual(output, {})310        calls = [311            mock.call.list_state_machines(),312            mock.call.describe_execution(executionArn = 'arn'),313            mock.call.describe_execution(executionArn = 'arn'),314            mock.call.get_execution_history(executionArn = 'arn',315                                            reverseOrder = True)316        ]317        self.assertEqual(client.mock_calls, calls)318        calls = [319            mock.call(10)320        ]321        self.assertEqual(mSleep.mock_calls, calls)322    def test_wait_failed(self):323        self._test_wait_xxx('Failed')324    def test_wait_aborted(self):325        self._test_wait_xxx('Aborted')326    def test_wait_timedout(self):327        self._test_wait_xxx('TimedOut')328    def test_wait_exception(self):329        self._test_wait_xxx(None)330    @mock.patch('heaviside.create_session', 
autospec=True)331    def test_running_arns(self, mCreateSession):332        iSession = MockSession()333        mCreateSession.return_value = (iSession, '123456')334        client = iSession.client('stepfunctions')335        client.list_state_machines.return_value = {336            'stateMachines':[{337                'name': 'name',338                'stateMachineArn': 'XXX'339            }]340        }341        client.list_executions.return_value = {342            'executions': [343                {'executionArn': 'arn1'},344                {'executionArn': 'arn2'}345            ]346        }347        machine = StateMachine('name')348        arns = machine.running_arns()349        self.assertEqual(arns, ['arn1', 'arn2'])350        calls = [351            mock.call.list_state_machines(),352            mock.call.list_executions(stateMachineArn = 'XXX',353                                      statusFilter = 'RUNNING')354        ]...aws_stepfunctions_info.py
Source:aws_stepfunctions_info.py  
...119            if client.can_paginate('list_state_machines'):120                paginator = client.get_paginator('list_state_machines')121                return paginator.paginate(), True122            else:123                return client.list_state_machines(), False124        elif module.params['describe_state_machine']:125            if client.can_paginate('describe_state_machine'):126                paginator = client.get_paginator('describe_state_machine')127                return paginator.paginate(128                    stateMachineArn=module.params['arn'],129                ), True130            else:131                return client.describe_state_machine(132                    stateMachineArn=module.params['arn'],133                ), False134        else:135            return None, False136    except (BotoCoreError, ClientError) as e:137        module.fail_json_aws(e, msg='Failed to fetch AWS Step Functions (SFN) details')...aws-stepfunctions-execute-state-machine.py
Source:aws-stepfunctions-execute-state-machine.py  
...25    args = setup_args()26    Util.check_debug_mode(args)27    # get arm for state machine28    step_functions = StepFunctions(args.profile)29    step_functions.list_state_machines()30    step_functions.organize_state_machines_by_name()31    state_machines_arns = step_functions.get_state_machines_arns(32        args.state_machines_names33    )34    print(state_machines_arns)35    logger.info("Starting state machines %s" % state_machines_arns)36    step_functions.execute_state_machines(state_machines_arns)37def setup_args() -> argparse.ArgumentParser:38    parser = argparse.ArgumentParser(description=None)39    parser.add_argument(40        "-e", "--environment", help="environment", choices=["dev", "stg"], default=""41    )42    parser.add_argument(43        "-s",44        "--state_machines_names",45        help="state machines to execute",46        type=str,47        nargs="*",48        required=True,49    )50    parser.add_argument(51        "-p", "--profile", choices=Aws.get_profiles(), default="default"52    )53    parser.add_argument("-d", "--debug", action="store_true")54    return parser.parse_args()55# def list_state_machines(client: boto3.client) -> List[dict]:56#     logger.debug("Listing step functions...")57#     state_machines = []58#     resp = client.list_state_machines()59#     state_machines.extend(resp["stateMachines"])60#     next_token = resp.get("NextToken", None)61#     while next_token:62#         resp = client.list_state_machines(nextToken=next_token)63#         state_machines.extend(resp["stateMachines"])64#         next_token = resp.get("NextToken", None)65#     logger.debug("Completed listing state machines!")66#     return state_machines67def display_crawlers(state_machines: List[dict]):68    headers = list(state_machines[0].keys())69    table = [list(state_machine.values()) for state_machine in state_machines]70    print(tabulate(table, headers, tablefmt="simple"))71if __name__ == "__main__":72    
LoggingConfigurator.configure_logging()73    logger.debug("Script Started")74    main()...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
