Best Python code snippet using locust
Source: subtasks.py
...
            metric + "." + memory_type,
            memory_used,
            tags=["course_id:{}".format(course_id)],
        )


def _generate_items_for_subtask(
    item_querysets,  # pylint: disable=bad-continuation
    item_fields,
    total_num_items,
    items_per_task,
    total_num_subtasks,
    course_id,
):
    """
    Generates a chunk of "items" that should be passed into a subtask.
    Arguments:
        `item_querysets` : a list of query sets, each of which defines the "items" that should be passed to subtasks.
        `item_fields` : the fields that should be included in the dict that is returned.
            These are in addition to the 'pk' field.
        `total_num_items` : the result of summing the count of each queryset in `item_querysets`.
        `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
        `course_id` : course_id of the course. Only needed for the track_memory_usage context manager.
    Returns:  yields a list of dicts, where each dict contains the fields in `item_fields`, plus the 'pk' field.
    Warning:  if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed.
    """
    num_items_queued = 0
    all_item_fields = list(item_fields)
    all_item_fields.append('pk')
    num_subtasks = 0
    items_for_task = []
    with track_memory_usage('course_email.subtask_generation.memory', course_id):
        for queryset in item_querysets:
            for item in queryset.values(*all_item_fields).iterator():
                if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
                    yield items_for_task
                    num_items_queued += items_per_task
                    items_for_task = []
                    num_subtasks += 1
                items_for_task.append(item)
        # yield remainder items for task, if any
        if items_for_task:
            yield items_for_task
            num_items_queued += len(items_for_task)
    # Note, depending on what kind of DB is used, it's possible for the queryset
    # we iterate over to change in the course of the query. Therefore it's
    # possible that there are more (or fewer) items queued than were initially
    # calculated. It also means it's possible that the last task contains
    # more items than items_per_task allows. We expect this to be a small enough
    # number as to be negligible.
    if num_items_queued != total_num_items:
        TASK_LOG.info("Number of items generated by chunking %s not equal to original total %s", num_items_queued, total_num_items)


class SubtaskStatus(object):
    """
    Create and return a dict for tracking the status of a subtask.
    SubtaskStatus values are:
      'task_id' : id of subtask.  This is used to pass task information across retries.
      'attempted' : number of attempts -- should equal succeeded plus failed
      'succeeded' : number that succeeded in processing
      'skipped' : number that were not processed.
      'failed' : number that failed during processing
      'retried_nomax' : number of times the subtask has been retried for conditions that
          should not have a maximum count applied
      'retried_withmax' : number of times the subtask has been retried for conditions that
          should have a maximum count applied
      'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
    Object is not JSON-serializable, so to_dict and from_dict methods are provided so that
    it can be passed as a serializable argument to tasks (and be reconstituted within such tasks).
    In future, we may want to include specific error information
    indicating the reason for failure.
    Also, we should count up "not attempted" separately from attempted/failed.
    """
    def __init__(self, task_id, attempted=None, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
        """Construct a SubtaskStatus object."""
        self.task_id = task_id
        if attempted is not None:
            self.attempted = attempted
        else:
            self.attempted = succeeded + failed
        self.succeeded = succeeded
        self.failed = failed
        self.skipped = skipped
        self.retried_nomax = retried_nomax
        self.retried_withmax = retried_withmax
        self.state = state if state is not None else QUEUING

    @classmethod
    def from_dict(cls, d):
        """Construct a SubtaskStatus object from a dict representation."""
        options = dict(d)
        task_id = options['task_id']
        del options['task_id']
        return SubtaskStatus.create(task_id, **options)

    @classmethod
    def create(cls, task_id, **options):
        """Construct a SubtaskStatus object."""
        return cls(task_id, **options)

    def to_dict(self):
        """
        Output a dict representation of a SubtaskStatus object.
        Use for creating a JSON-serializable representation for use by tasks.
        """
        return self.__dict__

    def increment(self, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
        """
        Update the result of a subtask with additional results.
        Kwarg arguments are incremented to the existing values.
        The exception is for `state`, which if specified is used to override the existing value.
        """
        self.attempted += (succeeded + failed)
        self.succeeded += succeeded
        self.failed += failed
        self.skipped += skipped
        self.retried_nomax += retried_nomax
        self.retried_withmax += retried_withmax
        if state is not None:
            self.state = state

    def get_retry_count(self):
        """Returns the number of retries of any kind."""
        return self.retried_nomax + self.retried_withmax

    def __repr__(self):
        """Return print representation of a SubtaskStatus object."""
        return 'SubtaskStatus<%r>' % (self.to_dict(),)

    def __unicode__(self):
        """Return unicode version of a SubtaskStatus object representation."""
        return unicode(repr(self))


def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
    """
    Store initial subtask information to InstructorTask object.
    The InstructorTask's "task_output" field is initialized.  This is a JSON-serialized dict.
    Counters for 'attempted', 'succeeded', 'failed', 'skipped' keys are initialized to zero,
    as is the 'duration_ms' value.  A 'start_time' is stored for later duration calculations,
    and the total number of "things to do" is set, so the user can be told how much needs to be
    done overall.  The `action_name` is also stored, to help with constructing more readable
    task_progress messages.
    The InstructorTask's "subtasks" field is also initialized.  This is also a JSON-serialized dict.
    Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of
    subtasks.  'Total' is set here to the total number, while the other three are initialized to zero.
    Once the counters for 'succeeded' and 'failed' match the 'total', the subtasks are done and
    the InstructorTask's "status" will be changed to SUCCESS.
    The "subtasks" field also contains a 'status' key, that contains a dict that stores status
    information for each subtask.  The value for each subtask (keyed by its task_id)
    is its subtask status, as defined by SubtaskStatus.to_dict().
    This information needs to be set up in the InstructorTask before any of the subtasks start
    running.  If not, there is a chance that the subtasks could complete before the parent task
    is done creating subtasks.  Doing so also simplifies the save() here, as it avoids the need
    for locking.
    Monitoring code should assume that if an InstructorTask has subtask information, it should
    rely on the status stored in the InstructorTask object, rather than the status stored in the
    corresponding AsyncResult.
    """
    task_progress = {
        'action_name': action_name,
        'attempted': 0,
        'failed': 0,
        'skipped': 0,
        'succeeded': 0,
        'total': total_num,
        'duration_ms': int(0),
        'start_time': time()
    }
    entry.task_output = InstructorTask.create_output_for_success(task_progress)
    entry.task_state = PROGRESS
    # Write out the subtasks information.
    num_subtasks = len(subtask_id_list)
    # Note that it may not be necessary to store the initial value with all those zeroes!
    # Write out as a dict, so it will go more smoothly into json.
    subtask_status = {subtask_id: (SubtaskStatus.create(subtask_id)).to_dict() for subtask_id in subtask_id_list}
    subtask_dict = {
        'total': num_subtasks,
        'succeeded': 0,
        'failed': 0,
        'status': subtask_status
    }
    entry.subtasks = json.dumps(subtask_dict)
    # and save the entry immediately, before any subtasks actually start work:
    entry.save_now()
    return task_progress


# pylint: disable=bad-continuation
def queue_subtasks_for_query(
    entry,
    action_name,
    create_subtask_fcn,
    item_querysets,
    item_fields,
    items_per_task,
    total_num_items,
):
    """
    Generates and queues subtasks that each execute a chunk of "items" generated by a queryset.
    Arguments:
        `entry` : the InstructorTask object for which subtasks are being queued.
        `action_name` : a past-tense verb that can be used for constructing readable status messages.
        `create_subtask_fcn` : a function of two arguments that constructs the desired kind of subtask object.
            Arguments are the list of items to be processed by this subtask, and a SubtaskStatus
            object reflecting initial status (and containing the subtask's id).
        `item_querysets` : a list of query sets that define the "items" that should be passed to subtasks.
        `item_fields` : the fields that should be included in the dict that is returned.
            These are in addition to the 'pk' field.
        `items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
        `total_num_items` : total number of items that will be put into subtasks.
    Returns:  the task progress as stored in the InstructorTask object.
    """
    task_id = entry.task_id
    # Calculate the number of tasks that will be created, and create a list of ids for each task.
    total_num_subtasks = _get_number_of_subtasks(total_num_items, items_per_task)
    subtask_id_list = [str(uuid4()) for _ in range(total_num_subtasks)]
    # Update the InstructorTask with information about the subtasks we've defined.
    TASK_LOG.info(
        "Task %s: updating InstructorTask %s with subtask info for %s subtasks to process %s items.",
        task_id,
        entry.id,
        total_num_subtasks,
        total_num_items,
    )  # pylint: disable=no-member
    progress = initialize_subtask_info(entry, action_name, total_num_items, subtask_id_list)
    # Construct a generator that will return the recipients to use for each subtask.
    # Pass in the desired fields to fetch for each recipient.
    item_list_generator = _generate_items_for_subtask(
        item_querysets,
        item_fields,
        total_num_items,
        items_per_task,
        total_num_subtasks,
        entry.course_id,
    )
    # Now create the subtasks, and start them running.
    TASK_LOG.info(
        "Task %s: creating %s subtasks to process %s items.",
        task_id,
        total_num_subtasks,
        total_num_items,
    )
...
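The SubtaskStatus docstring above describes a dict round trip so that status can be handed to a celery task as a serializable argument and rebuilt on the other side. Below is a minimal, hedged sketch of that round trip; the import path is an assumption, so adjust it to wherever subtasks.py lives in your project.

# Hedged sketch of the SubtaskStatus round trip described above.
# Assumption: the subtasks.py module shown above is importable as `subtasks`.
from subtasks import SubtaskStatus

status = SubtaskStatus.create("subtask-id-value")           # state defaults to QUEUING
status.increment(succeeded=8, failed=2, state="PROGRESS")   # counters accumulate; state is overridden

as_dict = status.to_dict()                    # JSON-serializable, safe to pass to a task
restored = SubtaskStatus.from_dict(as_dict)   # reconstituted inside the task

assert restored.attempted == 10               # attempted tracks succeeded + failed
assert restored.get_retry_count() == 0        # no retries of either kind yet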
Source: test_err_handling.py
...
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        task_input = {"email_id": email.id}
        with self.assertRaisesRegex(ValueError, 'does not match email value'):
            perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name")

    def test_send_email_undefined_subtask(self):
        # test at a lower level, to ensure that the course gets checked down below too.
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        entry_id = entry.id
        to_list = ['test@test.com']
        global_email_context = {'course_title': 'dummy course'}
        subtask_id = "subtask-id-value"
        subtask_status = SubtaskStatus.create(subtask_id)
        email_id = 1001
        with self.assertRaisesRegex(DuplicateTaskException, 'unable to find subtasks of instructor task'):
            send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status.to_dict())

    def test_send_email_missing_subtask(self):
        # test at a lower level, to ensure that the course gets checked down below too.
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        entry_id = entry.id
        to_list = ['test@test.com']
        global_email_context = {'course_title': 'dummy course'}
        subtask_id = "subtask-id-value"
        initialize_subtask_info(entry, "emailed", 100, [subtask_id])
        different_subtask_id = "bogus-subtask-id-value"
        subtask_status = SubtaskStatus.create(different_subtask_id)
        bogus_email_id = 1001
        with self.assertRaisesRegex(DuplicateTaskException, 'unable to find status for subtask of instructor task'):
            send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict())

    def test_send_email_completed_subtask(self):
        # test at a lower level, to ensure that the course gets checked down below too.
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        entry_id = entry.id
        subtask_id = "subtask-id-value"
        initialize_subtask_info(entry, "emailed", 100, [subtask_id])
        subtask_status = SubtaskStatus.create(subtask_id, state=SUCCESS)
        update_subtask_status(entry_id, subtask_id, subtask_status)
        bogus_email_id = 1001
        to_list = ['test@test.com']
        global_email_context = {'course_title': 'dummy course'}
        new_subtask_status = SubtaskStatus.create(subtask_id)
        with self.assertRaisesRegex(DuplicateTaskException, 'already completed'):
            send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict())

    def test_send_email_running_subtask(self):
        # test at a lower level, to ensure that the course gets checked down below too.
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        entry_id = entry.id
        subtask_id = "subtask-id-value"
        initialize_subtask_info(entry, "emailed", 100, [subtask_id])
        subtask_status = SubtaskStatus.create(subtask_id)
        update_subtask_status(entry_id, subtask_id, subtask_status)
        check_subtask_is_valid(entry_id, subtask_id, subtask_status)
        bogus_email_id = 1001
        to_list = ['test@test.com']
        global_email_context = {'course_title': 'dummy course'}
        with self.assertRaisesRegex(DuplicateTaskException, 'already being executed'):
            send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status.to_dict())

    def test_send_email_retried_subtask(self):
        # test at a lower level, to ensure that the course gets checked down below too.
        entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
        entry_id = entry.id
        subtask_id = "subtask-id-value"
        initialize_subtask_info(entry, "emailed", 100, [subtask_id])
        subtask_status = SubtaskStatus.create(subtask_id, state=RETRY, retried_nomax=2)
        update_subtask_status(entry_id, subtask_id, subtask_status)
        bogus_email_id = 1001
        to_list = ['test@test.com']
        global_email_context = {'course_title': 'dummy course'}
        # try running with a clean subtask:
        new_subtask_status = SubtaskStatus.create(subtask_id)
        with self.assertRaisesRegex(DuplicateTaskException, 'already retried'):
            send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status.to_dict())
...
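The tests above all drive send_course_email into the DuplicateTaskException paths: an unknown subtask, a missing status entry, and a subtask that is already completed, already running, or already retried. For contrast, here is a hedged sketch of the normal, non-duplicated lifecycle those checks protect, using only the helpers exercised above; the import path and the do_work callback are assumptions for illustration, not part of the source.

# Hedged sketch, not taken from the test module above.
# Assumption: the helpers are importable from a `subtasks` module.
from subtasks import (
    SubtaskStatus,
    check_subtask_is_valid,
    initialize_subtask_info,
    update_subtask_status,
)

def run_single_subtask(entry, subtask_id, do_work):
    # Register the subtask id on the InstructorTask before the subtask starts running.
    initialize_subtask_info(entry, "emailed", 100, [subtask_id])

    status = SubtaskStatus.create(subtask_id)
    # Raises DuplicateTaskException if the subtask is unknown, already running,
    # already completed, or already retried -- the cases the tests above assert on.
    check_subtask_is_valid(entry.id, subtask_id, status)

    succeeded, failed = do_work()  # hypothetical work callback returning counts
    status.increment(succeeded=succeeded, failed=failed, state="SUCCESS")
    update_subtask_status(entry.id, subtask_id, status)
    return status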