How to use list_state_machines method in localstack

Best Python code snippet using localstack_python

test_statemachine.py

Source:test_statemachine.py Github

copy

Full Screen

...32 }33 machine = StateMachine('name')34 self.assertEqual(machine.arn, 'XXX')35 calls = [36 mock.call.list_state_machines()37 ]38 self.assertEqual(client.mock_calls, calls)39 @mock.patch('heaviside.compile', autospec=True)40 @mock.patch('heaviside.create_session', autospec=True)41 def test_build(self, mCreateSession, mCompile):42 iSession = MockSession()43 iSession.region_name = 'region'44 mCreateSession.return_value = (iSession, '123456')45 client = iSession.client('stepfunctions')46 client.list_state_machines.return_value = {47 'stateMachines':[{48 'name': 'name',49 'stateMachineArn': 'XXX'50 }]51 }52 mCompile.return_value = {}53 machine = StateMachine('name')54 sfn = "Pass()"55 actual = machine.build(sfn)56 expected = {}57 self.assertEqual(actual, expected)58 59 calls = [60 mock.call(sfn, region=machine.region, account_id=machine.account_id, visitors=[])61 ]62 self.assertEqual(mCompile.mock_calls, calls)63 @mock.patch('heaviside.create_session', autospec=True)64 def test_resolve_role(self, mCreateSession):65 iSession = MockSession()66 mCreateSession.return_value = (iSession, '123456')67 client = iSession.client('stepfunctions')68 client.list_state_machines.return_value = {69 'stateMachines':[{70 'name': 'name',71 'stateMachineArn': 'XXX'72 }]73 }74 iam = iSession.client('iam')75 iam.get_role.return_value = {76 'Role': {77 'Arn': 'YYY'78 }79 }80 machine = StateMachine('name')81 arn = machine._resolve_role('role')82 self.assertEqual(arn, 'YYY')83 calls = [84 mock.call.get_role(RoleName = 'role')85 ]86 self.assertEqual(iam.mock_calls, calls)87 @mock.patch('heaviside.create_session', autospec=True)88 def test_create(self, mCreateSession):89 iSession = MockSession()90 mCreateSession.return_value = (iSession, '123456')91 client = iSession.client('stepfunctions')92 client.list_state_machines.return_value = {93 'stateMachines':[]94 }95 client.create_state_machine.return_value = {96 'stateMachineArn': 'XXX'97 }98 _resolve_role = mock.MagicMock()99 _resolve_role.return_value = 'arn'100 build = mock.MagicMock()101 build.return_value = {}102 machine = StateMachine('name')103 machine._resolve_role = _resolve_role104 machine.build = build105 machine.create('source', 'role')106 self.assertEqual(machine.arn, 'XXX')107 calls = [mock.call('role')]108 self.assertEqual(_resolve_role.mock_calls, calls)109 calls = [mock.call('source')]110 self.assertEqual(build.mock_calls, calls)111 calls = [112 mock.call.list_state_machines(),113 mock.call.create_state_machine(name = 'name',114 definition = {},115 roleArn = 'arn')116 ]117 self.assertEqual(client.mock_calls, calls)118 @mock.patch('heaviside.create_session', autospec=True)119 def test_create_exists(self, mCreateSession):120 iSession = MockSession()121 mCreateSession.return_value = (iSession, '123456')122 client = iSession.client('stepfunctions')123 client.list_state_machines.return_value = {124 'stateMachines':[{125 'name': 'name',126 'stateMachineArn': 'XXX'127 }]128 }129 machine = StateMachine('name')130 with self.assertRaises(Exception):131 machine.create('source', 'role')132 self.assertEqual(machine.arn, 'XXX')133 calls = [134 mock.call.list_state_machines(),135 ]136 self.assertEqual(client.mock_calls, calls)137 @mock.patch('heaviside.create_session', autospec=True)138 def test_delete(self, mCreateSession):139 iSession = MockSession()140 mCreateSession.return_value = (iSession, '123456')141 client = iSession.client('stepfunctions')142 client.list_state_machines.return_value = {143 'stateMachines':[{144 'name': 'name',145 'stateMachineArn': 'XXX'146 }]147 }148 machine = StateMachine('name')149 machine.delete()150 self.assertEqual(machine.arn, None)151 calls = [152 mock.call.list_state_machines(),153 mock.call.delete_state_machine(stateMachineArn = 'XXX')154 ]155 self.assertEqual(client.mock_calls, calls)156 @mock.patch('heaviside.create_session', autospec=True)157 def test_delete_exception(self, mCreateSession):158 iSession = MockSession()159 mCreateSession.return_value = (iSession, '123456')160 client = iSession.client('stepfunctions')161 client.list_state_machines.return_value = {162 'stateMachines':[]163 }164 machine = StateMachine('name')165 with self.assertRaises(Exception):166 machine.delete(True)167 self.assertEqual(machine.arn, None)168 calls = [169 mock.call.list_state_machines(),170 ]171 self.assertEqual(client.mock_calls, calls)172 @mock.patch('heaviside.create_session', autospec=True)173 def test_start(self, mCreateSession):174 iSession = MockSession()175 mCreateSession.return_value = (iSession, '123456')176 client = iSession.client('stepfunctions')177 client.list_state_machines.return_value = {178 'stateMachines':[{179 'name': 'name',180 'stateMachineArn': 'XXX'181 }]182 }183 client.start_execution.return_value = {184 'executionArn': 'YYY'185 }186 machine = StateMachine('name')187 arn = machine.start({}, 'run')188 self.assertEqual(arn, 'YYY')189 calls = [190 mock.call.list_state_machines(),191 mock.call.start_execution(stateMachineArn = 'XXX',192 name = 'run',193 input = '{}')194 ]195 self.assertEqual(client.mock_calls, calls)196 @mock.patch('heaviside.create_session', autospec=True)197 def test_start_doesnt_exist(self, mCreateSession):198 iSession = MockSession()199 mCreateSession.return_value = (iSession, '123456')200 client = iSession.client('stepfunctions')201 client.list_state_machines.return_value = {202 'stateMachines':[]203 }204 machine = StateMachine('name')205 with self.assertRaises(Exception):206 machine.start({}, 'run')207 calls = [208 mock.call.list_state_machines(),209 ]210 self.assertEqual(client.mock_calls, calls)211 @mock.patch('heaviside.create_session', autospec=True)212 def test_stop(self, mCreateSession):213 iSession = MockSession()214 mCreateSession.return_value = (iSession, '123456')215 client = iSession.client('stepfunctions')216 client.list_state_machines.return_value = {217 'stateMachines':[{218 'name': 'name',219 'stateMachineArn': 'XXX'220 }]221 }222 machine = StateMachine('name')223 machine.stop('arn', 'error', 'cause')224 calls = [225 mock.call.list_state_machines(),226 mock.call.stop_execution(executionArn = 'arn',227 error = 'error',228 cause = 'cause')229 ]230 self.assertEqual(client.mock_calls, calls)231 @mock.patch('heaviside.create_session', autospec=True)232 def test_status(self, mCreateSession):233 iSession = MockSession()234 mCreateSession.return_value = (iSession, '123456')235 client = iSession.client('stepfunctions')236 client.list_state_machines.return_value = {237 'stateMachines':[{238 'name': 'name',239 'stateMachineArn': 'XXX'240 }]241 }242 client.describe_execution.return_value = {243 'status': 'status'244 }245 machine = StateMachine('name')246 status = machine.status('arn')247 self.assertEqual(status, 'status')248 calls = [249 mock.call.list_state_machines(),250 mock.call.describe_execution(executionArn = 'arn')251 ]252 self.assertEqual(client.mock_calls, calls)253 @mock.patch('heaviside.time.sleep', autospec=True)254 @mock.patch('heaviside.create_session', autospec=True)255 def test_wait_success(self, mCreateSession, mSleep):256 iSession = MockSession()257 mCreateSession.return_value = (iSession, '123456')258 client = iSession.client('stepfunctions')259 client.list_state_machines.return_value = {260 'stateMachines':[{261 'name': 'name',262 'stateMachineArn': 'XXX'263 }]264 }265 client.describe_execution.side_effect = [266 {'status': 'RUNNING'},267 {'status': 'SUCCESS', 'output': '{}'}268 ]269 machine = StateMachine('name')270 output = machine.wait('arn')271 self.assertEqual(output, {})272 calls = [273 mock.call.list_state_machines(),274 mock.call.describe_execution(executionArn = 'arn'),275 mock.call.describe_execution(executionArn = 'arn')276 ]277 self.assertEqual(client.mock_calls, calls)278 calls = [279 mock.call(10)280 ]281 self.assertEqual(mSleep.mock_calls, calls)282 @mock.patch('heaviside.time.sleep', autospec=True)283 @mock.patch('heaviside.create_session', autospec=True)284 def _test_wait_xxx(self, error, mCreateSession, mSleep):285 iSession = MockSession()286 mCreateSession.return_value = (iSession, '123456')287 client = iSession.client('stepfunctions')288 client.list_state_machines.return_value = {289 'stateMachines':[{290 'name': 'name',291 'stateMachineArn': 'XXX'292 }]293 }294 client.describe_execution.side_effect = [295 {'status': 'RUNNING'},296 {'status': error}297 ]298 client.get_execution_history.return_value = {299 'events':[{300 'execution{}EventDetails'.format(error): {}301 }]302 }303 machine = StateMachine('name')304 if error is None:305 with self.assertRaises(Exception):306 machine.wait('arn')307 else:308 output = machine.wait('arn')309 self.assertEqual(output, {})310 calls = [311 mock.call.list_state_machines(),312 mock.call.describe_execution(executionArn = 'arn'),313 mock.call.describe_execution(executionArn = 'arn'),314 mock.call.get_execution_history(executionArn = 'arn',315 reverseOrder = True)316 ]317 self.assertEqual(client.mock_calls, calls)318 calls = [319 mock.call(10)320 ]321 self.assertEqual(mSleep.mock_calls, calls)322 def test_wait_failed(self):323 self._test_wait_xxx('Failed')324 def test_wait_aborted(self):325 self._test_wait_xxx('Aborted')326 def test_wait_timedout(self):327 self._test_wait_xxx('TimedOut')328 def test_wait_exception(self):329 self._test_wait_xxx(None)330 @mock.patch('heaviside.create_session', autospec=True)331 def test_running_arns(self, mCreateSession):332 iSession = MockSession()333 mCreateSession.return_value = (iSession, '123456')334 client = iSession.client('stepfunctions')335 client.list_state_machines.return_value = {336 'stateMachines':[{337 'name': 'name',338 'stateMachineArn': 'XXX'339 }]340 }341 client.list_executions.return_value = {342 'executions': [343 {'executionArn': 'arn1'},344 {'executionArn': 'arn2'}345 ]346 }347 machine = StateMachine('name')348 arns = machine.running_arns()349 self.assertEqual(arns, ['arn1', 'arn2'])350 calls = [351 mock.call.list_state_machines(),352 mock.call.list_executions(stateMachineArn = 'XXX',353 statusFilter = 'RUNNING')354 ]...

Full Screen

Full Screen

aws_stepfunctions_info.py

Source:aws_stepfunctions_info.py Github

copy

Full Screen

...119 if client.can_paginate('list_state_machines'):120 paginator = client.get_paginator('list_state_machines')121 return paginator.paginate(), True122 else:123 return client.list_state_machines(), False124 elif module.params['describe_state_machine']:125 if client.can_paginate('describe_state_machine'):126 paginator = client.get_paginator('describe_state_machine')127 return paginator.paginate(128 stateMachineArn=module.params['arn'],129 ), True130 else:131 return client.describe_state_machine(132 stateMachineArn=module.params['arn'],133 ), False134 else:135 return None, False136 except (BotoCoreError, ClientError) as e:137 module.fail_json_aws(e, msg='Failed to fetch AWS Step Functions (SFN) details')...

Full Screen

Full Screen

aws-stepfunctions-execute-state-machine.py

Source:aws-stepfunctions-execute-state-machine.py Github

copy

Full Screen

...25 args = setup_args()26 Util.check_debug_mode(args)27 # get arm for state machine28 step_functions = StepFunctions(args.profile)29 step_functions.list_state_machines()30 step_functions.organize_state_machines_by_name()31 state_machines_arns = step_functions.get_state_machines_arns(32 args.state_machines_names33 )34 print(state_machines_arns)35 logger.info("Starting state machines %s" % state_machines_arns)36 step_functions.execute_state_machines(state_machines_arns)37def setup_args() -> argparse.ArgumentParser:38 parser = argparse.ArgumentParser(description=None)39 parser.add_argument(40 "-e", "--environment", help="environment", choices=["dev", "stg"], default=""41 )42 parser.add_argument(43 "-s",44 "--state_machines_names",45 help="state machines to execute",46 type=str,47 nargs="*",48 required=True,49 )50 parser.add_argument(51 "-p", "--profile", choices=Aws.get_profiles(), default="default"52 )53 parser.add_argument("-d", "--debug", action="store_true")54 return parser.parse_args()55# def list_state_machines(client: boto3.client) -> List[dict]:56# logger.debug("Listing step functions...")57# state_machines = []58# resp = client.list_state_machines()59# state_machines.extend(resp["stateMachines"])60# next_token = resp.get("NextToken", None)61# while next_token:62# resp = client.list_state_machines(nextToken=next_token)63# state_machines.extend(resp["stateMachines"])64# next_token = resp.get("NextToken", None)65# logger.debug("Completed listing state machines!")66# return state_machines67def display_crawlers(state_machines: List[dict]):68 headers = list(state_machines[0].keys())69 table = [list(state_machine.values()) for state_machine in state_machines]70 print(tabulate(table, headers, tablefmt="simple"))71if __name__ == "__main__":72 LoggingConfigurator.configure_logging()73 logger.debug("Script Started")74 main()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful