Best Python code snippet using localstack_python
test_activity_worker.py
Source:test_activity_worker.py  
...244def test_activity_worker_listen_blocks_at_max_workers(245        activity_worker_kwargs246):247    """Assert ActivityWorker.listen() will block on worker availability.248    Once get_activity_task() returns a task object, AWS starts the timeout249    countdown for that task, but if the number of tasks already being worked250    one is equal to or greater than the worker_count then that task will be251    queued, which could cause it to timeout or heartbeat timeout before work252    has started on it.253    This tests that .listen() will block on workers becoming available before254    polling for new tasks.255    """256    def activity_fxn(*args, **kwargs):257        time.sleep(0.1)258    activity_worker_kwargs['activity_fxn'] = activity_fxn259    mock_client = activity_worker_kwargs["client"]260    output = [{"taskToken": "abc123", "input": "{}"}] * 3261    output.append(KeyboardInterrupt("This is a keyboard interrupt"))262    mock_client.get_activity_task.side_effect = output...instance_task.py
Source:instance_task.py  
...60        config=Config(connect_timeout=65, read_timeout=65))61    try:62        while True:63            logger.info("get_activity_task")64            get_activity_task_response = client.get_activity_task(65                activityArn=activity_arn)66            retry_interval = 3067            if not __valid_token(get_activity_task_response):68                time.sleep(retry_interval)69                logger.info("no activity task, retrying")70                # If there is a null task token it means there is no task71                # available. Sleep the worker and try again72                while not __valid_token(get_activity_task_response):73                    get_activity_task_response = client.get_activity_task(74                        activityArn=activity_arn)75                    time.sleep(retry_interval)76                    logger.info("no activity task, retrying")77            task_token = get_activity_task_response["taskToken"]78            logger.info(f"got activity task, task_token {task_token}")79            task_input = json.loads(get_activity_task_response["input"])80            logger.info(dict(input=task_input))81            process_task(82                client=client,83                task_token=task_token,84                task_input=task_input["Input"],85                s3_bucket_name=s3_bucket_name,86                logger=logger,87                max_concurrency=max_concurrency)...step_functions_client.py
Source:step_functions_client.py  
...43                                                           connect_timeout=GET_ACTIVITY_TASK_SF_CLIENT_TIMEOUT,44                                                           read_timeout=GET_ACTIVITY_TASK_SF_CLIENT_TIMEOUT)45        self._activity_task_client = ThrottledBotoResource(sf_client_with_longer_timeout)46    @error_handler47    def get_activity_task(self, activity_arn, worker_name=None):48        try:49            if worker_name is not None:50                activity_task = self._activity_task_client.get_activity_task(activityArn=activity_arn,51                                                                             workerName=worker_name)52            else:53                activity_task = self._activity_task_client.get_activity_task(activityArn=activity_arn)54            with suppress(KeyError, ValueError):55                activity_task['input'] = json.loads(activity_task['input'])56            return activity_task57        except StepFunctionsTaskTimeoutException:58            logger.exception(f'Get activity {activity_arn} timeout')59            raise60    def iterate_activity_tasks(self, activity_arn, worker_name=None, wait_sleep_time=2):61        while True:62            response = self.get_activity_task(activity_arn, worker_name)63            if 'taskToken' in response:64                logger.info(f'Received token {response["taskToken"][-10:]}')65                yield response66            else:67                sleep(wait_sleep_time)68    @error_handler69    def send_task_heartbeat(self, task_token):70        self._client.send_task_heartbeat(taskToken=task_token)71    def send_task_heartbeats_many(self, tasks_tokens):72        for token in tasks_tokens:73            self.send_task_heartbeat(task_token=token)74    @error_handler75    def send_task_success(self, task_token, output):76        logger.info(f'Sending successful response for task {task_token[-10:]}.')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
