How to use the `describe_workflow_execution` method in localstack

Best Python code snippets using localstack_python

test.py

Source:test.py Github

copy

Full Screen

...168 if wf_result:169 run_id = wf_result['runId']170 while True:171 time.sleep(5)172 wf_result = LiveSwfWorkflowTest.swf.describe_workflow_execution(domain='example', run_id=run_id,173 workflow_id='unit_test_a')174 if wf_result['executionInfo'].get('closeStatus') is not None:175 break176 if wf_result['executionInfo'].get('closeStatus') != 'COMPLETED':177 self.fail('Workflow {} failed'.format(wf_result['executionInfo']['execution']['runId']))178 else:179 self.fail('Error when launching workflow')180 def test_simple_workflow_in_class(self):181 # TODO Test output of workflow, not just the status182 wf_result = LiveSwfWorkflowTest.swf.start_workflow_execution(domain='example', workflow_id='unit_test_e',183 workflow_name='TestWorkflow',184 workflow_version='1.0',185 task_list='unit_test_e')186 if wf_result:187 run_id = wf_result['runId']188 while True:189 time.sleep(5)190 wf_result = LiveSwfWorkflowTest.swf.describe_workflow_execution(domain='example', run_id=run_id,191 workflow_id='unit_test_e')192 if wf_result['executionInfo'].get('closeStatus') is not None:193 break194 if wf_result['executionInfo'].get('closeStatus') != 'COMPLETED':195 self.fail('Workflow {} failed'.format(wf_result['executionInfo']['execution']['runId']))196 else:197 self.fail('Error when launching workflow')198 def test_workflow_with_timer(self):199 # TODO Test output of workflow, not just the status200 wf_result = LiveSwfWorkflowTest.swf.start_workflow_execution(domain='example', workflow_id='unit_test_b',201 workflow_name='TestWorkflow',202 workflow_version='1.0',203 task_list='unit_test_b')204 if wf_result:205 run_id = wf_result['runId']206 while True:207 time.sleep(5)208 wf_result = LiveSwfWorkflowTest.swf.describe_workflow_execution(domain='example', run_id=run_id,209 workflow_id='unit_test_b')210 if wf_result['executionInfo'].get('closeStatus') is not None:211 break212 if wf_result['executionInfo'].get('closeStatus') != 'COMPLETED':213 self.fail('Workflow {} 
failed'.format(wf_result['executionInfo']['execution']['runId']))214 else:215 self.fail('Error when launching workflow')216 def test_workflow_with_cwf(self):217 # TODO Test output of workflow, not just the status218 wf_result = LiveSwfWorkflowTest.swf.start_workflow_execution(domain='example', workflow_id='unit_test_c',219 workflow_name='TestWorkflow',220 workflow_version='1.0',221 task_list='unit_test_c')222 if wf_result:223 run_id = wf_result['runId']224 while True:225 time.sleep(5)226 wf_result = LiveSwfWorkflowTest.swf.describe_workflow_execution(domain='example', run_id=run_id,227 workflow_id='unit_test_c')228 if wf_result['executionInfo'].get('closeStatus') is not None:229 break230 if wf_result['executionInfo'].get('closeStatus') != 'COMPLETED':231 self.fail('Workflow {} failed'.format(wf_result['executionInfo']['execution']['runId']))232 else:233 self.fail('Error when launching workflow')234 def test_workflow_with_cache(self):235 # TODO Test output of workflow, not just the status236 wf_result = LiveSwfWorkflowTest.swf.start_workflow_execution(domain='example', workflow_id='unit_test_d',237 workflow_name='TestWorkflow',238 workflow_version='1.0',239 task_list='unit_test_d')240 if wf_result:241 run_id = wf_result['runId']242 while True:243 time.sleep(5)244 wf_result = LiveSwfWorkflowTest.swf.describe_workflow_execution(domain='example', run_id=run_id,245 workflow_id='unit_test_d')246 if wf_result['executionInfo'].get('closeStatus') is not None:247 break248 if wf_result['executionInfo'].get('closeStatus') != 'COMPLETED':249 self.fail('Workflow {} failed'.format(wf_result['executionInfo']['execution']['runId']))250 else:251 self.fail('Error when launching workflow')252 def test_workflow_with_user_markers(self):253 wf_result = LiveSwfWorkflowTest.swf.start_workflow_execution(domain='example', workflow_id='unit_test_f',254 workflow_name='TestWorkflow',255 workflow_version='1.0',256 task_list='unit_test_f')257 if wf_result:258 run_id = wf_result['runId']259 while 
True:260 time.sleep(5)261 wf_result = LiveSwfWorkflowTest.swf.describe_workflow_execution(domain='example', run_id=run_id,262 workflow_id='unit_test_f')263 if wf_result['executionInfo'].get('closeStatus') is not None:264 break265 if wf_result['executionInfo'].get('closeStatus') != 'COMPLETED':266 self.fail('Workflow {} failed'.format(wf_result['executionInfo']['execution']['runId']))267 else:268 self.fail('Error when launching workflow')269class LocalWorkflowTest(unittest.TestCase):270 def test_simple_workflow(self):271 decider = LocalDecisionWorker(decision_function=decider_a)272 result = decider.start()273 self.assertEquals(45, result)274 def test_workflow_with_timer(self):275 decider = LocalDecisionWorker(decision_function=decider_b)...

Full Screen

Full Screen

test_workflow.py

Source:test_workflow.py Github

copy

Full Screen

...211 def test_workflow_execution__diff_with_identical_workflow_execution(self):212 with patch.object(213 Layer1, "describe_workflow_execution", mock_describe_workflow_execution,214 ):215 mocked = mock_describe_workflow_execution()216 workflow_execution = WorkflowExecution(217 self.domain,218 mocked["executionInfo"]["execution"]["workflowId"],219 run_id=mocked["executionInfo"]["execution"]["runId"],220 status=mocked["executionInfo"]["executionStatus"],221 task_list=mocked["executionConfiguration"]["taskList"]["name"],222 child_policy=mocked["executionConfiguration"]["childPolicy"],223 execution_timeout=mocked["executionConfiguration"][224 "executionStartToCloseTimeout"225 ],226 tag_list=mocked["executionInfo"]["tagList"],227 decision_tasks_timeout=mocked["executionConfiguration"][228 "taskStartToCloseTimeout"229 ],230 )231 diffs = workflow_execution._diff()232 self.assertLength(diffs, 0)233 def test_exists_with_existing_workflow_execution(self):234 with patch.object(Layer1, "describe_workflow_execution"):235 self.assertTrue(self.we.exists)236 def test_exists_with_non_existent_workflow_execution(self):237 with patch.object(self.we.connection, "describe_workflow_execution") as mock:238 mock.side_effect = SWFResponseError(239 400,240 "Bad Request:",241 {242 "__type": "com.amazonaws.swf.base.model#UnknownResourceFault",243 "message": "Unknown execution: WorkflowExecution=[workflowId=blah, runId=test]",244 },245 "UnknownResourceFault",246 )247 self.assertFalse(self.we.exists)248 # TODO: fix test when no network (probably hits real SWF endpoints)249 @unittest.skip("Skip it in case there's no network connection.")250 def test_workflow_execution_exists_with_whatever_error(self):251 with patch.object(self.we.connection, "describe_workflow_execution") as mock:252 with self.assertRaises(ResponseError):253 mock.side_effect = SWFResponseError(254 400,255 "mocking exception",256 {"__type": "WhateverError", "message": "Whatever"},257 )258 dummy = self.domain.exists259 def 
test_is_synced_with_unsynced_workflow_execution(self):260 pass261 def test_is_synced_with_synced_workflow_execution(self):262 pass263 def test_is_synced_over_non_existent_workflow_execution(self):264 with patch.object(265 Layer1, "describe_workflow_execution", mock_describe_workflow_execution266 ):267 workflow_execution = WorkflowExecution(268 self.domain,269 WorkflowType(self.domain, "NonExistentTestType", "1.0"),270 "non-existent-id",271 )272 self.assertFalse(workflow_execution.is_synced)273 def test_changes_with_different_workflow_execution(self):274 with patch.object(275 Layer1, "describe_workflow_execution", mock_describe_workflow_execution,276 ):277 workflow_execution = WorkflowExecution(278 self.domain,279 WorkflowType(self.domain, "NonExistentTestType", "1.0"),280 "non-existent-id",281 )282 diffs = workflow_execution.changes283 self.assertIsNotNone(diffs)284 self.assertLength(diffs, 7)285 self.assertTrue(hasattr(diffs[0], "attr"))286 self.assertTrue(hasattr(diffs[0], "local"))287 self.assertTrue(hasattr(diffs[0], "upstream"))288 def test_workflow_execution_changes_with_identical_workflow_execution(self):289 with patch.object(290 Layer1, "describe_workflow_execution", mock_describe_workflow_execution,291 ):292 mocked = mock_describe_workflow_execution()293 workflow_execution = WorkflowExecution(294 self.domain,295 mocked["executionInfo"]["execution"]["workflowId"],296 run_id=mocked["executionInfo"]["execution"]["runId"],297 status=mocked["executionInfo"]["executionStatus"],298 task_list=mocked["executionConfiguration"]["taskList"]["name"],299 child_policy=mocked["executionConfiguration"]["childPolicy"],300 execution_timeout=mocked["executionConfiguration"][301 "executionStartToCloseTimeout"302 ],303 tag_list=mocked["executionInfo"]["tagList"],304 decision_tasks_timeout=mocked["executionConfiguration"][305 "taskStartToCloseTimeout"306 ],...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful