Best Python code snippet using localstack_python
swfmeta.py
Source:swfmeta.py  
...194            close_status,195            maximum_page_size,196        )197        # Still need to handle the nextPageToken198        infos = self.client.list_open_workflow_executions(**kwargs)199        # Check if there is no nextPageToken, if there is none200        #  return the result, nothing to page201        next_page_token = infos.get("nextPageToken")202        # Continue, we have a nextPageToken. Assemble a full array of events by continually polling203        if next_page_token is not None:204            all_infos = infos["executionInfos"]205            while next_page_token is not None:206                kwargs["nextPageToken"] = next_page_token207                infos = self.client.list_open_workflow_executions(**kwargs)208                for execution_info in infos.get("executionInfos"):209                    all_infos.append(execution_info)210                next_page_token = infos.get("nextPageToken")211            # Finally, reset the original decision response with the full set of events212            infos["executionInfos"] = all_infos213        return infos214    def is_workflow_open(215        self,216        domain=None,217        workflow_id=None,218        workflow_name=None,219        workflow_version=None,220    ):221        """...classes_mock.py
Source:classes_mock.py  
# ... self.infos_counter = 0   (tail of a definition cut off before this excerpt)
#
# Methods of the mock SWF client from classes_mock.py. The enclosing class
# header lies outside this excerpt, so the methods are listed at top level.


def add_infos(self, infos):
    """Queue another canned response.

    Each queued ``infos`` is returned in turn by
    ``list_open_workflow_executions``, which lets a test simulate several
    pages of results in succession.
    """
    self.infos.append(infos)


def list_open_workflow_executions(
    self,
    domain=None,
    startTimeFilter=None,
    executionFilter=None,
    typeFilter=None,
    closeStatusFilter=None,
    maximumPageSize=None,
    nextPageToken=None,
):
    """Return the next canned ``infos`` response for testing.

    All filter arguments are accepted and ignored. When more than one
    response is queued they are returned round-robin; otherwise the single
    queued response is returned every time.

    When mocking paged results, the final queued response must not contain
    a ``nextPageToken``, otherwise some swfmeta functions will loop forever.

    Raises:
        IndexError: if no response has been queued (assuming ``self.infos``
            is a list).
    """
    if len(self.infos) > 1:
        return_infos = self.infos[self.infos_counter]
        # Advance to the next response, wrapping back to the first one
        # after the last has been served.
        if self.infos_counter < len(self.infos) - 1:
            self.infos_counter += 1
        else:
            self.infos_counter = 0
        return return_infos

    # Zero or one response queued: reset the counter, then return.
    self.infos_counter = 0
    return self.infos[self.infos_counter]


def list_closed_workflow_executions(
    self,
    domain=None,
    startTimeFilter=None,
    executionFilter=None,
    typeFilter=None,
    closeStatusFilter=None,
    maximumPageSize=None,
    nextPageToken=None,
):
    """Return canned infos by piggy-backing on list_open_workflow_executions.

    All arguments are accepted and ignored.
    """
    return self.list_open_workflow_executions()


def count_closed_workflow_executions(
    self,
    domain=None,
    startTimeFilter=None,
    executionFilter=None,
    typeFilter=None,
    closeStatusFilter=None,
    maximumPageSize=None,
    nextPageToken=None,
):
    """Return a count of the ``executionInfos`` in the next canned response.

    Consumes one queued response via ``list_open_workflow_executions``.
    The count is 0 when the response is falsy or has no ``executionInfos``.
    """
    infos = self.list_open_workflow_executions()
    execution_infos = infos.get("executionInfos") if infos else None
    count = len(execution_infos) if execution_infos else 0
    return {"count": count, "truncated": False}


def describe_workflow_type(self, domain, workflowType):
    """Do nothing; placeholder so callers can invoke it in tests."""
    pass


def register_workflow_type(self, **kwargs):
    """Do nothing; placeholder so callers can invoke it in tests."""
    pass


def describe_activity_type(self, domain, activityType):
    """Return a partial fake response for use in tests."""
    return {"typeInfo": "..."}


def register_activity_type(self, **kwargs):
    """Do nothing; placeholder so callers can invoke it in tests."""
    pass


def poll_for_decision_task(self, **kwargs):
    """Do nothing; placeholder so callers can invoke it in tests."""
    pass


# ...swf.py
Source:swf.py  
...18		tagList=None, 19		taskStartToCloseTimeout=None,20		**kwargs):21	swf = boto.swf.layer1.Layer1(awsKey, awsSecretKey)22	rawWorflows = swf.list_open_workflow_executions(domain, time.time()-30*24*3600, workflow_name=workflowName)23	for workflow in rawWorflows['executionInfos'] :24		if(workflow['execution']['workflowId'] == workflowId):25			log.info("Workflow {0} is already running".format(workflowId))26			return True27	if(input.startswith("\\")):28		input = input[1:]29	swf.start_workflow_execution(30		domain,31		workflowId,32		workflowName, 33		workflowVersion, 34		task_list=taskList, 35		child_policy=childPolicy, 36		execution_start_to_close_timeout=executionStartToCloseTimeout, ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
