How to use subtask method in locust

Best Python code snippet using locust

subtasks.py

Source:subtasks.py Github

copy

Full Screen

# ... (listing starts part-way through an earlier definition; the orphaned tail of
# that call is preserved verbatim below, since the start of it is not shown)
#         metric + "." + memory_type,
#         memory_used,
#         tags=["course_id:{}".format(course_id)],
#     )


def _generate_items_for_subtask(
    item_querysets,  # pylint: disable=bad-continuation
    item_fields,
    total_num_items,
    items_per_task,
    total_num_subtasks,
    course_id,
):
    """
    Generates a chunk of "items" that should be passed into a subtask.

    Arguments:
        `item_querysets` : a list of query sets, each of which defines the "items" that should be passed to subtasks.
        `item_fields` : the fields that should be included in the dict that is returned.
            These are in addition to the 'pk' field.
        `total_num_items` : the result of summing the count of each queryset in `item_querysets`.
            Only used to log a message when the number of items actually generated differs.
        `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
        `total_num_subtasks` : the number of subtasks the caller has planned for. Once all but one
            of them have been yielded, every remaining item is put into the final chunk, so that
            chunk may hold more than `items_per_task` items.
        `course_id` : course_id of the course. Only needed for the track_memory_usage context manager.

    Returns: yields a list of dicts, where each dict contains the fields in `item_fields`, plus the 'pk' field.

    Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed.
    """
    num_items_queued = 0
    all_item_fields = list(item_fields)
    all_item_fields.append('pk')
    num_subtasks = 0
    items_for_task = []

    with track_memory_usage('course_email.subtask_generation.memory', course_id):
        for queryset in item_querysets:
            for item in queryset.values(*all_item_fields).iterator():
                # Flush a full chunk, unless it is the last planned subtask: that one
                # absorbs whatever is left so no extra subtask is needed.
                if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
                    yield items_for_task
                    num_items_queued += items_per_task
                    items_for_task = []
                    num_subtasks += 1
                items_for_task.append(item)

        # yield remainder items for task, if any
        if items_for_task:
            yield items_for_task
            num_items_queued += len(items_for_task)

    # Note, depending on what kind of DB is used, it's possible for the queryset
    # we iterate over to change in the course of the query. Therefore it's
    # possible that there are more (or fewer) items queued than were initially
    # calculated. It also means it's possible that the last task contains
    # more items than items_per_task allows. We expect this to be a small enough
    # number as to be negligible.
    if num_items_queued != total_num_items:
        TASK_LOG.info(
            "Number of items generated by chunking %s not equal to original total %s",
            num_items_queued,
            total_num_items,
        )


class SubtaskStatus(object):
    """
    Track the status of a subtask.

    SubtaskStatus values are:

      'task_id' : id of subtask.  This is used to pass task information across retries.
      'attempted' : number of attempts -- should equal succeeded plus failed
      'succeeded' : number that succeeded in processing
      'skipped' : number that were not processed.
      'failed' : number that failed during processing
      'retried_nomax' : number of times the subtask has been retried for conditions that
          should not have a maximum count applied
      'retried_withmax' : number of times the subtask has been retried for conditions that
          should have a maximum count applied
      'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)

    Object is not JSON-serializable, so to_dict and from_dict methods are provided so that
    it can be passed as a serializable argument to tasks (and be reconstituted within such tasks).

    In future, we may want to include specific error information
    indicating the reason for failure.
    Also, we should count up "not attempted" separately from attempted/failed.
    """

    def __init__(
        self,
        task_id,
        attempted=None,
        succeeded=0,
        failed=0,
        skipped=0,
        retried_nomax=0,
        retried_withmax=0,
        state=None,
    ):
        """
        Construct a SubtaskStatus object.

        When `attempted` is not given it is derived as `succeeded + failed`;
        when `state` is not given it defaults to QUEUING.
        """
        self.task_id = task_id
        if attempted is not None:
            self.attempted = attempted
        else:
            self.attempted = succeeded + failed
        self.succeeded = succeeded
        self.failed = failed
        self.skipped = skipped
        self.retried_nomax = retried_nomax
        self.retried_withmax = retried_withmax
        self.state = state if state is not None else QUEUING

    @classmethod
    def from_dict(cls, d):
        """
        Construct a SubtaskStatus object from a dict representation.

        `d` must contain a 'task_id' key (KeyError otherwise); the remaining keys are
        passed on as keyword arguments to the constructor. `d` itself is not modified.
        """
        options = dict(d)
        task_id = options.pop('task_id')
        # Go through `cls` (not the hard-coded class) so subclasses round-trip as themselves.
        return cls.create(task_id, **options)

    @classmethod
    def create(cls, task_id, **options):
        """Construct a SubtaskStatus object."""
        return cls(task_id, **options)

    def to_dict(self):
        """
        Output a dict representation of a SubtaskStatus object.

        Use for creating a JSON-serializable representation for use by tasks.
        A shallow copy is returned, so callers that modify the result cannot
        accidentally change this object's counters or state.
        """
        return dict(self.__dict__)

    def increment(self, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
        """
        Update the result of a subtask with additional results.

        Kwarg arguments are incremented to the existing values.
        The exception is for `state`, which if specified is used to override the existing value.
        """
        self.attempted += (succeeded + failed)
        self.succeeded += succeeded
        self.failed += failed
        self.skipped += skipped
        self.retried_nomax += retried_nomax
        self.retried_withmax += retried_withmax
        if state is not None:
            self.state = state

    def get_retry_count(self):
        """Returns the number of retries of any kind."""
        return self.retried_nomax + self.retried_withmax

    def __repr__(self):
        """Return print representation of a SubtaskStatus object."""
        return 'SubtaskStatus<%r>' % (self.to_dict(),)

    def __unicode__(self):
        """Return unicode version of a SubtaskStatus object representation."""
        # Avoid the `unicode` builtin, which does not exist on Python 3;
        # formatting into a unicode literal yields text on both Python 2 and 3.
        return u'{}'.format(repr(self))


def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
    """
    Store initial subtask information to InstructorTask object.

    The InstructorTask's "task_output" field is initialized.  This is a JSON-serialized dict.
    Counters for 'attempted', 'succeeded', 'failed', 'skipped' keys are initialized to zero,
    as is the 'duration_ms' value.  A 'start_time' is stored for later duration calculations,
    and the total number of "things to do" is set, so the user can be told how much needs to be
    done overall.  The `action_name` is also stored, to help with constructing more readable
    task_progress messages.

    The InstructorTask's "subtasks" field is also initialized.  This is also a JSON-serialized dict.
    Keys include 'total', 'succeeded' and 'failed', which are counters for the number of
    subtasks.  'Total' is set here to the total number, while the other two are initialized to zero.
    Once the counters for 'succeeded' and 'failed' match the 'total', the subtasks are done and
    the InstructorTask's "status" will be changed to SUCCESS.

    The "subtasks" field also contains a 'status' key, that contains a dict that stores status
    information for each subtask.  The value for each subtask (keyed by its task_id)
    is its subtask status, as defined by SubtaskStatus.to_dict().

    This information needs to be set up in the InstructorTask before any of the subtasks start
    running.  If not, there is a chance that the subtasks could complete before the parent task
    is done creating subtasks.  Doing so also simplifies the save() here, as it avoids the need
    for locking.

    Monitoring code should assume that if an InstructorTask has subtask information, that it should
    rely on the status stored in the InstructorTask object, rather than status stored in the
    corresponding AsyncResult.

    Returns: the task_progress dict that was stored in the entry's task_output.
    """
    task_progress = {
        'action_name': action_name,
        'attempted': 0,
        'failed': 0,
        'skipped': 0,
        'succeeded': 0,
        'total': total_num,
        'duration_ms': 0,
        'start_time': time()
    }
    entry.task_output = InstructorTask.create_output_for_success(task_progress)
    entry.task_state = PROGRESS

    # Write out the subtasks information.
    num_subtasks = len(subtask_id_list)
    # Note that it may not be necessary to store initial value with all those zeroes!
    # Write out as a dict, so it will go more smoothly into json.
    subtask_status = {subtask_id: (SubtaskStatus.create(subtask_id)).to_dict() for subtask_id in subtask_id_list}
    subtask_dict = {
        'total': num_subtasks,
        'succeeded': 0,
        'failed': 0,
        'status': subtask_status
    }
    entry.subtasks = json.dumps(subtask_dict)

    # and save the entry immediately, before any subtasks actually start work:
    entry.save_now()
    return task_progress


# pylint: disable=bad-continuation
def queue_subtasks_for_query(
    entry,
    action_name,
    create_subtask_fcn,
    item_querysets,
    item_fields,
    items_per_task,
    total_num_items,
):
    """
    Generates and queues subtasks to each execute a chunk of "items" generated by a queryset.

    Arguments:
        `entry` : the InstructorTask object for which subtasks are being queued.
        `action_name` : a past-tense verb that can be used for constructing readable status messages.
        `create_subtask_fcn` : a function of two arguments that constructs the desired kind of subtask object.
            Arguments are the list of items to be processed by this subtask, and a SubtaskStatus
            object reflecting initial status (and containing the subtask's id).
        `item_querysets` : a list of query sets that define the "items" that should be passed to subtasks.
        `item_fields` : the fields that should be included in the dict that is returned.
            These are in addition to the 'pk' field.
        `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
        `total_num_items` : total amount of items that will be put into subtasks

    Returns:  the task progress as stored in the InstructorTask object.
    """
    task_id = entry.task_id

    # Calculate the number of tasks that will be created, and create a list of ids for each task.
    total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_task)
    subtask_id_list = [str(uuid4()) for _ in range(total_num_subtasks)]

    # Update the InstructorTask with information about the subtasks we've defined.
    TASK_LOG.info(
        "Task %s: updating InstructorTask %s with subtask info for %s subtasks to process %s items.",
        task_id,
        entry.id,
        total_num_subtasks,
        total_num_items,
    )  # pylint: disable=no-member
    progress = initialize_subtask_info(entry, action_name, total_num_items, subtask_id_list)

    # Construct a generator that will return the recipients to use for each subtask.
    # Pass in the desired fields to fetch for each recipient.
    item_list_generator = _generate_items_for_subtask(
        item_querysets,
        item_fields,
        total_num_items,
        items_per_task,
        total_num_subtasks,
        entry.course_id,
    )

    # Now create the subtasks, and start them running.
    TASK_LOG.info(
        "Task %s: creating %s subtasks to process %s items.",
        task_id,
        total_num_subtasks,
        total_num_items,
    )
    # ... (the remainder of this function is truncated in the listing)

Full Screen

Full Screen

test_err_handling.py

Source:test_err_handling.py Github

copy

Full Screen

# ... (listing starts part-way through an earlier test method; its visible tail follows)
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        task_input = {"email_id": email.id}
        with self.assertRaisesRegex(ValueError, 'does not match email value'):
            perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name")

    def test_send_email_undefined_subtask(self):
        # test at a lower level, to ensure that the course gets checked down below too.
        # No subtask info is initialized on the entry here, so send_course_email is
        # expected to fail with "unable to find subtasks of instructor task".
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        entry_id = entry.id
        to_list = ['test@test.com']
        global_email_context = {'course_title': 'dummy course'}
        subtask_id = "subtask-id-value"
        subtask_status = SubtaskStatus.create(subtask_id)
        email_id = 1001
        with self.assertRaisesRegex(DuplicateTaskException, 'unable to find subtasks of instructor task'):
            send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status.to_dict())

    def test_send_email_missing_subtask(self):
        # test at a lower level, to ensure that the course gets checked down below too.
        # Subtask info is initialized for one subtask id, but the status passed to
        # send_course_email carries a different id, so its status cannot be found.
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        entry_id = entry.id
        to_list = ['test@test.com']
        global_email_context = {'course_title': 'dummy course'}
        subtask_id = "subtask-id-value"
        initialize_subtask_info(entry, "emailed", 100, [subtask_id])
        different_subtask_id = "bogus-subtask-id-value"
        subtask_status = SubtaskStatus.create(different_subtask_id)
        bogus_email_id = 1001
        with self.assertRaisesRegex(DuplicateTaskException, 'unable to find status for subtask of instructor task'):
            send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict())

    def test_send_email_completed_subtask(self):
        # test at a lower level, to ensure that the course gets checked down below too.
        # The stored status for the subtask is first updated to SUCCESS; sending again
        # with a fresh status for the same id is expected to fail as "already completed".
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        entry_id = entry.id
        subtask_id = "subtask-id-value"
        initialize_subtask_info(entry, "emailed", 100, [subtask_id])
        subtask_status = SubtaskStatus.create(subtask_id, state=SUCCESS)
        update_subtask_status(entry_id, subtask_id, subtask_status)
        bogus_email_id = 1001
        to_list = ['test@test.com']
        global_email_context = {'course_title': 'dummy course'}
        new_subtask_status = SubtaskStatus.create(subtask_id)
        with self.assertRaisesRegex(DuplicateTaskException, 'already completed'):
            send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict())

    def test_send_email_running_subtask(self):
        # test at a lower level, to ensure that the course gets checked down below too.
        # check_subtask_is_valid is called once before send_course_email; the send is
        # then expected to fail as "already being executed".
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        entry_id = entry.id
        subtask_id = "subtask-id-value"
        initialize_subtask_info(entry, "emailed", 100, [subtask_id])
        subtask_status = SubtaskStatus.create(subtask_id)
        update_subtask_status(entry_id, subtask_id, subtask_status)
        check_subtask_is_valid(entry_id, subtask_id, subtask_status)
        bogus_email_id = 1001
        to_list = ['test@test.com']
        global_email_context = {'course_title': 'dummy course'}
        with self.assertRaisesRegex(DuplicateTaskException, 'already being executed'):
            send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict())

    def test_send_email_retried_subtask(self):
        # test at a lower level, to ensure that the course gets checked down below too.
        # The stored status records state RETRY with two retries; sending with a status
        # that has no retries recorded is expected to fail as "already retried".
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        entry_id = entry.id
        subtask_id = "subtask-id-value"
        initialize_subtask_info(entry, "emailed", 100, [subtask_id])
        subtask_status = SubtaskStatus.create(subtask_id, state=RETRY, retried_nomax=2)
        update_subtask_status(entry_id, subtask_id, subtask_status)
        bogus_email_id = 1001
        to_list = ['test@test.com']
        global_email_context = {'course_title': 'dummy course'}
        # try running with a clean subtask:
        new_subtask_status = SubtaskStatus.create(subtask_id)
        with self.assertRaisesRegex(DuplicateTaskException, 'already retried'):
            send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict())
        # ... (listing is truncated here)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful