How to use start_workflow_execution method in localstack

Best Python code snippet using localstack_python

test_workflow_executions.py

Source:test_workflow_executions.py Github

copy

Full Screen

...20 conn.register_activity_type("test-domain", "test-activity", "v1.1")21 return conn22# StartWorkflowExecution endpoint23@mock_swf_deprecated24def test_start_workflow_execution():25 conn = setup_swf_environment()26 wf = conn.start_workflow_execution(27 "test-domain", "uid-abcd1234", "test-workflow", "v1.0")28 wf.should.contain("runId")29@mock_swf_deprecated30def test_signal_workflow_execution():31 conn = setup_swf_environment()32 hsh = conn.start_workflow_execution(33 "test-domain", "uid-abcd1234", "test-workflow", "v1.0")34 run_id = hsh["runId"]35 wfe = conn.signal_workflow_execution(36 "test-domain", "my_signal", "uid-abcd1234", "my_input", run_id)37 wfe = conn.describe_workflow_execution(38 "test-domain", run_id, "uid-abcd1234")39 wfe["openCounts"]["openDecisionTasks"].should.equal(2)40@mock_swf_deprecated41def test_start_already_started_workflow_execution():42 conn = setup_swf_environment()43 conn.start_workflow_execution(44 "test-domain", "uid-abcd1234", "test-workflow", "v1.0")45 conn.start_workflow_execution.when.called_with(46 "test-domain", "uid-abcd1234", "test-workflow", "v1.0"47 ).should.throw(SWFResponseError)48@mock_swf_deprecated49def test_start_workflow_execution_on_deprecated_type():50 conn = setup_swf_environment()51 conn.deprecate_workflow_type("test-domain", "test-workflow", "v1.0")52 conn.start_workflow_execution.when.called_with(53 "test-domain", "uid-abcd1234", "test-workflow", "v1.0"54 ).should.throw(SWFResponseError)55# DescribeWorkflowExecution endpoint56@mock_swf_deprecated57def test_describe_workflow_execution():58 conn = setup_swf_environment()59 hsh = conn.start_workflow_execution(60 "test-domain", "uid-abcd1234", "test-workflow", "v1.0")61 run_id = hsh["runId"]62 wfe = conn.describe_workflow_execution(63 "test-domain", run_id, "uid-abcd1234")64 wfe["executionInfo"]["execution"][65 "workflowId"].should.equal("uid-abcd1234")66 wfe["executionInfo"]["executionStatus"].should.equal("OPEN")67@mock_swf_deprecated68def test_describe_non_existent_workflow_execution():69 conn = setup_swf_environment()70 conn.describe_workflow_execution.when.called_with(71 "test-domain", "wrong-run-id", "wrong-workflow-id"72 ).should.throw(SWFResponseError)73# GetWorkflowExecutionHistory endpoint74@mock_swf_deprecated75def test_get_workflow_execution_history():76 conn = setup_swf_environment()77 hsh = conn.start_workflow_execution(78 "test-domain", "uid-abcd1234", "test-workflow", "v1.0")79 run_id = hsh["runId"]80 resp = conn.get_workflow_execution_history(81 "test-domain", run_id, "uid-abcd1234")82 types = [evt["eventType"] for evt in resp["events"]]83 types.should.equal(["WorkflowExecutionStarted", "DecisionTaskScheduled"])84@mock_swf_deprecated85def test_get_workflow_execution_history_with_reverse_order():86 conn = setup_swf_environment()87 hsh = conn.start_workflow_execution(88 "test-domain", "uid-abcd1234", "test-workflow", "v1.0")89 run_id = hsh["runId"]90 resp = conn.get_workflow_execution_history("test-domain", run_id, "uid-abcd1234",91 reverse_order=True)92 types = [evt["eventType"] for evt in resp["events"]]93 types.should.equal(["DecisionTaskScheduled", "WorkflowExecutionStarted"])94@mock_swf_deprecated95def test_get_workflow_execution_history_on_non_existent_workflow_execution():96 conn = setup_swf_environment()97 conn.get_workflow_execution_history.when.called_with(98 "test-domain", "wrong-run-id", "wrong-workflow-id"99 ).should.throw(SWFResponseError)100# ListOpenWorkflowExecutions endpoint101@mock_swf_deprecated102def test_list_open_workflow_executions():103 conn = setup_swf_environment()104 # One open workflow execution105 conn.start_workflow_execution(106 'test-domain', 'uid-abcd1234', 'test-workflow', 'v1.0'107 )108 # One closed workflow execution to make sure it isn't displayed109 run_id = conn.start_workflow_execution(110 'test-domain', 'uid-abcd12345', 'test-workflow', 'v1.0'111 )['runId']112 conn.terminate_workflow_execution('test-domain', 'uid-abcd12345',113 details='some details',114 reason='a more complete reason',115 run_id=run_id)116 yesterday = datetime.utcnow() - timedelta(days=1)117 oldest_date = unix_time(yesterday)118 response = conn.list_open_workflow_executions('test-domain',119 oldest_date,120 workflow_id='test-workflow')121 execution_infos = response['executionInfos']122 len(execution_infos).should.equal(1)123 open_workflow = execution_infos[0]124 open_workflow['workflowType'].should.equal({'version': 'v1.0',125 'name': 'test-workflow'})126 open_workflow.should.contain('startTimestamp')127 open_workflow['execution']['workflowId'].should.equal('uid-abcd1234')128 open_workflow['execution'].should.contain('runId')129 open_workflow['cancelRequested'].should.be(False)130 open_workflow['executionStatus'].should.equal('OPEN')131# ListClosedWorkflowExecutions endpoint132@mock_swf_deprecated133def test_list_closed_workflow_executions():134 conn = setup_swf_environment()135 # Leave one workflow execution open to make sure it isn't displayed136 conn.start_workflow_execution(137 'test-domain', 'uid-abcd1234', 'test-workflow', 'v1.0'138 )139 # One closed workflow execution140 run_id = conn.start_workflow_execution(141 'test-domain', 'uid-abcd12345', 'test-workflow', 'v1.0'142 )['runId']143 conn.terminate_workflow_execution('test-domain', 'uid-abcd12345',144 details='some details',145 reason='a more complete reason',146 run_id=run_id)147 yesterday = datetime.utcnow() - timedelta(days=1)148 oldest_date = unix_time(yesterday)149 response = conn.list_closed_workflow_executions(150 'test-domain',151 start_oldest_date=oldest_date,152 workflow_id='test-workflow')153 execution_infos = response['executionInfos']154 len(execution_infos).should.equal(1)155 open_workflow = execution_infos[0]156 open_workflow['workflowType'].should.equal({'version': 'v1.0',157 'name': 'test-workflow'})158 open_workflow.should.contain('startTimestamp')159 open_workflow['execution']['workflowId'].should.equal('uid-abcd12345')160 open_workflow['execution'].should.contain('runId')161 open_workflow['cancelRequested'].should.be(False)162 open_workflow['executionStatus'].should.equal('CLOSED')163# TerminateWorkflowExecution endpoint164@mock_swf_deprecated165def test_terminate_workflow_execution():166 conn = setup_swf_environment()167 run_id = conn.start_workflow_execution(168 "test-domain", "uid-abcd1234", "test-workflow", "v1.0"169 )["runId"]170 resp = conn.terminate_workflow_execution("test-domain", "uid-abcd1234",171 details="some details",172 reason="a more complete reason",173 run_id=run_id)174 resp.should.be.none175 resp = conn.get_workflow_execution_history(176 "test-domain", run_id, "uid-abcd1234")177 evt = resp["events"][-1]178 evt["eventType"].should.equal("WorkflowExecutionTerminated")179 attrs = evt["workflowExecutionTerminatedEventAttributes"]180 attrs["details"].should.equal("some details")181 attrs["reason"].should.equal("a more complete reason")182 attrs["cause"].should.equal("OPERATOR_INITIATED")183@mock_swf_deprecated184def test_terminate_workflow_execution_with_wrong_workflow_or_run_id():185 conn = setup_swf_environment()186 run_id = conn.start_workflow_execution(187 "test-domain", "uid-abcd1234", "test-workflow", "v1.0"188 )["runId"]189 # terminate workflow execution190 conn.terminate_workflow_execution("test-domain", "uid-abcd1234")191 # already closed, with run_id192 conn.terminate_workflow_execution.when.called_with(193 "test-domain", "uid-abcd1234", run_id=run_id194 ).should.throw(195 SWFResponseError, "WorkflowExecution=[workflowId=uid-abcd1234, runId="196 )197 # already closed, without run_id198 conn.terminate_workflow_execution.when.called_with(199 "test-domain", "uid-abcd1234"200 ).should.throw(...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful