Best Python code snippet using hypothesis
shrinker.py
Source:shrinker.py  
...75        return self.run_step.__name__76    def __attrs_post_init__(self):77        assert self.name not in SHRINK_PASS_DEFINITIONS, self.name78        SHRINK_PASS_DEFINITIONS[self.name] = self79def defines_shrink_pass(generate_arguments):80    """A convenient decorator for defining shrink passes."""81    def accept(run_step):82        definition = ShrinkPassDefinition(83            generate_arguments=generate_arguments, run_step=run_step84        )85        def run(self):86            return self.run_shrink_pass(definition.name)87        run.__name__ = run_step.__name__88        run.is_shrink_pass = True89        return run90    return accept91class Shrinker(object):92    """A shrinker is a child object of a ConjectureRunner which is designed to93    manage the associated state of a particular shrink problem. That is, we94    have some initial ConjectureData object and some property of interest95    that it satisfies, and we want to find a ConjectureData object with a96    shortlex (see sort_key above) smaller buffer that exhibits the same97    property.98    Currently the only property of interest we use is that the status is99    INTERESTING and the interesting_origin takes on some fixed value, but we100    may potentially be interested in other use cases later.101    However we assume that data with a status < VALID never satisfies the predicate.102    The shrinker keeps track of a value shrink_target which represents the103    current best known ConjectureData object satisfying the predicate.104    It refines this value by repeatedly running *shrink passes*, which are105    methods that perform a series of transformations to the current shrink_target106    and evaluate the underlying test function to find new ConjectureData107    objects. If any of these satisfy the predicate, the shrink_target108    is updated automatically. Shrinking runs until no shrink pass can109    improve the shrink_target, at which point it stops. It may also be110    terminated if the underlying engine throws RunIsComplete, but that111    is handled by the calling code rather than the Shrinker.112    =======================113    Designing Shrink Passes114    =======================115    Generally a shrink pass is just any function that calls116    cached_test_function and/or incorporate_new_buffer a number of times,117    but there are a couple of useful things to bear in mind.118    A shrink pass *makes progress* if running it changes self.shrink_target119    (i.e. it tries a shortlex smaller ConjectureData object satisfying120    the predicate). The desired end state of shrinking is to find a121    value such that no shrink pass can make progress, i.e. that we122    are at a local minimum for each shrink pass.123    In aid of this goal, the main invariant that a shrink pass much124    satisfy is that whether it makes progress must be deterministic.125    It is fine (encouraged even) for the specific progress it makes126    to be non-deterministic, but if you run a shrink pass, it makes127    no progress, and then you immediately run it again, it should128    never succeed on the second time. This allows us to stop as soon129    as we have run each shrink pass and seen no progress on any of130    them.131    This means that e.g. it's fine to try each of N deletions132    or replacements in a random order, but it's not OK to try N random133    deletions (unless you have already shrunk at least once, though we134    don't currently take advantage of this loophole).135    Shrink passes need to be written so as to be robust against136    change in the underlying shrink target. It is generally safe137    to assume that the shrink target does not change prior to the138    point of first modification - e.g. if you change no bytes at139    index ``i``, all examples whose start is ``<= i`` still exist,140    as do all blocks, and the data object is still of length141    ``>= i + 1``. This can only be violated by bad user code which142    relies on an external source of non-determinism.143    When the underlying shrink_target changes, shrink144    passes should not run substantially more test_function calls145    on success than they do on failure. Say, no more than a constant146    factor more. In particular shrink passes should not iterate to a147    fixed point.148    This means that shrink passes are often written with loops that149    are carefully designed to do the right thing in the case that no150    shrinks occurred and try to adapt to any changes to do a reasonable151    job. e.g. say we wanted to write a shrink pass that tried deleting152    each individual byte (this isn't an especially good choice,153    but it leads to a simple illustrative example), we might do it154    by iterating over the buffer like so:155    .. code-block:: python156        i = 0157        while i < len(self.shrink_target.buffer):158            if not self.incorporate_new_buffer(159                self.shrink_target.buffer[: i] +160                self.shrink_target.buffer[i + 1 :]161            ):162                i += 1163    The reason for writing the loop this way is that i is always a164    valid index into the current buffer, even if the current buffer165    changes as a result of our actions. When the buffer changes,166    we leave the index where it is rather than restarting from the167    beginning, and carry on. This means that the number of steps we168    run in this case is always bounded above by the number of steps169    we would run if nothing works.170    Another thing to bear in mind about shrink pass design is that171    they should prioritise *progress*. If you have N operations that172    you need to run, you should try to order them in such a way as173    to avoid stalling, where you have long periods of test function174    invocations where no shrinks happen. This is bad because whenever175    we shrink we reduce the amount of work the shrinker has to do176    in future, and often speed up the test function, so we ideally177    wanted those shrinks to happen much earlier in the process.178    Sometimes stalls are inevitable of course - e.g. if the pass179    makes no progress, then the entire thing is just one long stall,180    but it's helpful to design it so that stalls are less likely181    in typical behaviour.182    The two easiest ways to do this are:183    * Just run the N steps in random order. As long as a184      reasonably large proportion of the operations suceed, this185      guarantees the expected stall length is quite short. The186      book keeping for making sure this does the right thing when187      it succeeds can be quite annoying.188    * When you have any sort of nested loop, loop in such a way189      that both loop variables change each time. This prevents190      stalls which occur when one particular value for the outer191      loop is impossible to make progress on, rendering the entire192      inner loop into a stall.193    However, although progress is good, too much progress can be194    a bad sign! If you're *only* seeing successful reductions,195    that's probably a sign that you are making changes that are196    too timid. Two useful things to offset this:197    * It's worth writing shrink passes which are *adaptive*, in198      the sense that when operations seem to be working really199      well we try to bundle multiple of them together. This can200      often be used to turn what would be O(m) successful calls201      into O(log(m)).202    * It's often worth trying one or two special minimal values203      before trying anything more fine grained (e.g. replacing204      the whole thing with zero).205    """206    def derived_value(fn):207        """It's useful during shrinking to have access to derived values of208        the current shrink target.209        This decorator allows you to define these as cached properties. They210        are calculated once, then cached until the shrink target changes, then211        recalculated the next time they are used."""212        def accept(self):213            try:214                return self.__derived_values[fn.__name__]215            except KeyError:216                return self.__derived_values.setdefault(fn.__name__, fn(self))217        accept.__name__ = fn.__name__218        return property(accept)219    def __init__(self, engine, initial, predicate):220        """Create a shrinker for a particular engine, with a given starting221        point and predicate. When shrink() is called it will attempt to find an222        example for which predicate is True and which is strictly smaller than223        initial.224        Note that initial is a ConjectureData object, and predicate225        takes ConjectureData objects.226        """227        self.__engine = engine228        self.__predicate = predicate229        self.__shrinking_prefixes = set()230        self.__derived_values = {}231        self.__pending_shrink_explanation = None232        self.initial_size = len(initial.buffer)233        # We keep track of the current best example on the shrink_target234        # attribute.235        self.shrink_target = None236        self.update_shrink_target(initial)237        self.shrinks = 0238        self.initial_calls = self.__engine.call_count239        self.passes_by_name = {}240        self.passes = []241    def explain_next_call_as(self, explanation):242        self.__pending_shrink_explanation = explanation243    def clear_call_explanation(self):244        self.__pending_shrink_explanation = None245    def add_new_pass(self, run):246        """Creates a shrink pass corresponding to calling ``run(self)``"""247        definition = SHRINK_PASS_DEFINITIONS[run]248        p = ShrinkPass(249            run_with_arguments=definition.run_step,250            generate_arguments=definition.generate_arguments,251            shrinker=self,252            index=len(self.passes),253        )254        self.passes.append(p)255        self.passes_by_name[p.name] = p256        return p257    def shrink_pass(self, name):258        """Return the ShrinkPass object for the pass with the given name."""259        if name not in self.passes_by_name:260            self.add_new_pass(name)261        return self.passes_by_name[name]262    @property263    def calls(self):264        """Return the number of calls that have been made to the underlying265        test function."""266        return self.__engine.call_count267    def consider_new_buffer(self, buffer):268        """Returns True if after running this buffer the result would be269        the current shrink_target."""270        buffer = hbytes(buffer)271        return buffer.startswith(self.buffer) or self.incorporate_new_buffer(buffer)272    def incorporate_new_buffer(self, buffer):273        """Either runs the test function on this buffer and returns True if274        that changed the shrink_target, or determines that doing so would275        be useless and returns False without running it."""276        buffer = hbytes(buffer[: self.shrink_target.index])277        # Sometimes an attempt at lexicographic minimization will do the wrong278        # thing because the buffer has changed under it (e.g. something has279        # turned into a write, the bit size has changed). The result would be280        # an invalid string, but it's better for us to just ignore it here as281        # it turns out to involve quite a lot of tricky book-keeping to get282        # this right and it's better to just handle it in one place.283        if sort_key(buffer) >= sort_key(self.shrink_target.buffer):284            return False285        if self.shrink_target.buffer.startswith(buffer):286            return False287        previous = self.shrink_target288        self.cached_test_function(buffer)289        return previous is not self.shrink_target290    def incorporate_test_data(self, data):291        """Takes a ConjectureData or Overrun object updates the current292        shrink_target if this data represents an improvement over it,293        returning True if it is."""294        if data is Overrun or data is self.shrink_target:295            return296        if self.__predicate(data) and sort_key(data.buffer) < sort_key(297            self.shrink_target.buffer298        ):299            self.update_shrink_target(data)300            self.__shrinking_block_cache = {}301            return True302        return False303    def cached_test_function(self, buffer):304        """Returns a cached version of the underlying test function, so305        that the result is either an Overrun object (if the buffer is306        too short to be a valid test case) or a ConjectureData object307        with status >= INVALID that would result from running this buffer."""308        if self.__pending_shrink_explanation is not None:309            self.debug(self.__pending_shrink_explanation)310            self.__pending_shrink_explanation = None311        buffer = hbytes(buffer)312        result = self.__engine.cached_test_function(buffer)313        self.incorporate_test_data(result)314        return result315    def debug(self, msg):316        self.__engine.debug(msg)317    @property318    def random(self):319        return self.__engine.random320    def run_shrink_pass(self, sp):321        """Runs the function associated with ShrinkPass sp and updates the322        relevant metadata.323        Note that sp may or may not be a pass currently associated with324        this shrinker. This does not handle any requeing that is325        required.326        """327        sp = self.shrink_pass(sp)328        self.debug("Shrink Pass %s" % (sp.name,))329        try:330            sp.runs += 1331            steps = sp.generate_steps()332            self.random.shuffle(steps)333            for s in steps:334                sp.run_step(s)335        finally:336            self.debug("Shrink Pass %s completed." % (sp.name,))337    def shrink(self):338        """Run the full set of shrinks and update shrink_target.339        This method is "mostly idempotent" - calling it twice is unlikely to340        have any effect, though it has a non-zero probability of doing so.341        """342        # We assume that if an all-zero block of bytes is an interesting343        # example then we're not going to do better than that.344        # This might not technically be true: e.g. for integers() | booleans()345        # the simplest example is actually [1, 0]. Missing this case is fairly346        # harmless and this allows us to make various simplifying assumptions347        # about the structure of the data (principally that we're never348        # operating on a block of all zero bytes so can use non-zeroness as a349        # signpost of complexity).350        if not any(self.shrink_target.buffer) or self.incorporate_new_buffer(351            hbytes(len(self.shrink_target.buffer))352        ):353            return354        try:355            self.greedy_shrink()356        finally:357            if self.__engine.report_debug_info:358                def s(n):359                    return "s" if n != 1 else ""360                total_deleted = self.initial_size - len(self.shrink_target.buffer)361                self.debug("---------------------")362                self.debug("Shrink pass profiling")363                self.debug("---------------------")364                self.debug("")365                calls = self.__engine.call_count - self.initial_calls366                self.debug(367                    (368                        "Shrinking made a total of %d call%s "369                        "of which %d shrank. This deleted %d byte%s out of %d."370                    )371                    % (372                        calls,373                        s(calls),374                        self.shrinks,375                        total_deleted,376                        s(total_deleted),377                        self.initial_size,378                    )379                )380                for useful in [True, False]:381                    self.debug("")382                    if useful:383                        self.debug("Useful passes:")384                    else:385                        self.debug("Useless passes:")386                    self.debug("")387                    for p in sorted(388                        self.passes,389                        key=lambda t: (-t.calls, -t.runs, t.deletions, t.shrinks),390                    ):391                        if p.calls == 0:392                            continue393                        if (p.shrinks != 0) != useful:394                            continue395                        self.debug(396                            (397                                "  * %s ran %d time%s, making %d call%s of which "398                                "%d shrank, deleting %d byte%s."399                            )400                            % (401                                p.name,402                                p.runs,403                                s(p.runs),404                                p.calls,405                                s(p.calls),406                                p.shrinks,407                                p.deletions,408                                s(p.deletions),409                            )410                        )411                self.debug("")412    def greedy_shrink(self):413        """Run a full set of greedy shrinks (that is, ones that will only ever414        move to a better target) and update shrink_target appropriately.415        This method iterates to a fixed point and so is idempontent - calling416        it twice will have exactly the same effect as calling it once.417        """418        # The two main parts of shortlex optimisation are of course making419        # the test case "short" and "lex" (that is to say, lexicographically.420        # smaller). Sometimes these are intertwined and it's hard to do one421        # without the other, but as far as we can we *vastly* prefer to make422        # the test case shorter over making it lexicographically smaller.423        # The reason for this is that we can only make O(n) progress on424        # reducing the size, but at a fixed size there are O(256^n)425        # lexicographically smaller values. As a result it's easier to426        # reach a fixed point on size and also every reduction we make427        # in size reduces the amount of work we have to do lexicographically.428        # On top of that, reducing the size makes generation much faster,429        # so even if reducing the size earlier doesn't reduce the number430        # of test cases we run it may still result in significantly faster431        # tests.432        #433        # As a result of this we start by running a bunch of operations434        # that are primarily designed to reduce the size of the test.435        #436        # These fall into two categories:437        #438        # * "structured" ones respect the boundaries of draw calls and are439        #   likely to correspond to semantically meaningful transformations440        #   of the generated test case (e.g. deleting an element of a list,441        #   replacing a tree node with one of its children).442        # * "unstructured" ones will often not correspond to any meaningful443        #   transformation but may succeed by accident or because they happen444        #   to correspond to an unexpectedly meaningful operation (e.g.445        #   merging two adjacent lists).446        #447        # Generally we expect unstructured ones to succeed early on when448        # the test case has a lot of redundancy because they have plenty449        # of opportunity to work by accident, and after that they're mostly450        # unlikely work. As a result we do a slightly odd thing where we451        # run unstructured deletions as part of the initial pass, but then452        # we hold off on running them to a fixed point until we've turned453        # the emergency passes on later.454        unstructured_deletions = [block_program("X" * i) for i in hrange(5, 0, -1)]455        structured_deletions = ["pass_to_descendant", "adaptive_example_deletion"]456        self.fixate_shrink_passes(unstructured_deletions + structured_deletions)457        # "fine" passes are ones that are primarily concerned with lexicographic458        # reduction. They may still delete data (either deliberately or by accident)459        # but the distinguishing feature is that it is possible for them to460        # succeed without the length of the test case changing.461        # As a result we only start running them after we've hit a fixed point462        # for the deletion passes at least once.463        fine = [464            "alphabet_minimize",465            "zero_examples",466            "reorder_examples",467            "minimize_floats",468            "minimize_duplicated_blocks",469            "minimize_individual_blocks",470        ]471        self.fixate_shrink_passes(structured_deletions + fine)472        # "emergency" shrink passes are ones that handle cases that we473        # can't currently handle more elegantly - either they're slightly474        # weird hacks that happen to work or they're expensive passes to475        # run. Generally we hope that the emergency passes don't do anything476        # at all. We also re-enable the unstructured deletions at this point477        # in case new ones have been unlocked since we last ran them, but478        # don't expect that to be a common occurrence..479        emergency = [block_program("-XX"), "example_deletion_with_block_lowering"]480        self.fixate_shrink_passes(481            unstructured_deletions + structured_deletions + fine + emergency482        )483    def fixate_shrink_passes(self, passes):484        """Run steps from each pass in ``passes`` until the current shrink target485        is a fixed point of all of them."""486        passes = list(map(self.shrink_pass, passes))487        initial = None488        while initial is not self.shrink_target:489            initial = self.shrink_target490            for sp in passes:491                sp.runs += 1492            # We run remove_discarded after every step to do cleanup493            # keeping track of whether that actually works. Either there is494            # no discarded data and it is basically free, or it reliably works495            # and deletes data, or it doesn't work. In that latter case we turn496            # it off for the rest of this loop through the passes, but will497            # try again once all of the passes have been run.498            can_discard = self.remove_discarded()499            passes_with_steps = [(sp, None) for sp in passes]500            while passes_with_steps:501                to_run_next = []502                for sp, steps in passes_with_steps:503                    if steps is None:504                        steps = sp.generate_steps()505                    failures = 0506                    max_failures = 3507                    while steps and failures < max_failures:508                        prev_calls = self.calls509                        prev = self.shrink_target510                        sp.run_step(pop_random(self.random, steps))511                        if prev_calls != self.calls:512                            if can_discard:513                                can_discard = self.remove_discarded()514                            if prev is self.shrink_target:515                                failures += 1516                    if steps:517                        to_run_next.append((sp, steps))518                passes_with_steps = to_run_next519        for sp in passes:520            sp.fixed_point_at = self.shrink_target521    @property522    def buffer(self):523        return self.shrink_target.buffer524    @property525    def blocks(self):526        return self.shrink_target.blocks527    @property528    def examples(self):529        return self.shrink_target.examples530    def all_block_bounds(self):531        return self.shrink_target.blocks.all_bounds()532    @derived_value533    def examples_by_label(self):534        """An index of all examples grouped by their label, with535        the examples stored in their normal index order."""536        examples_by_label = defaultdict(list)537        for ex in self.examples:538            examples_by_label[ex.label].append(ex)539        return dict(examples_by_label)540    def calculate_descents(self):541        """Returns a list of all pairs (i, j) such that542        self.examples[i] is an ancestor of self.examples[j] and543        they have the same label.544        """545        result = []546        for ls in self.examples_by_label.values():547            if len(ls) <= 1:548                continue549            for i, ex in enumerate(ls[:-1]):550                hi = len(ls)551                lo = i + 1552                if ls[lo].start >= ex.end:553                    continue554                while lo + 1 < hi:555                    mid = (lo + hi) // 2556                    if ls[mid].start >= ex.end:557                        hi = mid558                    else:559                        lo = mid560                id1 = self.stable_identifier_for_example(ex)561                result.extend(562                    [563                        (id1, self.stable_identifier_for_example(ls[j]))564                        for j in hrange(i + 1, hi)565                    ]566                )567        return result568    @defines_shrink_pass(calculate_descents)569    def pass_to_descendant(self, ancestor_id, descendant_id):570        """Attempt to replace each example with a descendant example.571        This is designed to deal with strategies that call themselves572        recursively. For example, suppose we had:573        binary_tree = st.deferred(574            lambda: st.one_of(575                st.integers(), st.tuples(binary_tree, binary_tree)))576        This pass guarantees that we can replace any binary tree with one of577        its subtrees - each of those will create an interval that the parent578        could validly be replaced with, and this pass will try doing that.579        This is pretty expensive - it takes O(len(intervals)^2) - so we run it580        late in the process when we've got the number of intervals as far down581        as possible.582        """583        ancestor = self.example_for_stable_identifier(ancestor_id)584        descendant = self.example_for_stable_identifier(descendant_id)585        if descendant is None:586            return587        assert ancestor is not None588        self.incorporate_new_buffer(589            self.buffer[: ancestor.start]590            + self.buffer[descendant.start : descendant.end]591            + self.buffer[ancestor.end :]592        )593    def is_shrinking_block(self, i):594        """Checks whether block i has been previously marked as a shrinking595        block.596        If the shrink target has changed since i was last checked, will597        attempt to calculate if an equivalent block in a previous shrink598        target was marked as shrinking.599        """600        if not self.__shrinking_prefixes:601            return False602        try:603            return self.__shrinking_block_cache[i]604        except KeyError:605            pass606        t = self.shrink_target607        return self.__shrinking_block_cache.setdefault(608            i, t.buffer[: t.blocks[i].start] in self.__shrinking_prefixes609        )610    def lower_common_block_offset(self):611        """Sometimes we find ourselves in a situation where changes to one part612        of the byte stream unlock changes to other parts. Sometimes this is613        good, but sometimes this can cause us to exhibit exponential slow614        downs!615        e.g. suppose we had the following:616        m = draw(integers(min_value=0))617        n = draw(integers(min_value=0))618        assert abs(m - n) > 1619        If this fails then we'll end up with a loop where on each iteration we620        reduce each of m and n by 2 - m can't go lower because of n, then n621        can't go lower because of m.622        This will take us O(m) iterations to complete, which is exponential in623        the data size, as we gradually zig zag our way towards zero.624        This can only happen if we're failing to reduce the size of the byte625        stream: The number of iterations that reduce the length of the byte626        stream is bounded by that length.627        So what we do is this: We keep track of which blocks are changing, and628        then if there's some non-zero common offset to them we try and minimize629        them all at once by lowering that offset.630        This may not work, and it definitely won't get us out of all possible631        exponential slow downs (an example of where it doesn't is where the632        shape of the blocks changes as a result of this bouncing behaviour),633        but it fails fast when it doesn't work and gets us out of a really634        nastily slow case when it does.635        """636        if len(self.__changed_blocks) <= 1:637            return638        current = self.shrink_target639        blocked = [current.buffer[u:v] for u, v in self.all_block_bounds()]640        changed = [641            i642            for i in sorted(self.__changed_blocks)643            if not self.shrink_target.blocks[i].trivial644        ]645        if not changed:646            return647        ints = [int_from_bytes(blocked[i]) for i in changed]648        offset = min(ints)649        assert offset > 0650        for i in hrange(len(ints)):651            ints[i] -= offset652        def reoffset(o):653            new_blocks = list(blocked)654            for i, v in zip(changed, ints):655                new_blocks[i] = int_to_bytes(v + o, len(blocked[i]))656            return self.incorporate_new_buffer(hbytes().join(new_blocks))657        Integer.shrink(offset, reoffset, random=self.random)658        self.clear_change_tracking()659    def mark_shrinking(self, blocks):660        """Mark each of these blocks as a shrinking block: That is, lowering661        its value lexicographically may cause less data to be drawn after."""662        t = self.shrink_target663        for i in blocks:664            if self.__shrinking_block_cache.get(i) is True:665                continue666            self.__shrinking_block_cache[i] = True667            prefix = t.buffer[: t.blocks[i].start]668            self.__shrinking_prefixes.add(prefix)669    def clear_change_tracking(self):670        self.__last_checked_changed_at = self.shrink_target671        self.__all_changed_blocks = set()672    def mark_changed(self, i):673        self.__changed_blocks.add(i)674    @property675    def __changed_blocks(self):676        if self.__last_checked_changed_at is not self.shrink_target:677            prev_target = self.__last_checked_changed_at678            new_target = self.shrink_target679            assert prev_target is not new_target680            prev = prev_target.buffer681            new = new_target.buffer682            assert sort_key(new) < sort_key(prev)683            if (684                len(new_target.blocks) != len(prev_target.blocks)685                or new_target.blocks.endpoints != prev_target.blocks.endpoints686            ):687                self.__all_changed_blocks = set()688            else:689                blocks = new_target.blocks690                # Index of last block whose contents have been modified, found691                # by checking if the tail past this point has been modified.692                last_changed = binary_search(693                    0,694                    len(blocks),695                    lambda i: prev[blocks.start(i) :] != new[blocks.start(i) :],696                )697                # Index of the first block whose contents have been changed,698                # because we know that this predicate is true for zero (because699                # the prefix from the start is empty), so the result must be True700                # for the bytes from the start of this block and False for the701                # bytes from the end, hence the change is in this block.702                first_changed = binary_search(703                    0,704                    len(blocks),705                    lambda i: prev[: blocks.start(i)] == new[: blocks.start(i)],706                )707                # Between these two changed regions we now do a linear scan to708                # check if any specific block values have changed.709                for i in hrange(first_changed, last_changed + 1):710                    u, v = blocks.bounds(i)711                    if i not in self.__all_changed_blocks and prev[u:v] != new[u:v]:712                        self.__all_changed_blocks.add(i)713            self.__last_checked_changed_at = new_target714        assert self.__last_checked_changed_at is self.shrink_target715        return self.__all_changed_blocks716    def update_shrink_target(self, new_target):717        assert isinstance(new_target, ConjectureResult)718        if self.shrink_target is not None:719            self.shrinks += 1720        else:721            self.__all_changed_blocks = set()722            self.__last_checked_changed_at = new_target723        self.shrink_target = new_target724        self.__shrinking_block_cache = {}725        self.__derived_values = {}726    def try_shrinking_blocks(self, blocks, b):727        """Attempts to replace each block in the blocks list with b. Returns728        True if it succeeded (which may include some additional modifications729        to shrink_target).730        May call mark_shrinking with b if this causes a reduction in size.731        In current usage it is expected that each of the blocks currently have732        the same value, although this is not essential. Note that b must be733        < the block at min(blocks) or this is not a valid shrink.734        This method will attempt to do some small amount of work to delete data735        that occurs after the end of the blocks. This is useful for cases where736        there is some size dependency on the value of a block.737        """738        initial_attempt = bytearray(self.shrink_target.buffer)739        for i, block in enumerate(blocks):740            if block >= len(self.blocks):741                blocks = blocks[:i]742                break743            u, v = self.blocks[block].bounds744            n = min(self.blocks[block].length, len(b))745            initial_attempt[v - n : v] = b[-n:]746        if not blocks:747            return False748        start = self.shrink_target.blocks[blocks[0]].start749        end = self.shrink_target.blocks[blocks[-1]].end750        initial_data = self.cached_test_function(initial_attempt)751        if initial_data.status == Status.INTERESTING:752            self.lower_common_block_offset()753            return initial_data is self.shrink_target754        # If this produced something completely invalid we ditch it755        # here rather than trying to persevere.756        if initial_data.status < Status.VALID:757            return False758        # We've shrunk inside our group of blocks, so we have no way to759        # continue. (This only happens when shrinking more than one block at760        # a time).761        if len(initial_data.buffer) < v:762            return False763        lost_data = len(self.shrink_target.buffer) - len(initial_data.buffer)764        # If this did not in fact cause the data size to shrink we765        # bail here because it's not worth trying to delete stuff from766        # the remainder.767        if lost_data <= 0:768            return False769        self.mark_shrinking(blocks)770        # We now look for contiguous regions to delete that might help fix up771        # this failed shrink. We only look for contiguous regions of the right772        # lengths because doing anything more than that starts to get very773        # expensive. See example_deletion_with_block_lowering for where we774        # try to be more aggressive.775        regions_to_delete = {(end, end + lost_data)}776        for j in (blocks[-1] + 1, blocks[-1] + 2):777            if j >= min(len(initial_data.blocks), len(self.blocks)):778                continue779            # We look for a block very shortly after the last one that has780            # lost some of its size, and try to delete from the beginning so781            # that it retains the same integer value. This is a bit of a hyper782            # specific trick designed to make our integers() strategy shrink783            # well.784            r1, s1 = self.shrink_target.blocks[j].bounds785            r2, s2 = initial_data.blocks[j].bounds786            lost = (s1 - r1) - (s2 - r2)787            # Apparently a coverage bug? An assert False in the body of this788            # will reliably fail, but it shows up as uncovered.789            if lost <= 0 or r1 != r2:  # pragma: no cover790                continue791            regions_to_delete.add((r1, r1 + lost))792        for ex in self.shrink_target.examples:793            if ex.start > start:794                continue795            if ex.end <= end:796                continue797            replacement = initial_data.examples[ex.index]798            in_original = [c for c in ex.children if c.start >= end]799            in_replaced = [c for c in replacement.children if c.start >= end]800            if len(in_replaced) >= len(in_original) or not in_replaced:801                continue802            # We've found an example where some of the children went missing803            # as a result of this change, and just replacing it with the data804            # it would have had and removing the spillover didn't work. This805            # means that some of its children towards the right must be806            # important, so we try to arrange it so that it retains its807            # rightmost children instead of its leftmost.808            regions_to_delete.add(809                (in_original[0].start, in_original[-len(in_replaced)].start)810            )811        for u, v in sorted(regions_to_delete, key=lambda x: x[1] - x[0], reverse=True):812            try_with_deleted = bytearray(initial_attempt)813            del try_with_deleted[u:v]814            if self.incorporate_new_buffer(try_with_deleted):815                return True816        return False817    def remove_discarded(self):818        """Try removing all bytes marked as discarded.819        This is primarily to deal with data that has been ignored while820        doing rejection sampling - e.g. as a result of an integer range, or a821        filtered strategy.822        Such data will also be handled by the adaptive_example_deletion pass,823        but that pass is necessarily more conservative and will try deleting824        each interval individually. The common case is that all data drawn and825        rejected can just be thrown away immediately in one block, so this pass826        will be much faster than trying each one individually when it works.827        returns False if there is discarded data and removing it does not work,828        otherwise returns True.829        """830        while self.shrink_target.has_discards:831            discarded = []832            for ex in self.shrink_target.examples:833                if (834                    ex.length > 0835                    and ex.discarded836                    and (not discarded or ex.start >= discarded[-1][-1])837                ):838                    discarded.append((ex.start, ex.end))839            # This can happen if we have discards but they are all of840            # zero length. This shouldn't happen very often so it's841            # faster to check for it here than at the point of example842            # generation.843            if not discarded:844                break845            attempt = bytearray(self.shrink_target.buffer)846            for u, v in reversed(discarded):847                del attempt[u:v]848            if not self.incorporate_new_buffer(attempt):849                return False850        return True851    @derived_value852    def stable_identifier_arguments(self):853        return [(self.stable_identifier_for_example(ex),) for ex in self.examples]854    @defines_shrink_pass(lambda self: self.stable_identifier_arguments)855    def adaptive_example_deletion(self, example_id):856        """Attempts to delete every example from the test case.857        That is, it is logically equivalent to trying ``self.buffer[:ex.start] +858        self.buffer[ex.end:]`` for every example ``ex``. The order in which859        examples are tried is randomized, and when deletion is successful it860        will attempt to adapt to delete more than one example at a time.861        """862        example = self.example_for_stable_identifier(example_id)863        if example is None or not self.incorporate_new_buffer(864            self.buffer[: example.start] + self.buffer[example.end :]865        ):866            return867        # If we successfully deleted one example there may be a useful868        # deletable region around here.869        original = self.shrink_target870        endpoints = set()871        for ex in original.examples:872            if ex.depth <= example.depth:873                endpoints.add(ex.start)874                endpoints.add(ex.end)875        partition = sorted(endpoints)876        j = partition.index(example.start)877        def delete_region(a, b):878            assert a <= j <= b879            if a < 0 or b >= len(partition) - 1:880                return False881            return self.consider_new_buffer(882                original.buffer[: partition[a]] + original.buffer[partition[b] :]883            )884        to_right = find_integer(lambda n: delete_region(j, j + n))885        if to_right > 0:886            find_integer(lambda n: delete_region(j - n, j + to_right))887    def try_zero_example(self, ex):888        u = ex.start889        v = ex.end890        attempt = self.cached_test_function(891            self.buffer[:u] + hbytes(v - u) + self.buffer[v:]892        )893        if attempt is Overrun:894            return False895        in_replacement = attempt.examples[ex.index]896        used = in_replacement.length897        if attempt is not self.shrink_target:898            if in_replacement.end < len(attempt.buffer) and used < ex.length:899                self.incorporate_new_buffer(900                    self.buffer[:u] + hbytes(used) + self.buffer[v:]901                )902        return self.examples[ex.index].trivial903    @defines_shrink_pass(lambda self: self.stable_identifier_arguments)904    def zero_examples(self, ex_id):905        """Attempt to replace each example with a minimal version of itself."""906        ex = self.example_for_stable_identifier(ex_id)907        """Attempt to replace each example with a minimal version of itself."""908        # If the example is already trivial, assume there's nothing to do here.909        # We could attempt to use it as an adaptive replacement for other910        # similar examples, but that seems to be ineffective, resulting mostly911        # in redundant work rather than helping.912        if ex is None or ex.trivial:913            return914        if not self.try_zero_example(ex):915            return916        # If we zeroed the example we need to get the new one that replaced it.917        ex = self.examples[ex.index]918        original = self.shrink_target919        group = self.examples_by_label[ex.label]920        i = group.index(ex)921        replacement = self.buffer[ex.start : ex.end]922        # We first expand to cover the trivial region surrounding this group.923        # This avoids a situation where the adaptive phase "succeeds" a lot by924        # virtue of not doing anything and then goes into a galloping phase925        # where it does a bunch of useless work.926        def all_trivial(a, b):927            if a < 0 or b > len(group):928                return False929            return all(e.trivial for e in group[a:b])930        start, end = expand_region(all_trivial, i, i + 1)931        # If we've got multiple trivial examples of different lengths then932        # this isn't going to work as a replacement for all of them and so we933        # skip out early.934        if any(e.length != len(replacement) for e in group[start:end]):935            return936        def can_zero(a, b):937            if a < 0 or b > len(group):938                return False939            regions = []940            for e in group[a:b]:941                t = (e.start, e.end, replacement)942                if not regions or t[0] >= regions[-1][1]:943                    regions.append(t)944            return self.consider_new_buffer(replace_all(original.buffer, regions))945        expand_region(can_zero, start, end)946    @derived_value947    def blocks_by_non_zero_suffix(self):948        """Returns a list of blocks grouped by their non-zero suffix,949        as a list of (suffix, indices) pairs, skipping all groupings950        where there is only one index.951        This is only used for the arguments of minimize_duplicated_blocks.952        """953        duplicates = defaultdict(list)954        for block in self.blocks:955            duplicates[non_zero_suffix(self.buffer[block.start : block.end])].append(956                block.index957            )958        return duplicates959    @defines_shrink_pass(960        lambda self: [(b,) for b in sorted(self.blocks_by_non_zero_suffix)]961    )962    def minimize_duplicated_blocks(self, block):963        """Find blocks that have been duplicated in multiple places and attempt964        to minimize all of the duplicates simultaneously.965        This lets us handle cases where two values can't be shrunk966        independently of each other but can easily be shrunk together.967        For example if we had something like:968        ls = data.draw(lists(integers()))969        y = data.draw(integers())970        assert y not in ls971        Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were972        to replace both 3s with 0, this would be a valid shrink, but if we were973        to replace either 3 with 0 on its own the test would start passing.974        It is also useful for when that duplication is accidental and the value975        of the blocks doesn't matter very much because it allows us to replace976        more values at once.977        """978        targets = self.blocks_by_non_zero_suffix[block]979        if len(targets) <= 1:980            return981        Lexical.shrink(982            block,983            lambda b: self.try_shrinking_blocks(targets, b),984            random=self.random,985            full=False,986        )987    @defines_shrink_pass(lambda self: self.stable_identifier_arguments)988    def minimize_floats(self, ex_id):989        """Some shrinks that we employ that only really make sense for our990        specific floating point encoding that are hard to discover from any991        sort of reasonable general principle. This allows us to make992        transformations like replacing a NaN with an Infinity or replacing993        a float with its nearest integers that we would otherwise not be994        able to due to them requiring very specific transformations of995        the bit sequence.996        We only apply these transformations to blocks that "look like" our997        standard float encodings because they are only really meaningful998        there. The logic for detecting this is reasonably precise, but999        it doesn't matter if it's wrong. These are always valid1000        transformations to make, they just don't necessarily correspond to1001        anything particularly meaningful for non-float values.1002        """1003        ex = self.example_for_stable_identifier(ex_id)1004        if ex is None or not (1005            ex.label == DRAW_FLOAT_LABEL1006            and len(ex.children) == 21007            and ex.children[0].length == 81008        ):1009            return1010        u = ex.children[0].start1011        v = ex.children[0].end1012        buf = self.shrink_target.buffer1013        b = buf[u:v]1014        f = lex_to_float(int_from_bytes(b))1015        b2 = int_to_bytes(float_to_lex(f), 8)1016        if b == b2 or self.consider_new_buffer(buf[:u] + b2 + buf[v:]):1017            Float.shrink(1018                f,1019                lambda x: self.consider_new_buffer(1020                    self.shrink_target.buffer[:u]1021                    + int_to_bytes(float_to_lex(x), 8)1022                    + self.shrink_target.buffer[v:]1023                ),1024                random=self.random,1025            )1026    @defines_shrink_pass(lambda self: [(i,) for i in hrange(len(self.blocks))])1027    def minimize_individual_blocks(self, i):1028        """Attempt to minimize each block in sequence.1029        This is the pass that ensures that e.g. each integer we draw is a1030        minimum value. So it's the part that guarantees that if we e.g. do1031        x = data.draw(integers())1032        assert x < 101033        then in our shrunk example, x = 10 rather than say 97.1034        """1035        if i >= len(self.blocks):1036            return1037        block = self.blocks[i]1038        u, v = block.bounds1039        i = block.index1040        Lexical.shrink(1041            self.shrink_target.buffer[u:v],1042            lambda b: self.try_shrinking_blocks((i,), b),1043            random=self.random,1044            full=False,1045        )1046    @defines_shrink_pass(1047        lambda self: [1048            (i, self.stable_identifier_for_example(ex))1049            for i in hrange(len(self.blocks))1050            if self.is_shrinking_block(i)1051            for ex in self.examples1052            if ex.length > 0 and ex.start >= self.blocks[i].end1053        ]1054    )1055    def example_deletion_with_block_lowering(self, block_i, ex_id):1056        """Sometimes we get stuck where there is data that we could easily1057        delete, but it changes the number of examples generated, so we have to1058        change that at the same time.1059        We handle most of the common cases in try_shrinking_blocks which is1060        pretty good at clearing out large contiguous blocks of dead space,1061        but it fails when there is data that has to stay in particular places1062        in the list.1063        This pass exists as an emergency procedure to get us unstuck. For every1064        example and every block not inside that example it tries deleting the1065        example and modifying the block's value by one in either direction.1066        """1067        if block_i >= len(self.blocks):1068            return1069        block = self.blocks[block_i]1070        ex = self.example_for_stable_identifier(ex_id)1071        if ex is None:1072            return1073        u, v = block.bounds1074        n = int_from_bytes(self.shrink_target.buffer[u:v])1075        if n == 0:1076            return1077        buf = bytearray(self.shrink_target.buffer)1078        buf[u:v] = int_to_bytes(n - 1, v - u)1079        del buf[ex.start : ex.end]1080        self.incorporate_new_buffer(buf)1081    @defines_shrink_pass(1082        lambda self: [1083            (self.stable_identifier_for_example(e), l)1084            for e in self.examples1085            for l in sorted({c.label for c in e.children}, key=str)1086        ]1087    )1088    def reorder_examples(self, ex_id, label):1089        """This pass allows us to reorder the children of each example.1090        For example, consider the following:1091        .. code-block:: python1092            import hypothesis.strategies as st1093            from hypothesis import given1094            @given(st.text(), st.text())1095            def test_not_equal(x, y):1096                assert x != y1097        Without the ability to reorder x and y this could fail either with1098        ``x=""``, ``y="0"``, or the other way around. With reordering it will1099        reliably fail with ``x=""``, ``y="0"``.1100        """1101        ex = self.example_for_stable_identifier(ex_id)1102        if ex is None:1103            return1104        group = [c for c in ex.children if c.label == label]1105        if len(group) <= 1:1106            return1107        st = self.shrink_target1108        pieces = [st.buffer[ex.start : ex.end] for ex in group]1109        endpoints = [(ex.start, ex.end) for ex in group]1110        Ordering.shrink(1111            pieces,1112            lambda ls: self.consider_new_buffer(1113                replace_all(st.buffer, [(u, v, r) for (u, v), r in zip(endpoints, ls)])1114            ),1115            random=self.random,1116        )1117    @defines_shrink_pass(lambda self: [(c,) for c in hrange(256)])1118    def alphabet_minimize(self, c):1119        """Attempts to minimize the "alphabet" - the set of bytes that1120        are used in the representation of the current buffer. The main1121        benefit of this is that it significantly increases our cache hit rate1122        by making things that are equivalent more likely to have the same1123        representation, but it's also generally a rather effective "fuzzing"1124        step that gives us a lot of good opportunities to slip to a smaller1125        representation of the same bug.1126        """1127        buf = self.buffer1128        if c not in buf:1129            return1130        def can_replace_with(d):1131            if d < 0:1132                return False1133            if self.consider_new_buffer(hbytes([d if b == c else b for b in buf])):1134                if d <= 1:1135                    # For small values of d if this succeeds we take this1136                    # as evidence that it is worth doing a a bulk replacement1137                    # where we replace all values which are close1138                    # to c but smaller with d as well. This helps us substantially1139                    # in cases where we have a lot of "dead" bytes that don't really do1140                    # much, as it allows us to replace many of them in one go rather1141                    # than one at a time. An example of where this matters is1142                    # test_minimize_multiple_elements_in_silly_large_int_range_min_is_not_dupe1143                    # in test_shrink_quality.py1144                    def replace_range(k):1145                        if k > c:1146                            return False1147                        def should_replace_byte(b):1148                            return c - k <= b <= c and d < b1149                        return self.consider_new_buffer(1150                            hbytes([d if should_replace_byte(b) else b for b in buf])1151                        )1152                    find_integer(replace_range)1153                return True1154        if (1155            # If we cannot replace the current byte with its predecessor,1156            # assume it is already minimal and continue on. This ensures1157            # we make no more than one call per distinct byte value in the1158            # event that no shrinks are possible here.1159            not can_replace_with(c - 1)1160            # We next try replacing with 0 or 1. If this works then1161            # there is nothing else to do here.1162            or can_replace_with(0)1163            or can_replace_with(1)1164            # Finally we try to replace with c - 2 before going on to the1165            # binary search so that in cases which were already nearly1166            # minimal we don't do log(n) extra work.1167            or not can_replace_with(c - 2)1168        ):1169            return1170        # Now binary search to find a small replacement.1171        # Invariant: We cannot replace with lo, we can replace with hi.1172        lo = 11173        hi = c - 21174        while lo + 1 < hi:1175            mid = (lo + hi) // 21176            if can_replace_with(mid):1177                hi = mid1178            else:1179                lo = mid1180    def stable_identifier_for_example(self, example):1181        """A stable identifier is one that we can reasonably reliably1182        count on referring to "logically the same" example between two1183        different test runs. It is currently represented as a path from1184        the root."""1185        return example.index1186    def example_for_stable_identifier(self, identifier):1187        """Returns the example in the current shrink target corresponding1188        to this stable identifier, or None if no such example exists."""1189        if identifier >= len(self.examples):1190            return None1191        return self.shrink_target.examples[identifier]1192    def run_block_program(self, i, description, original, repeats=1):1193        """Block programs are a mini-DSL for block rewriting, defined as a sequence1194        of commands that can be run at some index into the blocks1195        Commands are:1196            * "-", subtract one from this block.1197            * "X", delete this block1198        If a command does not apply (currently only because it's - on a zero1199        block) the block will be silently skipped over.1200        This method runs the block program in ``description`` at block index1201        ``i`` on the ConjectureData ``original``. If ``repeats > 1`` then it1202        will attempt to approximate the results of running it that many times.1203        Returns True if this successfully changes the underlying shrink target,1204        else False.1205        """1206        if i + len(description) > len(original.blocks) or i < 0:1207            return False1208        attempt = bytearray(original.buffer)1209        for _ in hrange(repeats):1210            for k, d in reversed(list(enumerate(description))):1211                j = i + k1212                u, v = original.blocks[j].bounds1213                if v > len(attempt):1214                    return False1215                if d == "-":1216                    value = int_from_bytes(attempt[u:v])1217                    if value == 0:1218                        return False1219                    else:1220                        attempt[u:v] = int_to_bytes(value - 1, v - u)1221                elif d == "X":1222                    del attempt[u:v]1223                else:  # pragma: no cover1224                    assert False, "Unrecognised command %r" % (d,)1225        return self.incorporate_new_buffer(attempt)1226def block_program(description):1227    """Mini-DSL for block rewriting. A sequence of commands that will be run1228    over all contiguous sequences of blocks of the description length in order.1229    Commands are:1230        * ".", keep this block unchanged1231        * "-", subtract one from this block.1232        * "0", replace this block with zero1233        * "X", delete this block1234    If a command does not apply (currently only because it's - on a zero1235    block) the block will be silently skipped over. As a side effect of1236    running a block program its score will be updated.1237    """1238    name = "block_program(%r)" % (description,)1239    if name not in SHRINK_PASS_DEFINITIONS:1240        """Defines a shrink pass that runs the block program ``description``1241        at every block index."""1242        n = len(description)1243        def generate_arguments(self):1244            return [(i,) for i in hrange(len(self.blocks))]1245        def run(self, i):1246            """Adaptively attempt to run the block program at the current1247            index. If this successfully applies the block program ``k`` times1248            then this runs in ``O(log(k))`` test function calls."""1249            if i + n >= len(self.shrink_target.blocks):1250                return1251            # First, run the block program at the chosen index. If this fails,1252            # don't do any extra work, so that failure is as cheap as possible.1253            if not self.run_block_program(i, description, original=self.shrink_target):1254                return1255            # Because we run in a random order we will often find ourselves in the middle1256            # of a region where we could run the block program. We thus start by moving1257            # left to the beginning of that region if possible in order to to start from1258            # the beginning of that region.1259            def offset_left(k):1260                return i - k * n1261            i = offset_left(1262                find_integer(1263                    lambda k: self.run_block_program(1264                        offset_left(k), description, original=self.shrink_target1265                    )1266                )1267            )1268            original = self.shrink_target1269            # Now try to run the block program multiple times here.1270            find_integer(1271                lambda k: self.run_block_program(1272                    i, description, original=original, repeats=k1273                )1274            )1275        run.__name__ = name1276        defines_shrink_pass(generate_arguments)(run)1277        assert name in SHRINK_PASS_DEFINITIONS1278    return name1279@attr.s(slots=True, cmp=False)1280class ShrinkPass(object):1281    run_with_arguments = attr.ib()1282    generate_arguments = attr.ib()1283    index = attr.ib()1284    shrinker = attr.ib()1285    fixed_point_at = attr.ib(default=None)1286    successes = attr.ib(default=0)1287    runs = attr.ib(default=0)1288    calls = attr.ib(default=0)1289    shrinks = attr.ib(default=0)1290    deletions = attr.ib(default=0)...test_shrinker.py
Source:test_shrinker.py  
...374    @shrinking_from([255])375    def shrinker(data):376        data.draw_bits(8)377        data.mark_interesting()378    sp = shrinker.shrink_pass(block_program("X"))379    assert isinstance(sp, ShrinkPass)380    assert shrinker.shrink_pass(sp) is sp381def test_will_terminate_stalled_shrinks():382    # Suppress the time based slow shrinking check - we only want383    # the one that checks if we're in a stall where we've shrunk384    # as far as we're going to get.385    time.freeze()386    @shrinking_from([255] * 100)387    def shrinker(data):388        count = 0389        for _ in range(100):390            if data.draw_bits(8) != 255:391                count += 1392                if count >= 10:393                    return394        data.mark_interesting()395    shrinker.shrink()396    assert shrinker.calls <= 1 + 2 * shrinker.max_stall397def test_will_let_fixate_shrink_passes_do_a_full_run_through():398    @shrinking_from(range(50))399    def shrinker(data):400        for i in range(50):401            if data.draw_bits(8) != i:402                data.mark_invalid()403        data.mark_interesting()404    shrinker.max_stall = 5405    passes = [block_program("X" * i) for i in range(1, 11)]406    with pytest.raises(StopShrinking):407        shrinker.fixate_shrink_passes(passes)408    assert shrinker.shrink_pass(passes[-1]).calls > 0409@pytest.mark.parametrize("n_gap", [0, 1, 2, 3])410def test_can_simultaneously_lower_non_duplicated_nearby_blocks(n_gap):411    @shrinking_from([1, 1] + [0] * n_gap + [0, 2])412    def shrinker(data):413        # Block off lowering the whole buffer414        if data.draw_bits(1) == 0:415            data.mark_invalid()416        m = data.draw_bits(8)417        for _ in range(n_gap):418            data.draw_bits(8)419        n = data.draw_bits(16)420        if n == m + 1:421            data.mark_interesting()422    shrinker.fixate_shrink_passes(["lower_blocks_together"])...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
