Best Python code snippet using hypothesis
shrinker.py
Source: shrinker.py
...
                return self.cached_calculations[cache_key]
            except KeyError:
                return self.cached_calculations.setdefault(cache_key, f())

        return accept

    def explain_next_call_as(self, explanation):
        self.__pending_shrink_explanation = explanation

    def clear_call_explanation(self):
        self.__pending_shrink_explanation = None

    def add_new_pass(self, run):
        """Creates a shrink pass corresponding to calling ``run(self)``"""
        definition = SHRINK_PASS_DEFINITIONS[run]
        p = ShrinkPass(
            run_with_chooser=definition.run_with_chooser,
            shrinker=self,
            index=len(self.passes),
        )
        self.passes.append(p)
        self.passes_by_name[p.name] = p
        return p

    def shrink_pass(self, name):
        """Return the ShrinkPass object for the pass with the given name."""
        if isinstance(name, ShrinkPass):
            return name
        if name not in self.passes_by_name:
            self.add_new_pass(name)
        return self.passes_by_name[name]

    @property
    def calls(self):
        """Return the number of calls that have been made to the underlying
        test function."""
        return self.engine.call_count

    def consider_new_buffer(self, buffer):
        """Returns True if after running this buffer the result would be
        the current shrink_target."""
        buffer = bytes(buffer)
        return buffer.startswith(self.buffer) or self.incorporate_new_buffer(buffer)

    def incorporate_new_buffer(self, buffer):
        """Either runs the test function on this buffer and returns True if
        that changed the shrink_target, or determines that doing so would
        be useless and returns False without running it."""
        buffer = bytes(buffer[: self.shrink_target.index])
        # Sometimes an attempt at lexicographic minimization will do the wrong
        # thing because the buffer has changed under it (e.g. something has
        # turned into a write, the bit size has changed). The result would be
        # an invalid string, but it's better for us to just ignore it here as
        # it turns out to involve quite a lot of tricky book-keeping to get
        # this right and it's better to just handle it in one place.
        if sort_key(buffer) >= sort_key(self.shrink_target.buffer):
            return False
        if self.shrink_target.buffer.startswith(buffer):
            return False
        previous = self.shrink_target
        self.cached_test_function(buffer)
        return previous is not self.shrink_target

    def incorporate_test_data(self, data):
        """Takes a ConjectureData or Overrun object and updates the current
        shrink_target if this data represents an improvement over it,
        returning True if it does."""
        if data is Overrun or data is self.shrink_target:
            return
        if self.__predicate(data) and sort_key(data.buffer) < sort_key(
            self.shrink_target.buffer
        ):
            self.update_shrink_target(data)
            return True
        return False

    def cached_test_function(self, buffer):
        """Returns a cached version of the underlying test function, so
        that the result is either an Overrun object (if the buffer is
        too short to be a valid test case) or a ConjectureData object
        with status >= INVALID that would result from running this buffer."""
        if self.__pending_shrink_explanation is not None:
            self.debug(self.__pending_shrink_explanation)
            self.__pending_shrink_explanation = None
        buffer = bytes(buffer)
        result = self.engine.cached_test_function(buffer)
        self.incorporate_test_data(result)
        return result

    def debug(self, msg):
        self.engine.debug(msg)

    @property
    def random(self):
        return self.engine.random

    def shrink(self):
        """Run the full set of shrinks and update shrink_target.

        This method is "mostly idempotent" - calling it twice is unlikely to
        have any effect, though it has a non-zero probability of doing so.
        """
        # We assume that if an all-zero block of bytes is an interesting
        # example then we're not going to do better than that.
        # This might not technically be true: e.g. for integers() | booleans()
        # the simplest example is actually [1, 0]. Missing this case is fairly
        # harmless and this allows us to make various simplifying assumptions
        # about the structure of the data (principally that we're never
        # operating on a block of all zero bytes so can use non-zeroness as a
        # signpost of complexity).
        if not any(self.shrink_target.buffer) or self.incorporate_new_buffer(
            bytes(len(self.shrink_target.buffer))
        ):
            return

        try:
            self.greedy_shrink()
        finally:
            if self.engine.report_debug_info:

                def s(n):
                    return "s" if n != 1 else ""

                total_deleted = self.initial_size - len(self.shrink_target.buffer)

                self.debug("---------------------")
                self.debug("Shrink pass profiling")
                self.debug("---------------------")
                self.debug("")
                calls = self.engine.call_count - self.initial_calls
                self.debug(
                    (
                        "Shrinking made a total of %d call%s "
                        "of which %d shrank. This deleted %d byte%s out of %d."
                    )
                    % (
                        calls,
                        s(calls),
                        self.shrinks,
                        total_deleted,
                        s(total_deleted),
                        self.initial_size,
                    )
                )
                for useful in [True, False]:
                    self.debug("")
                    if useful:
                        self.debug("Useful passes:")
                    else:
                        self.debug("Useless passes:")
                    self.debug("")
                    for p in sorted(
                        self.passes, key=lambda t: (-t.calls, t.deletions, t.shrinks)
                    ):
                        if p.calls == 0:
                            continue
                        if (p.shrinks != 0) != useful:
                            continue
                        self.debug(
                            (
                                "  * %s made %d call%s of which "
                                "%d shrank, deleting %d byte%s."
                            )
                            % (
                                p.name,
                                p.calls,
                                s(p.calls),
                                p.shrinks,
                                p.deletions,
                                s(p.deletions),
                            )
                        )
                self.debug("")

    def greedy_shrink(self):
        """Run a full set of greedy shrinks (that is, ones that will only ever
        move to a better target) and update shrink_target appropriately.

        This method iterates to a fixed point and so is idempotent - calling
        it twice will have exactly the same effect as calling it once.
        """
        self.fixate_shrink_passes(
            [
                block_program("X" * 5),
                block_program("X" * 4),
                block_program("X" * 3),
                block_program("X" * 2),
                block_program("X" * 1),
                "pass_to_descendant",
                "adaptive_example_deletion",
                "alphabet_minimize",
                "zero_examples",
                "reorder_examples",
                "minimize_floats",
                "minimize_duplicated_blocks",
                block_program("-XX"),
                "minimize_individual_blocks",
                block_program("--X"),
            ]
        )

    @derived_value
    def shrink_pass_choice_trees(self):
        return defaultdict(ChoiceTree)

    def fixate_shrink_passes(self, passes):
        """Run steps from each pass in ``passes`` until the current shrink target
        is a fixed point of all of them."""
        passes = list(map(self.shrink_pass, passes))

        any_ran = True
        while any_ran:
            any_ran = False

            # We run remove_discarded after every step to do cleanup, keeping
            # track of whether that actually works. Either there is no
            # discarded data and it is basically free, or it reliably works
            # and deletes data, or it doesn't work. In that latter case we turn
            # it off for the rest of this loop through the passes, but will
            # try again once all of the passes have been run.
            can_discard = self.remove_discarded()

            successful_passes = set()

            for sp in passes:
                # We run each pass until it has failed a certain number
                # of times, where a "failure" is any step where it made
                # at least one call and did not result in a shrink.
                # This gives passes which work reasonably often more of a
                # chance to run.
                failures = 0
                successes = 0

                # The choice of 3 is fairly arbitrary and was hand tuned
                # to some particular examples. It is very unlikely that it
                # is the best choice in general, but it's not an
                # unreasonable choice: Making it smaller than this would
                # give too high a chance of an otherwise very worthwhile
                # pass getting screened out too early if it got unlucky,
                # and making it much larger than this would result in us
                # spending too much time on bad passes.
                max_failures = 3

                while failures < max_failures:
                    prev_calls = self.calls
                    prev = self.shrink_target
                    if sp.step():
                        any_ran = True
                    else:
                        break
                    if prev_calls != self.calls:
                        if can_discard:
                            can_discard = self.remove_discarded()
                        if prev is self.shrink_target:
                            failures += 1
                        else:
                            successes += 1
                if successes > 0:
                    successful_passes.add(sp)

            # If only some of our shrink passes are doing anything useful
            # then run all of those to a fixed point before running the
            # full set. This is particularly important when an emergency
            # shrink pass unlocks some non-emergency ones and it suddenly
            # becomes very expensive to find a bunch of small changes.
            if 0 < len(successful_passes) < len(passes):
                self.fixate_shrink_passes(successful_passes)

        for sp in passes:
            sp.fixed_point_at = self.shrink_target

    @property
    def buffer(self):
        return self.shrink_target.buffer

    @property
    def blocks(self):
        return self.shrink_target.blocks

    @property
    def examples(self):
        return self.shrink_target.examples

    def all_block_bounds(self):
        return self.shrink_target.blocks.all_bounds()

    @derived_value
    def examples_by_label(self):
        """An index of all examples grouped by their label, with
        the examples stored in their normal index order."""
        examples_by_label = defaultdict(list)
        for ex in self.examples:
            examples_by_label[ex.label].append(ex)
        return dict(examples_by_label)

    @derived_value
    def distinct_labels(self):
        return sorted(self.examples_by_label, key=str)

    @defines_shrink_pass()
    def pass_to_descendant(self, chooser):
        """Attempt to replace each example with a descendant example.

        This is designed to deal with strategies that call themselves
        recursively. For example, suppose we had:

        binary_tree = st.deferred(
            lambda: st.one_of(
                st.integers(), st.tuples(binary_tree, binary_tree)))

        This pass guarantees that we can replace any binary tree with one of
        its subtrees - each of those will create an interval that the parent
        could validly be replaced with, and this pass will try doing that.

        This is pretty expensive - it takes O(len(intervals)^2) - so we run it
        late in the process when we've got the number of intervals as far down
        as possible.
        """
        label = chooser.choose(
            self.distinct_labels, lambda l: len(self.examples_by_label[l]) >= 2
        )

        ls = self.examples_by_label[label]
        i = chooser.choose(range(len(ls) - 1))
        ancestor = ls[i]

        if i + 1 == len(ls) or ls[i + 1].start >= ancestor.end:
            return

        @self.cached(label, i)
        def descendants():
            lo = i + 1
            hi = len(ls)
            while lo + 1 < hi:
                mid = (lo + hi) // 2
                if ls[mid].start >= ancestor.end:
                    hi = mid
                else:
                    lo = mid
            return [t for t in ls[i + 1 : hi] if t.length < ancestor.length]

        descendant = chooser.choose(descendants, lambda ex: ex.length > 0)

        assert ancestor.start <= descendant.start
        assert ancestor.end >= descendant.end
        assert descendant.length < ancestor.length

        self.incorporate_new_buffer(
            self.buffer[: ancestor.start]
            + self.buffer[descendant.start : descendant.end]
            + self.buffer[ancestor.end :]
        )
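
# --- Illustrative sketch (not part of shrinker.py) -------------------------
# The recursive strategy from the pass_to_descendant docstring, written out as
# a runnable test. The test name and the property are hypothetical choices made
# only for this example; the point is that a deep failing tree can be replaced
# wholesale by one of its subtrees on the way down to the minimal (0, 0).
import hypothesis.strategies as st
from hypothesis import given

binary_tree = st.deferred(
    lambda: st.one_of(st.integers(), st.tuples(binary_tree, binary_tree))
)

@given(binary_tree)
def test_tree_is_a_leaf(tree):
    # Fails for any tuple, so the shrinker can keep swapping the whole tree
    # for one of its tuple descendants while the test still fails.
    assert not isinstance(tree, tuple)
# --- end of illustration; shrinker.py continues below ----------------------
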
    def lower_common_block_offset(self):
        """Sometimes we find ourselves in a situation where changes to one part
        of the byte stream unlock changes to other parts. Sometimes this is
        good, but sometimes this can cause us to exhibit exponential slow
        downs!

        e.g. suppose we had the following:

        m = draw(integers(min_value=0))
        n = draw(integers(min_value=0))
        assert abs(m - n) > 1

        If this fails then we'll end up with a loop where on each iteration we
        reduce each of m and n by 2 - m can't go lower because of n, then n
        can't go lower because of m.

        This will take us O(m) iterations to complete, which is exponential in
        the data size, as we gradually zig zag our way towards zero.

        This can only happen if we're failing to reduce the size of the byte
        stream: The number of iterations that reduce the length of the byte
        stream is bounded by that length.

        So what we do is this: We keep track of which blocks are changing, and
        then if there's some non-zero common offset to them we try and minimize
        them all at once by lowering that offset.

        This may not work, and it definitely won't get us out of all possible
        exponential slow downs (an example of where it doesn't is where the
        shape of the blocks changes as a result of this bouncing behaviour),
        but it fails fast when it doesn't work and gets us out of a really
        nastily slow case when it does.
        """
        if len(self.__changed_blocks) <= 1:
            return

        current = self.shrink_target

        blocked = [current.buffer[u:v] for u, v in self.all_block_bounds()]

        changed = [
            i
            for i in sorted(self.__changed_blocks)
            if not self.shrink_target.blocks[i].trivial
        ]

        if not changed:
            return

        ints = [int_from_bytes(blocked[i]) for i in changed]
        offset = min(ints)
        assert offset > 0

        for i in range(len(ints)):
            ints[i] -= offset

        def reoffset(o):
            new_blocks = list(blocked)
            for i, v in zip(changed, ints):
                new_blocks[i] = int_to_bytes(v + o, len(blocked[i]))
            return self.incorporate_new_buffer(b"".join(new_blocks))

        Integer.shrink(offset, reoffset, random=self.random)
        self.clear_change_tracking()
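
# --- Illustrative sketch (not part of shrinker.py) -------------------------
# The zig-zag situation from the lower_common_block_offset docstring as a
# runnable test. The test name is hypothetical; neither drawn value can be
# lowered on its own without making the assertion pass again, so the two
# blocks only shrink when their common offset is lowered together.
from hypothesis import given, strategies as st

@given(st.integers(min_value=0), st.integers(min_value=0))
def test_m_and_n_are_far_apart(m, n):
    # Fails whenever m and n are within 1 of each other.
    assert abs(m - n) > 1
# --- end of illustration; shrinker.py continues below ----------------------
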
    def clear_change_tracking(self):
        self.__last_checked_changed_at = self.shrink_target
        self.__all_changed_blocks = set()

    def mark_changed(self, i):
        self.__changed_blocks.add(i)

    @property
    def __changed_blocks(self):
        if self.__last_checked_changed_at is not self.shrink_target:
            prev_target = self.__last_checked_changed_at
            new_target = self.shrink_target
            assert prev_target is not new_target
            prev = prev_target.buffer
            new = new_target.buffer
            assert sort_key(new) < sort_key(prev)

            if (
                len(new_target.blocks) != len(prev_target.blocks)
                or new_target.blocks.endpoints != prev_target.blocks.endpoints
            ):
                self.__all_changed_blocks = set()
            else:
                blocks = new_target.blocks

                # Index of last block whose contents have been modified, found
                # by checking if the tail past this point has been modified.
                last_changed = binary_search(
                    0,
                    len(blocks),
                    lambda i: prev[blocks.start(i) :] != new[blocks.start(i) :],
                )

                # Index of the first block whose contents have been changed,
                # because we know that this predicate is true for zero (because
                # the prefix from the start is empty), so the result must be True
                # for the bytes from the start of this block and False for the
                # bytes from the end, hence the change is in this block.
                first_changed = binary_search(
                    0,
                    len(blocks),
                    lambda i: prev[: blocks.start(i)] == new[: blocks.start(i)],
                )

                # Between these two changed regions we now do a linear scan to
                # check if any specific block values have changed.
                for i in range(first_changed, last_changed + 1):
                    u, v = blocks.bounds(i)
                    if i not in self.__all_changed_blocks and prev[u:v] != new[u:v]:
                        self.__all_changed_blocks.add(i)

            self.__last_checked_changed_at = new_target
        assert self.__last_checked_changed_at is self.shrink_target
        return self.__all_changed_blocks

    def update_shrink_target(self, new_target):
        assert isinstance(new_target, ConjectureResult)
        if self.shrink_target is not None:
            self.shrinks += 1
        else:
            self.__all_changed_blocks = set()
            self.__last_checked_changed_at = new_target

        self.shrink_target = new_target
        self.__derived_values = {}

    def try_shrinking_blocks(self, blocks, b):
        """Attempts to replace each block in the blocks list with b. Returns
        True if it succeeded (which may include some additional modifications
        to shrink_target).

        In current usage it is expected that each of the blocks currently have
        the same value, although this is not essential. Note that b must be
        < the block at min(blocks) or this is not a valid shrink.

        This method will attempt to do some small amount of work to delete data
        that occurs after the end of the blocks. This is useful for cases where
        there is some size dependency on the value of a block.
        """
        initial_attempt = bytearray(self.shrink_target.buffer)
        for i, block in enumerate(blocks):
            if block >= len(self.blocks):
                blocks = blocks[:i]
                break
            u, v = self.blocks[block].bounds
            n = min(self.blocks[block].length, len(b))
            initial_attempt[v - n : v] = b[-n:]

        if not blocks:
            return False

        start = self.shrink_target.blocks[blocks[0]].start
        end = self.shrink_target.blocks[blocks[-1]].end

        initial_data = self.cached_test_function(initial_attempt)

        if initial_data is self.shrink_target:
            self.lower_common_block_offset()
            return True

        # If this produced something completely invalid we ditch it
        # here rather than trying to persevere.
        if initial_data.status < Status.VALID:
            return False
        # We've shrunk inside our group of blocks, so we have no way to
        # continue. (This only happens when shrinking more than one block at
        # a time).
        if len(initial_data.buffer) < v:
            return False

        lost_data = len(self.shrink_target.buffer) - len(initial_data.buffer)

        # If this did not in fact cause the data size to shrink we
        # bail here because it's not worth trying to delete stuff from
        # the remainder.
        if lost_data <= 0:
            return False

        # We now look for contiguous regions to delete that might help fix up
        # this failed shrink. We only look for contiguous regions of the right
        # lengths because doing anything more than that starts to get very
        # expensive. See minimize_individual_blocks for where we
        # try to be more aggressive.
        regions_to_delete = {(end, end + lost_data)}

        for j in (blocks[-1] + 1, blocks[-1] + 2):
            if j >= min(len(initial_data.blocks), len(self.blocks)):
                continue
            # We look for a block very shortly after the last one that has
            # lost some of its size, and try to delete from the beginning so
            # that it retains the same integer value. This is a bit of a hyper
            # specific trick designed to make our integers() strategy shrink
            # well.
            r1, s1 = self.shrink_target.blocks[j].bounds
            r2, s2 = initial_data.blocks[j].bounds
            lost = (s1 - r1) - (s2 - r2)
            # Apparently a coverage bug? An assert False in the body of this
            # will reliably fail, but it shows up as uncovered.
            if lost <= 0 or r1 != r2:  # pragma: no cover
                continue
            regions_to_delete.add((r1, r1 + lost))

        for ex in self.shrink_target.examples:
            if ex.start > start:
                continue
            if ex.end <= end:
                continue

            replacement = initial_data.examples[ex.index]

            in_original = [c for c in ex.children if c.start >= end]
            in_replaced = [c for c in replacement.children if c.start >= end]

            if len(in_replaced) >= len(in_original) or not in_replaced:
                continue

            # We've found an example where some of the children went missing
            # as a result of this change, and just replacing it with the data
            # it would have had and removing the spillover didn't work. This
            # means that some of its children towards the right must be
            # important, so we try to arrange it so that it retains its
            # rightmost children instead of its leftmost.
            regions_to_delete.add(
                (in_original[0].start, in_original[-len(in_replaced)].start)
            )

        for u, v in sorted(regions_to_delete, key=lambda x: x[1] - x[0], reverse=True):
            try_with_deleted = bytearray(initial_attempt)
            del try_with_deleted[u:v]
            if self.incorporate_new_buffer(try_with_deleted):
                return True
        return False
    def remove_discarded(self):
        """Try removing all bytes marked as discarded.

        This is primarily to deal with data that has been ignored while
        doing rejection sampling - e.g. as a result of an integer range, or a
        filtered strategy.

        Such data will also be handled by the adaptive_example_deletion pass,
        but that pass is necessarily more conservative and will try deleting
        each interval individually. The common case is that all data drawn and
        rejected can just be thrown away immediately in one block, so this pass
        will be much faster than trying each one individually when it works.

        Returns False if there is discarded data and removing it does not work,
        otherwise returns True.
        """
        while self.shrink_target.has_discards:
            discarded = []

            for ex in self.shrink_target.examples:
                if (
                    ex.length > 0
                    and ex.discarded
                    and (not discarded or ex.start >= discarded[-1][-1])
                ):
                    discarded.append((ex.start, ex.end))

            # This can happen if we have discards but they are all of
            # zero length. This shouldn't happen very often so it's
            # faster to check for it here than at the point of example
            # generation.
            if not discarded:
                break

            attempt = bytearray(self.shrink_target.buffer)
            for u, v in reversed(discarded):
                del attempt[u:v]

            if not self.incorporate_new_buffer(attempt):
                return False
        return True
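
# --- Illustrative sketch (not part of shrinker.py) -------------------------
# A test whose draws routinely get rejected and re-drawn. The filter and test
# name below are hypothetical choices for this example; each rejected draw is
# recorded and marked as discarded, which is exactly the data remove_discarded
# tries to delete in a single attempt during shrinking.
from hypothesis import given, strategies as st

@given(st.lists(st.integers().filter(lambda n: n % 7 == 0)))
def test_all_drawn_values_are_zero(xs):
    # Fails as soon as some accepted value is non-zero, so the shrinker runs
    # and can throw away the rejected draws wholesale.
    assert all(x == 0 for x in xs)
# --- end of illustration; shrinker.py continues below ----------------------
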
    @defines_shrink_pass()
    def adaptive_example_deletion(self, chooser):
        """Attempts to delete every example from the test case.

        That is, it is logically equivalent to trying ``self.buffer[:ex.start] +
        self.buffer[ex.end:]`` for every example ``ex``. The order in which
        examples are tried is randomized, and when deletion is successful it
        will attempt to adapt to delete more than one example at a time.
        """
        example = chooser.choose(self.examples)

        if not self.incorporate_new_buffer(
            self.buffer[: example.start] + self.buffer[example.end :]
        ):
            return

        # If we successfully deleted one example there may be a useful
        # deletable region around here.
        original = self.shrink_target
        endpoints = set()
        for ex in original.examples:
            if ex.depth <= example.depth:
                endpoints.add(ex.start)
                endpoints.add(ex.end)

        partition = sorted(endpoints)
        j = partition.index(example.start)

        def delete_region(a, b):
            assert a <= j <= b
            if a < 0 or b >= len(partition) - 1:
                return False
            return self.consider_new_buffer(
                original.buffer[: partition[a]] + original.buffer[partition[b] :]
            )

        to_right = find_integer(lambda n: delete_region(j, j + n))
        find_integer(lambda n: delete_region(j - n, j + to_right))

    def try_zero_example(self, ex):
        u = ex.start
        v = ex.end
        attempt = self.cached_test_function(
            self.buffer[:u] + bytes(v - u) + self.buffer[v:]
        )

        if attempt is Overrun:
            return False

        in_replacement = attempt.examples[ex.index]
        used = in_replacement.length

        if attempt is not self.shrink_target:
            if in_replacement.end < len(attempt.buffer) and used < ex.length:
                self.incorporate_new_buffer(
                    self.buffer[:u] + bytes(used) + self.buffer[v:]
                )

        return self.examples[ex.index].trivial

    @defines_shrink_pass()
    def zero_examples(self, chooser):
        """Attempt to replace each example with a minimal version of itself."""
        ex = chooser.choose(self.examples, lambda ex: not ex.trivial)

        # If the example is already trivial, assume there's nothing to do here.
        # We could attempt to use it as an adaptive replacement for other
        # similar examples, but that seems to be ineffective, resulting mostly
        # in redundant work rather than helping.
        if not self.try_zero_example(ex):
            return

        # If we zeroed the example we need to get the new one that replaced it.
        ex = self.examples[ex.index]

        original = self.shrink_target
        group = self.examples_by_label[ex.label]
        i = group.index(ex)
        replacement = self.buffer[ex.start : ex.end]

        # We first expand to cover the trivial region surrounding this group.
        # This avoids a situation where the adaptive phase "succeeds" a lot by
        # virtue of not doing anything and then goes into a galloping phase
        # where it does a bunch of useless work.
        def all_trivial(a, b):
            if a < 0 or b > len(group):
                return False
            return all(e.trivial for e in group[a:b])

        start, end = expand_region(all_trivial, i, i + 1)

        # If we've got multiple trivial examples of different lengths then
        # this isn't going to work as a replacement for all of them and so we
        # skip out early.
        if any(e.length != len(replacement) for e in group[start:end]):
            return

        def can_zero(a, b):
            if a < 0 or b > len(group):
                return False
            regions = []
            for e in group[a:b]:
                t = (e.start, e.end, replacement)
                if not regions or t[0] >= regions[-1][1]:
                    regions.append(t)
            return self.consider_new_buffer(replace_all(original.buffer, regions))

        expand_region(can_zero, start, end)

    @derived_value
    def blocks_by_non_zero_suffix(self):
        """Returns a list of blocks grouped by their non-zero suffix,
        as a list of (suffix, indices) pairs, skipping all groupings
        where there is only one index.

        This is only used for the arguments of minimize_duplicated_blocks.
        """
        duplicates = defaultdict(list)
        for block in self.blocks:
            duplicates[non_zero_suffix(self.buffer[block.start : block.end])].append(
                block.index
            )
        return duplicates

    @derived_value
    def duplicated_block_suffixes(self):
        return sorted(self.blocks_by_non_zero_suffix)

    @defines_shrink_pass()
    def minimize_duplicated_blocks(self, chooser):
        """Find blocks that have been duplicated in multiple places and attempt
        to minimize all of the duplicates simultaneously.

        This lets us handle cases where two values can't be shrunk
        independently of each other but can easily be shrunk together.
        For example if we had something like:

        ls = data.draw(lists(integers()))
        y = data.draw(integers())
        assert y not in ls

        Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were
        to replace both 3s with 0, this would be a valid shrink, but if we were
        to replace either 3 with 0 on its own the test would start passing.

        It is also useful for when that duplication is accidental and the value
        of the blocks doesn't matter very much because it allows us to replace
        more values at once.
        """
        block = chooser.choose(self.duplicated_block_suffixes)
        targets = self.blocks_by_non_zero_suffix[block]
        if len(targets) <= 1:
            return

        Lexical.shrink(
            block,
            lambda b: self.try_shrinking_blocks(targets, b),
            random=self.random,
            full=False,
        )
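
# --- Illustrative sketch (not part of shrinker.py) -------------------------
# The duplicated-value situation from the minimize_duplicated_blocks docstring
# as a runnable test. The test name is hypothetical; lowering y or the matching
# list element on its own makes the test pass again, so both copies of the
# value have to be shrunk in the same attempt, down to ls=[0], y=0.
from hypothesis import given, strategies as st

@given(st.lists(st.integers()), st.integers())
def test_y_is_not_in_ls(ls, y):
    # Fails whenever y appears in ls.
    assert y not in ls
# --- end of illustration; shrinker.py continues below ----------------------
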
    @defines_shrink_pass()
    def minimize_floats(self, chooser):
        """Some shrinks that we employ that only really make sense for our
        specific floating point encoding that are hard to discover from any
        sort of reasonable general principle. This allows us to make
        transformations like replacing a NaN with an Infinity or replacing
        a float with its nearest integers that we would otherwise not be
        able to due to them requiring very specific transformations of
        the bit sequence.

        We only apply these transformations to blocks that "look like" our
        standard float encodings because they are only really meaningful
        there. The logic for detecting this is reasonably precise, but
        it doesn't matter if it's wrong. These are always valid
        transformations to make, they just don't necessarily correspond to
        anything particularly meaningful for non-float values.
        """
        ex = chooser.choose(
            self.examples,
            lambda ex: (
                ex.label == DRAW_FLOAT_LABEL
                and len(ex.children) == 2
                and ex.children[0].length == 8
            ),
        )

        u = ex.children[0].start
        v = ex.children[0].end
        buf = self.shrink_target.buffer
        b = buf[u:v]
        f = lex_to_float(int_from_bytes(b))
        b2 = int_to_bytes(float_to_lex(f), 8)
        if b == b2 or self.consider_new_buffer(buf[:u] + b2 + buf[v:]):
            Float.shrink(
                f,
                lambda x: self.consider_new_buffer(
                    self.shrink_target.buffer[:u]
                    + int_to_bytes(float_to_lex(x), 8)
                    + self.shrink_target.buffer[v:]
                ),
                random=self.random,
            )

    @defines_shrink_pass()
    def minimize_individual_blocks(self, chooser):
        """Attempt to minimize each block in sequence.

        This is the pass that ensures that e.g. each integer we draw is a
        minimum value. So it's the part that guarantees that if we e.g. do

        x = data.draw(integers())
        assert x < 10

        then in our shrunk example, x = 10 rather than say 97.

        If we are unsuccessful at minimizing a block of interest we then
        check if that's because it's changing the size of the test case and,
        if so, we also make an attempt to delete parts of the test case to
        see if that fixes it.

        We handle most of the common cases in try_shrinking_blocks which is
        pretty good at clearing out large contiguous blocks of dead space,
        but it fails when there is data that has to stay in particular places
        in the list.
        """
        block = chooser.choose(self.blocks, lambda b: not b.trivial)

        initial = self.shrink_target
        u, v = block.bounds
        i = block.index
        Lexical.shrink(
            self.shrink_target.buffer[u:v],
            lambda b: self.try_shrinking_blocks((i,), b),
            random=self.random,
            full=False,
        )

        if self.shrink_target is not initial:
            return

        lowered = (
            self.buffer[: block.start]
            + int_to_bytes(
                int_from_bytes(self.buffer[block.start : block.end]) - 1, block.length
            )
            + self.buffer[block.end :]
        )
        attempt = self.cached_test_function(lowered)
        if (
            attempt.status < Status.VALID
            or len(attempt.buffer) == len(self.buffer)
            or len(attempt.buffer) == block.end
        ):
            return

        # If it were then the lexical shrink should have worked and we could
        # never have got here.
        assert attempt is not self.shrink_target

        @self.cached(block.index)
        def first_example_after_block():
            lo = 0
            hi = len(self.examples)
            while lo + 1 < hi:
                mid = (lo + hi) // 2
                ex = self.examples[mid]
                if ex.start >= block.end:
                    hi = mid
                else:
                    lo = mid
            return hi
        ex = self.examples[
            chooser.choose(
                range(first_example_after_block, len(self.examples)),
                lambda i: self.examples[i].length > 0,
            )
        ]

        u, v = block.bounds
        buf = bytearray(lowered)
        del buf[ex.start : ex.end]
        self.incorporate_new_buffer(buf)
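
# --- Illustrative sketch (not part of shrinker.py) -------------------------
# The guarantee described in the minimize_individual_blocks docstring as a
# runnable test. The test name is hypothetical; whatever large value first
# falsifies the assertion, the reported counterexample should be the boundary
# value x = 10.
from hypothesis import given, strategies as st

@given(st.integers())
def test_x_is_below_ten(x):
    assert x < 10
# --- end of illustration; shrinker.py continues below ----------------------
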
    @defines_shrink_pass()
    def reorder_examples(self, chooser):
        """This pass allows us to reorder the children of each example.

        For example, consider the following:

        .. code-block:: python

            import hypothesis.strategies as st
            from hypothesis import given

            @given(st.text(), st.text())
            def test_not_equal(x, y):
                assert x != y

        Without the ability to reorder x and y this could fail either with
        ``x=""``, ``y="0"``, or the other way around. With reordering it will
        reliably fail with ``x=""``, ``y="0"``.
        """
        ex = chooser.choose(self.examples)
        label = chooser.choose(ex.children).label

        group = [c for c in ex.children if c.label == label]
        if len(group) <= 1:
            return

        st = self.shrink_target
        pieces = [st.buffer[ex.start : ex.end] for ex in group]
        endpoints = [(ex.start, ex.end) for ex in group]

        Ordering.shrink(
            pieces,
            lambda ls: self.consider_new_buffer(
                replace_all(st.buffer, [(u, v, r) for (u, v), r in zip(endpoints, ls)])
            ),
            random=self.random,
        )

    @derived_value
    def alphabet(self):
        return sorted(set(self.buffer))

    @defines_shrink_pass()
    def alphabet_minimize(self, chooser):
        """Attempts to minimize the "alphabet" - the set of bytes that
        are used in the representation of the current buffer. The main
        benefit of this is that it significantly increases our cache hit rate
        by making things that are equivalent more likely to have the same
        representation, but it's also generally a rather effective "fuzzing"
        step that gives us a lot of good opportunities to slip to a smaller
        representation of the same bug.
        """
        c = chooser.choose(self.alphabet)
        buf = self.buffer

        def can_replace_with(d):
            if d < 0:
                return False

            if self.consider_new_buffer(bytes([d if b == c else b for b in buf])):
                if d <= 1:
                    # For small values of d if this succeeds we take this
                    # as evidence that it is worth doing a bulk replacement
                    # where we replace all values which are close
                    # to c but smaller with d as well. This helps us substantially
                    # in cases where we have a lot of "dead" bytes that don't really do
                    # much, as it allows us to replace many of them in one go rather
                    # than one at a time. An example of where this matters is
                    # test_minimize_multiple_elements_in_silly_large_int_range_min_is_not_dupe
                    # in test_shrink_quality.py
                    def replace_range(k):
                        if k > c:
                            return False

                        def should_replace_byte(b):
                            return c - k <= b <= c and d < b

                        return self.consider_new_buffer(
                            bytes([d if should_replace_byte(b) else b for b in buf])
                        )

                    find_integer(replace_range)
                return True

        if (
            # If we cannot replace the current byte with its predecessor,
            # assume it is already minimal and continue on. This ensures
            # we make no more than one call per distinct byte value in the
            # event that no shrinks are possible here.
            not can_replace_with(c - 1)
            # We next try replacing with 0 or 1. If this works then
            # there is nothing else to do here.
            or can_replace_with(0)
            or can_replace_with(1)
            # Finally we try to replace with c - 2 before going on to the
            # binary search so that in cases which were already nearly
            # minimal we don't do log(n) extra work.
            or not can_replace_with(c - 2)
        ):
            return

        # Now binary search to find a small replacement.
        # Invariant: We cannot replace with lo, we can replace with hi.
        lo = 1
        hi = c - 2
        while lo + 1 < hi:
            mid = (lo + hi) // 2
            if can_replace_with(mid):
                hi = mid
            else:
                lo = mid
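
# --- Illustrative sketch (not part of shrinker.py) -------------------------
# A case where shrinking the "alphabet" matters. The test name is hypothetical;
# any non-empty string fails, and collapsing every byte used in the buffer
# towards the same small value makes equivalent failures share a representation,
# so the reported counterexample is typically s == "0".
from hypothesis import given, strategies as st

@given(st.text())
def test_text_is_empty(s):
    assert s == ""
# --- end of illustration; shrinker.py continues below ----------------------
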
    def run_block_program(self, i, description, original, repeats=1):
        """Block programs are a mini-DSL for block rewriting, defined as a sequence
        of commands that can be run at some index into the blocks.

        Commands are:

            * "-", subtract one from this block.
            * "X", delete this block

        If a command does not apply (currently only because it's - on a zero
        block) the block will be silently skipped over.

        This method runs the block program in ``description`` at block index
        ``i`` on the ConjectureData ``original``. If ``repeats > 1`` then it
        will attempt to approximate the results of running it that many times.

        Returns True if this successfully changes the underlying shrink target,
        else False.
        """
        if i + len(description) > len(original.blocks) or i < 0:
            return False
        attempt = bytearray(original.buffer)
        for _ in range(repeats):
            for k, d in reversed(list(enumerate(description))):
                j = i + k
                u, v = original.blocks[j].bounds
                if v > len(attempt):
                    return False
                if d == "-":
                    value = int_from_bytes(attempt[u:v])
                    if value == 0:
                        return False
                    else:
                        attempt[u:v] = int_to_bytes(value - 1, v - u)
                elif d == "X":
                    del attempt[u:v]
                else:  # pragma: no cover
                    raise AssertionError("Unrecognised command %r" % (d,))
        return self.incorporate_new_buffer(attempt)


def block_program(description):
    """Mini-DSL for block rewriting. A sequence of commands that will be run
    over all contiguous sequences of blocks of the description length in order.

    Commands are:

        * ".", keep this block unchanged
        * "-", subtract one from this block.
        * "0", replace this block with zero
        * "X", delete this block

    If a command does not apply (currently only because it's - on a zero
    block) the block will be silently skipped over. As a side effect of
    running a block program its score will be updated.
    """
    name = "block_program(%r)" % (description,)

    if name not in SHRINK_PASS_DEFINITIONS:
        """Defines a shrink pass that runs the block program ``description``
        at every block index."""
        n = len(description)

        def run(self, chooser):
            """Adaptively attempt to run the block program at the current
            index. If this successfully applies the block program ``k`` times
            then this runs in ``O(log(k))`` test function calls."""
            i = chooser.choose(range(len(self.shrink_target.blocks) - n))

            # First, run the block program at the chosen index. If this fails,
            # don't do any extra work, so that failure is as cheap as possible.
            if not self.run_block_program(i, description, original=self.shrink_target):
                return
We thus start by moving1166            # left to the beginning of that region if possible in order to to start from1167            # the beginning of that region.1168            def offset_left(k):1169                return i - k * n1170            i = offset_left(1171                find_integer(1172                    lambda k: self.run_block_program(1173                        offset_left(k), description, original=self.shrink_target1174                    )1175                )1176            )1177            original = self.shrink_target1178            # Now try to run the block program multiple times here.1179            find_integer(1180                lambda k: self.run_block_program(1181                    i, description, original=original, repeats=k1182                )1183            )1184        run.__name__ = name1185        defines_shrink_pass()(run)1186        assert name in SHRINK_PASS_DEFINITIONS1187    return name1188@attr.s(slots=True, eq=False)1189class ShrinkPass:1190    run_with_chooser = attr.ib()1191    index = attr.ib()1192    shrinker = attr.ib()1193    next_prefix = attr.ib(default=())1194    fixed_point_at = attr.ib(default=None)1195    successes = attr.ib(default=0)1196    calls = attr.ib(default=0)1197    shrinks = attr.ib(default=0)1198    deletions = attr.ib(default=0)1199    def step(self):1200        if self.fixed_point_at is self.shrinker.shrink_target:1201            return False1202        tree = self.shrinker.shrink_pass_choice_trees[self]1203        if tree.exhausted:1204            return False1205        initial_shrinks = self.shrinker.shrinks1206        initial_calls = self.shrinker.calls1207        size = len(self.shrinker.shrink_target.buffer)1208        self.shrinker.explain_next_call_as(self.name)1209        try:1210            self.next_prefix = tree.step(1211                self.next_prefix,1212                lambda chooser: self.run_with_chooser(self.shrinker, chooser),1213            )1214        finally:1215            self.calls += self.shrinker.calls - initial_calls1216            self.shrinks += self.shrinker.shrinks - initial_shrinks1217            self.deletions += size - len(self.shrinker.shrink_target.buffer)1218            self.shrinker.clear_call_explanation()1219        return True1220    @property1221    def name(self):1222        return self.run_with_chooser.__name__...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
