How to use list_open_workflow_executions method in localstack

Best Python code snippet using localstack_python

swfmeta.py

Source: swfmeta.py (GitHub)


...
            close_status,
            maximum_page_size,
        )
        # Still need to handle the nextPageToken
        infos = self.client.list_open_workflow_executions(**kwargs)
        # Check if there is no nextPageToken, if there is none
        # return the result, nothing to page
        next_page_token = infos.get("nextPageToken")
        # Continue, we have a nextPageToken. Assemble a full array of events by continually polling
        if next_page_token is not None:
            all_infos = infos["executionInfos"]
            while next_page_token is not None:
                kwargs["nextPageToken"] = next_page_token
                infos = self.client.list_open_workflow_executions(**kwargs)
                for execution_info in infos.get("executionInfos"):
                    all_infos.append(execution_info)
                next_page_token = infos.get("nextPageToken")
            # Finally, reset the original decision response with the full set of events
            infos["executionInfos"] = all_infos
        return infos

    def is_workflow_open(
        self,
        domain=None,
        workflow_id=None,
        workflow_name=None,
        workflow_version=None,
    ):
        """
...
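The paging loop in this snippet can be condensed into a standalone helper. What follows is a minimal sketch, not the swfmeta implementation: `list_all_open_executions` and `FakePagedClient` are hypothetical names introduced here, and the fake client only imitates the `executionInfos` and `nextPageToken` response keys seen in the snippet, so the example runs without AWS or LocalStack.

```python
def list_all_open_executions(client, **kwargs):
    """Collect executionInfos from every page (hypothetical helper)."""
    # Work on a copy so the caller's kwargs never gain a nextPageToken.
    request = dict(kwargs)
    all_infos = []
    while True:
        page = client.list_open_workflow_executions(**request)
        # A page may lack executionInfos; treat that as an empty list.
        all_infos.extend(page.get("executionInfos") or [])
        token = page.get("nextPageToken")
        if token is None:
            break
        request["nextPageToken"] = token
    return {"executionInfos": all_infos}


class FakePagedClient:
    """Hypothetical stand-in for an SWF client that serves canned pages."""

    def __init__(self, pages):
        self.pages = pages
        self.calls = []

    def list_open_workflow_executions(self, **kwargs):
        self.calls.append(kwargs)
        # Assumption for this fake only: the token is the next page index.
        token = kwargs.get("nextPageToken")
        index = 0 if token is None else int(token)
        return self.pages[index]


if __name__ == "__main__":
    pages = [
        {"executionInfos": [{"execution": {"workflowId": "a"}}], "nextPageToken": "1"},
        {"executionInfos": [{"execution": {"workflowId": "b"}}]},
    ]
    result = list_all_open_executions(FakePagedClient(pages), domain="Example")
    print(result)
```

A real client object could presumably be passed in place of the fake, as long as it exposes a `list_open_workflow_executions` method that accepts keyword arguments and returns a dict with those two keys.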


classes_mock.py

Source: classes_mock.py (GitHub)


...
        self.infos_counter = 0

    def add_infos(self, infos):
        "add an infos, to allow it to return more than one infos in succession"
        self.infos.append(infos)

    def list_open_workflow_executions(
        self,
        domain=None,
        startTimeFilter=None,
        executionFilter=None,
        typeFilter=None,
        closeStatusFilter=None,
        maximumPageSize=None,
        nextPageToken=None,
    ):
        """
        return the infos for testing, when testing the next_page_token and
        mocking the return values the final infos value needs not have a
        nextPageToken otherwise it will loop forever in some swfmeta functions
        """
        if len(self.infos) > 1:
            return_infos = self.infos[self.infos_counter]
            if self.infos_counter < len(self.infos) - 1:
                self.infos_counter = self.infos_counter + 1
            else:
                self.infos_counter = 0
            return return_infos
        else:
            # reset the counter self.infos_counter then return
            self.infos_counter = 0
            return self.infos[self.infos_counter]

    def list_closed_workflow_executions(
        self,
        domain=None,
        startTimeFilter=None,
        executionFilter=None,
        typeFilter=None,
        closeStatusFilter=None,
        maximumPageSize=None,
        nextPageToken=None,
    ):
        "for testing piggy-back list_open_workflow_executions to return infos"
        return self.list_open_workflow_executions()

    def count_closed_workflow_executions(
        self,
        domain=None,
        startTimeFilter=None,
        executionFilter=None,
        typeFilter=None,
        closeStatusFilter=None,
        maximumPageSize=None,
        nextPageToken=None,
    ):
        "for testing return a count of infos"
        count = 0
        infos = self.list_open_workflow_executions()
        if infos and infos.get("executionInfos"):
            count = len(infos.get("executionInfos"))
        return {"count": count, "truncated": False}

    def describe_workflow_type(self, domain, workflowType):
        pass

    def register_workflow_type(self, **kwargs):
        pass

    def describe_activity_type(self, domain, activityType):
        "return a partial fake response for when testing"
        return {"typeInfo": "..."}

    def register_activity_type(self, **kwargs):
        pass

    def poll_for_decision_task(self, **kwargs):
        pass
...
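The response-rotation idea in this mock can be shown in isolation. The sketch below is a condensed variant under stated assumptions, not the original class: `RotatingFakeClient` is a hypothetical name, the modulo-based counter is a substitute for the original if/else branches (the two should behave the same for a non-empty list), and the filter arguments are accepted but ignored.

```python
class RotatingFakeClient:
    """Hypothetical fake that returns queued responses in succession."""

    def __init__(self):
        self.infos = []
        self.infos_counter = 0

    def add_infos(self, infos):
        # Queue one more canned response.
        self.infos.append(infos)

    def list_open_workflow_executions(self, **kwargs):
        # kwargs (domain, filters, nextPageToken) are ignored by this fake.
        response = self.infos[self.infos_counter]
        # Advance, wrapping back to the first response after the last one.
        self.infos_counter = (self.infos_counter + 1) % len(self.infos)
        return response


if __name__ == "__main__":
    client = RotatingFakeClient()
    # The last queued response has no nextPageToken, so a paging loop
    # that follows tokens stops after the second call.
    client.add_infos({"executionInfos": [{"id": 1}], "nextPageToken": "t"})
    client.add_infos({"executionInfos": [{"id": 2}]})
    print(client.list_open_workflow_executions(domain="Example"))
    print(client.list_open_workflow_executions(domain="Example", nextPageToken="t"))
```

Because the counter wraps around, a queue whose final response still carried a `nextPageToken` would keep a token-following loop cycling through the same responses, which is the situation the mock's docstring warns about.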


swf.py

Source: swf.py (GitHub)


...
        tagList=None,
        taskStartToCloseTimeout=None,
        **kwargs):
    swf = boto.swf.layer1.Layer1(awsKey, awsSecretKey)
    rawWorflows = swf.list_open_workflow_executions(domain, time.time()-30*24*3600, workflow_name=workflowName)
    for workflow in rawWorflows['executionInfos'] :
        if(workflow['execution']['workflowId'] == workflowId):
            log.info("Workflow {0} is already running".format(workflowId))
            return True
    if(input.startswith("\\")):
        input = input[1:]
    swf.start_workflow_execution(
        domain,
        workflowId,
        workflowName,
        workflowVersion,
        task_list=taskList,
        child_policy=childPolicy,
        execution_start_to_close_timeout=executionStartToCloseTimeout,
...
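The "already running" check in this snippet can be separated from the client call so it is testable on its own. This is a minimal sketch under assumptions: `oldest_date` and `is_workflow_running` are hypothetical helper names, and the response shape (`executionInfos` entries carrying `execution.workflowId`) is taken from the snippet rather than from any API reference. Like the snippet, it inspects only the response it is given and does not follow a `nextPageToken`.

```python
import time

# Same 30-day look-back window the snippet computes inline.
THIRTY_DAYS_SECONDS = 30 * 24 * 3600


def oldest_date(now=None, window_seconds=THIRTY_DAYS_SECONDS):
    """Return the start of the look-back window as a Unix timestamp."""
    current = time.time() if now is None else now
    return current - window_seconds


def is_workflow_running(response, workflow_id):
    """Return True if workflow_id appears among the open executions."""
    return any(
        info["execution"]["workflowId"] == workflow_id
        for info in response.get("executionInfos", [])
    )


if __name__ == "__main__":
    response = {
        "executionInfos": [
            {"execution": {"workflowId": "order-1", "runId": "r1"}},
            {"execution": {"workflowId": "order-2", "runId": "r2"}},
        ]
    }
    print(is_workflow_running(response, "order-2"))
    print(oldest_date(now=THIRTY_DAYS_SECONDS))
```

Keeping the check as a pure function means it can be exercised with plain dicts, and the caller decides separately whether to start a new execution.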
