How to use respond_activity_task_completed method in localstack

Best Python code snippet using localstack_python

test_activity_tasks.py

Source:test_activity_tasks.py Github

copy

Full Screen

...51 resp = conn.count_pending_activity_tasks("test-domain", "non-existent")52 resp.should.equal({"count": 0, "truncated": False})53# RespondActivityTaskCompleted endpoint54@mock_swf_deprecated55def test_respond_activity_task_completed():56 conn = setup_workflow()57 decision_token = conn.poll_for_decision_task(58 "test-domain", "queue")["taskToken"]59 conn.respond_decision_task_completed(decision_token, decisions=[60 SCHEDULE_ACTIVITY_TASK_DECISION61 ])62 activity_token = conn.poll_for_activity_task(63 "test-domain", "activity-task-list")["taskToken"]64 resp = conn.respond_activity_task_completed(65 activity_token, result="result of the task")66 resp.should.be.none67 resp = conn.get_workflow_execution_history(68 "test-domain", conn.run_id, "uid-abcd1234")69 resp["events"][-2]["eventType"].should.equal("ActivityTaskCompleted")70 resp["events"][-2]["activityTaskCompletedEventAttributes"].should.equal(71 {"result": "result of the task", "scheduledEventId": 5, "startedEventId": 6}72 )73@mock_swf_deprecated74def test_respond_activity_task_completed_on_closed_workflow_execution():75 conn = setup_workflow()76 decision_token = conn.poll_for_decision_task(77 "test-domain", "queue")["taskToken"]78 conn.respond_decision_task_completed(decision_token, decisions=[79 SCHEDULE_ACTIVITY_TASK_DECISION80 ])81 activity_token = conn.poll_for_activity_task(82 "test-domain", "activity-task-list")["taskToken"]83 # bad: we're closing workflow execution manually, but endpoints are not84 # coded for now..85 wfe = swf_backend.domains[0].workflow_executions[-1]86 wfe.execution_status = "CLOSED"87 # /bad88 conn.respond_activity_task_completed.when.called_with(89 activity_token90 ).should.throw(SWFResponseError, "WorkflowExecution=")91@mock_swf_deprecated92def test_respond_activity_task_completed_with_task_already_completed():93 conn = setup_workflow()94 decision_token = conn.poll_for_decision_task(95 "test-domain", "queue")["taskToken"]96 conn.respond_decision_task_completed(decision_token, decisions=[97 SCHEDULE_ACTIVITY_TASK_DECISION98 ])99 activity_token = conn.poll_for_activity_task(100 "test-domain", "activity-task-list")["taskToken"]101 conn.respond_activity_task_completed(activity_token)102 conn.respond_activity_task_completed.when.called_with(103 activity_token104 ).should.throw(SWFResponseError, "Unknown activity, scheduledEventId = 5")105# RespondActivityTaskFailed endpoint106@mock_swf_deprecated107def test_respond_activity_task_failed():108 conn = setup_workflow()109 decision_token = conn.poll_for_decision_task(110 "test-domain", "queue")["taskToken"]111 conn.respond_decision_task_completed(decision_token, decisions=[112 SCHEDULE_ACTIVITY_TASK_DECISION113 ])114 activity_token = conn.poll_for_activity_task(115 "test-domain", "activity-task-list")["taskToken"]...

Full Screen

Full Screen

Worker.py

Source:Worker.py Github

copy

Full Screen

...26 read_str = f.read()27 n = int(read_str)28 res = fibonacci(n)29 bucket.put_object(Key=key + "_out", Body=str(res))30 swf.respond_activity_task_completed(31 taskToken=task['taskToken'],32 result=str(res)33 )34 for object in bucket.objects.filter(Prefix="swf/" + key):35 if object.key == key:36 object.delete()37 print("Fibonacci job complete")38 elif task['activityType']['name'] == 'StoreInputS3':39 print("Got store input job")40 value = task['input']41 cur_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")42 bucket.put_object(Key="swf/" + cur_date, Body=value)43 swf.respond_activity_task_completed(44 taskToken=task['taskToken'],45 result=cur_date46 )47 elif task['activityType']['name'] == 'StoreOutputS3':48 print("Got store output job")49 value = task['input']50 cur_date = datetime.datetime.now().strftime("%Y%m%d%H%M%S")51 bucket.put_object(Key="swf_out/" + cur_date, Body=value)52 swf.respond_activity_task_completed(53 taskToken=task['taskToken'],54 result=cur_date...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful