How to use get_activity_task method in localstack

Best Python code snippet using localstack_python

test_activity_worker.py

Source:test_activity_worker.py Github

copy

Full Screen

...244def test_activity_worker_listen_blocks_at_max_workers(245 activity_worker_kwargs246):247 """Assert ActivityWorker.listen() will block on worker availability.248 Once get_activity_task() returns a task object, AWS starts the timeout249 countdown for that task, but if the number of tasks already being worked250 one is equal to or greater than the worker_count then that task will be251 queued, which could cause it to timeout or heartbeat timeout before work252 has started on it.253 This tests that .listen() will block on workers becoming available before254 polling for new tasks.255 """256 def activity_fxn(*args, **kwargs):257 time.sleep(0.1)258 activity_worker_kwargs['activity_fxn'] = activity_fxn259 mock_client = activity_worker_kwargs["client"]260 output = [{"taskToken": "abc123", "input": "{}"}] * 3261 output.append(KeyboardInterrupt("This is a keyboard interrupt"))262 mock_client.get_activity_task.side_effect = output...

Full Screen

Full Screen

instance_task.py

Source:instance_task.py Github

copy

Full Screen

...60 config=Config(connect_timeout=65, read_timeout=65))61 try:62 while True:63 logger.info("get_activity_task")64 get_activity_task_response = client.get_activity_task(65 activityArn=activity_arn)66 retry_interval = 3067 if not __valid_token(get_activity_task_response):68 time.sleep(retry_interval)69 logger.info("no activity task, retrying")70 # If there is a null task token it means there is no task71 # available. Sleep the worker and try again72 while not __valid_token(get_activity_task_response):73 get_activity_task_response = client.get_activity_task(74 activityArn=activity_arn)75 time.sleep(retry_interval)76 logger.info("no activity task, retrying")77 task_token = get_activity_task_response["taskToken"]78 logger.info(f"got activity task, task_token {task_token}")79 task_input = json.loads(get_activity_task_response["input"])80 logger.info(dict(input=task_input))81 process_task(82 client=client,83 task_token=task_token,84 task_input=task_input["Input"],85 s3_bucket_name=s3_bucket_name,86 logger=logger,87 max_concurrency=max_concurrency)...

Full Screen

Full Screen

step_functions_client.py

Source:step_functions_client.py Github

copy

Full Screen

...43 connect_timeout=GET_ACTIVITY_TASK_SF_CLIENT_TIMEOUT,44 read_timeout=GET_ACTIVITY_TASK_SF_CLIENT_TIMEOUT)45 self._activity_task_client = ThrottledBotoResource(sf_client_with_longer_timeout)46 @error_handler47 def get_activity_task(self, activity_arn, worker_name=None):48 try:49 if worker_name is not None:50 activity_task = self._activity_task_client.get_activity_task(activityArn=activity_arn,51 workerName=worker_name)52 else:53 activity_task = self._activity_task_client.get_activity_task(activityArn=activity_arn)54 with suppress(KeyError, ValueError):55 activity_task['input'] = json.loads(activity_task['input'])56 return activity_task57 except StepFunctionsTaskTimeoutException:58 logger.exception(f'Get activity {activity_arn} timeout')59 raise60 def iterate_activity_tasks(self, activity_arn, worker_name=None, wait_sleep_time=2):61 while True:62 response = self.get_activity_task(activity_arn, worker_name)63 if 'taskToken' in response:64 logger.info(f'Received token {response["taskToken"][-10:]}')65 yield response66 else:67 sleep(wait_sleep_time)68 @error_handler69 def send_task_heartbeat(self, task_token):70 self._client.send_task_heartbeat(taskToken=task_token)71 def send_task_heartbeats_many(self, tasks_tokens):72 for token in tasks_tokens:73 self.send_task_heartbeat(task_token=token)74 @error_handler75 def send_task_success(self, task_token, output):76 logger.info(f'Sending successful response for task {task_token[-10:]}.')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful