Best Python code snippet using hypothesis
shrinker.py
Source:shrinker.py  
...232        self.initial_size = len(initial.buffer)233        # We keep track of the current best example on the shrink_target234        # attribute.235        self.shrink_target = None236        self.update_shrink_target(initial)237        self.shrinks = 0238        # We terminate shrinks that seem to have reached their logical239        # conclusion: If we've called the underlying test function at240        # least self.max_stall times since the last time we shrunk,241        # it's time to stop shrinking.242        self.max_stall = 200243        self.initial_calls = self.engine.call_count244        self.calls_at_last_shrink = self.initial_calls245        self.passes_by_name = {}246        self.passes = []247        # Extra DFAs that may be installed. This is used solely for248        # testing and learning purposes.249        self.extra_dfas = {}250    @derived_value251    def cached_calculations(self):252        return {}253    def cached(self, *keys):254        def accept(f):255            cache_key = (f.__name__,) + keys256            try:257                return self.cached_calculations[cache_key]258            except KeyError:259                return self.cached_calculations.setdefault(cache_key, f())260        return accept261    def add_new_pass(self, run):262        """Creates a shrink pass corresponding to calling ``run(self)``"""263        definition = SHRINK_PASS_DEFINITIONS[run]264        p = ShrinkPass(265            run_with_chooser=definition.run_with_chooser,266            shrinker=self,267            index=len(self.passes),268        )269        self.passes.append(p)270        self.passes_by_name[p.name] = p271        return p272    def shrink_pass(self, name):273        """Return the ShrinkPass object for the pass with the given name."""274        if isinstance(name, ShrinkPass):275            return name276        if name not in self.passes_by_name:277            self.add_new_pass(name)278        return self.passes_by_name[name]279    
@derived_value280    def match_cache(self):281        return {}282    def matching_regions(self, dfa):283        """Returns all pairs (u, v) such that self.buffer[u:v] is accepted284        by this DFA."""285        try:286            return self.match_cache[dfa]287        except KeyError:288            pass289        results = dfa.all_matching_regions(self.buffer)290        results.sort(key=lambda t: (t[1] - t[0], t[1]))291        assert all(dfa.matches(self.buffer[u:v]) for u, v in results)292        self.match_cache[dfa] = results293        return results294    @property295    def calls(self):296        """Return the number of calls that have been made to the underlying297        test function."""298        return self.engine.call_count299    def consider_new_buffer(self, buffer):300        """Returns True if after running this buffer the result would be301        the current shrink_target."""302        buffer = bytes(buffer)303        return buffer.startswith(self.buffer) or self.incorporate_new_buffer(buffer)304    def incorporate_new_buffer(self, buffer):305        """Either runs the test function on this buffer and returns True if306        that changed the shrink_target, or determines that doing so would307        be useless and returns False without running it."""308        buffer = bytes(buffer[: self.shrink_target.index])309        # Sometimes an attempt at lexicographic minimization will do the wrong310        # thing because the buffer has changed under it (e.g. something has311        # turned into a write, the bit size has changed). 
The result would be312        # an invalid string, but it's better for us to just ignore it here as313        # it turns out to involve quite a lot of tricky book-keeping to get314        # this right and it's better to just handle it in one place.315        if sort_key(buffer) >= sort_key(self.shrink_target.buffer):316            return False317        if self.shrink_target.buffer.startswith(buffer):318            return False319        previous = self.shrink_target320        self.cached_test_function(buffer)321        return previous is not self.shrink_target322    def incorporate_test_data(self, data):323        """Takes a ConjectureData or Overrun object updates the current324        shrink_target if this data represents an improvement over it."""325        if data.status < Status.VALID or data is self.shrink_target:326            return327        if (328            self.__predicate(data)329            and sort_key(data.buffer) < sort_key(self.shrink_target.buffer)330            and self.__allow_transition(self.shrink_target, data)331        ):332            self.update_shrink_target(data)333    def cached_test_function(self, buffer):334        """Returns a cached version of the underlying test function, so335        that the result is either an Overrun object (if the buffer is336        too short to be a valid test case) or a ConjectureData object337        with status >= INVALID that would result from running this buffer."""338        buffer = bytes(buffer)339        result = self.engine.cached_test_function(buffer)340        self.incorporate_test_data(result)341        if self.calls - self.calls_at_last_shrink >= self.max_stall:342            raise StopShrinking()343        return result344    def debug(self, msg):345        self.engine.debug(msg)346    @property347    def random(self):348        return self.engine.random349    def shrink(self):350        """Run the full set of shrinks and update shrink_target.351        This method is "mostly idempotent" - 
calling it twice is unlikely to352        have any effect, though it has a non-zero probability of doing so.353        """354        # We assume that if an all-zero block of bytes is an interesting355        # example then we're not going to do better than that.356        # This might not technically be true: e.g. for integers() | booleans()357        # the simplest example is actually [1, 0]. Missing this case is fairly358        # harmless and this allows us to make various simplifying assumptions359        # about the structure of the data (principally that we're never360        # operating on a block of all zero bytes so can use non-zeroness as a361        # signpost of complexity).362        if not any(self.shrink_target.buffer) or self.incorporate_new_buffer(363            bytes(len(self.shrink_target.buffer))364        ):365            return366        try:367            self.greedy_shrink()368        except StopShrinking:369            pass370        finally:371            if self.engine.report_debug_info:372                def s(n):373                    return "s" if n != 1 else ""374                total_deleted = self.initial_size - len(self.shrink_target.buffer)375                self.debug("---------------------")376                self.debug("Shrink pass profiling")377                self.debug("---------------------")378                self.debug("")379                calls = self.engine.call_count - self.initial_calls380                self.debug(381                    (382                        "Shrinking made a total of %d call%s "383                        "of which %d shrank. 
This deleted %d byte%s out of %d."384                    )385                    % (386                        calls,387                        s(calls),388                        self.shrinks,389                        total_deleted,390                        s(total_deleted),391                        self.initial_size,392                    )393                )394                for useful in [True, False]:395                    self.debug("")396                    if useful:397                        self.debug("Useful passes:")398                    else:399                        self.debug("Useless passes:")400                    self.debug("")401                    for p in sorted(402                        self.passes, key=lambda t: (-t.calls, t.deletions, t.shrinks)403                    ):404                        if p.calls == 0:405                            continue406                        if (p.shrinks != 0) != useful:407                            continue408                        self.debug(409                            (410                                "  * %s made %d call%s of which "411                                "%d shrank, deleting %d byte%s."412                            )413                            % (414                                p.name,415                                p.calls,416                                s(p.calls),417                                p.shrinks,418                                p.deletions,419                                s(p.deletions),420                            )421                        )422                self.debug("")423    def greedy_shrink(self):424        """Run a full set of greedy shrinks (that is, ones that will only ever425        move to a better target) and update shrink_target appropriately.426        This method iterates to a fixed point and so is idempontent - calling427        it twice will have exactly the same effect as calling it once.428        """429        
self.fixate_shrink_passes(430            [431                block_program("X" * 5),432                block_program("X" * 4),433                block_program("X" * 3),434                block_program("X" * 2),435                block_program("X" * 1),436                "pass_to_descendant",437                "reorder_examples",438                "minimize_floats",439                "minimize_duplicated_blocks",440                block_program("-XX"),441                "minimize_individual_blocks",442                block_program("--X"),443                "redistribute_block_pairs",444                "lower_blocks_together",445            ]446            + [dfa_replacement(n) for n in SHRINKING_DFAS]447        )448    @derived_value449    def shrink_pass_choice_trees(self):450        return defaultdict(ChoiceTree)451    def fixate_shrink_passes(self, passes):452        """Run steps from each pass in ``passes`` until the current shrink target453        is a fixed point of all of them."""454        passes = list(map(self.shrink_pass, passes))455        any_ran = True456        while any_ran:457            any_ran = False458            reordering = {}459            # We run remove_discarded after every pass to do cleanup460            # keeping track of whether that actually works. Either there is461            # no discarded data and it is basically free, or it reliably works462            # and deletes data, or it doesn't work. 
In that latter case we turn463            # it off for the rest of this loop through the passes, but will464            # try again once all of the passes have been run.465            can_discard = self.remove_discarded()466            calls_at_loop_start = self.calls467            # We keep track of how many calls can be made by a single step468            # without making progress and use this to test how much to pad469            # out self.max_stall by as we go along.470            max_calls_per_failing_step = 1471            for sp in passes:472                if can_discard:473                    can_discard = self.remove_discarded()474                before_sp = self.shrink_target475                # Run the shrink pass until it fails to make any progress476                # max_failures times in a row. This implicitly boosts shrink477                # passes that are more likely to work.478                failures = 0479                max_failures = 20480                while failures < max_failures:481                    # We don't allow more than max_stall consecutive failures482                    # to shrink, but this means that if we're unlucky and the483                    # shrink passes are in a bad order where only the ones at484                    # the end are useful, if we're not careful this heuristic485                    # might stop us before we've tried everything. 
In order to486                    # avoid that happening, we make sure that there's always487                    # plenty of breathing room to make it through a single488                    # iteration of the fixate_shrink_passes loop.489                    self.max_stall = max(490                        self.max_stall,491                        2 * max_calls_per_failing_step492                        + (self.calls - calls_at_loop_start),493                    )494                    prev = self.shrink_target495                    initial_calls = self.calls496                    # It's better for us to run shrink passes in a deterministic497                    # order, to avoid repeat work, but this can cause us to create498                    # long stalls when there are a lot of steps which fail to do499                    # anything useful. In order to avoid this, once we've noticed500                    # we're in a stall (i.e. half of max_failures calls have failed501                    # to do anything) we switch to randomly jumping around. If we502                    # find a success then we'll resume deterministic order from503                    # there which, with any luck, is in a new good region.504                    if not sp.step(random_order=failures >= max_failures // 2):505                        # step returns False when there is nothing to do because506                        # the entire choice tree is exhausted. If this happens507                        # we break because we literally can't run this pass any508                        # more than we already have until something else makes509                        # progress.510                        break511                    any_ran = True512                    # Don't count steps that didn't actually try to do513                    # anything as failures. 
Otherwise, this call is a failure514                    # if it failed to make any changes to the shrink target.515                    if initial_calls != self.calls:516                        if prev is not self.shrink_target:517                            failures = 0518                        else:519                            max_calls_per_failing_step = max(520                                max_calls_per_failing_step, self.calls - initial_calls521                            )522                            failures += 1523                # We reorder the shrink passes so that on our next run through524                # we try good ones first. The rule is that shrink passes that525                # did nothing useful are the worst, shrink passes that reduced526                # the length are the best.527                if self.shrink_target is before_sp:528                    reordering[sp] = 1529                elif len(self.buffer) < len(before_sp.buffer):530                    reordering[sp] = -1531                else:532                    reordering[sp] = 0533            passes.sort(key=reordering.__getitem__)534    @property535    def buffer(self):536        return self.shrink_target.buffer537    @property538    def blocks(self):539        return self.shrink_target.blocks540    @property541    def examples(self):542        return self.shrink_target.examples543    def all_block_bounds(self):544        return self.shrink_target.blocks.all_bounds()545    @derived_value546    def examples_by_label(self):547        """An index of all examples grouped by their label, with548        the examples stored in their normal index order."""549        examples_by_label = defaultdict(list)550        for ex in self.examples:551            examples_by_label[ex.label].append(ex)552        return dict(examples_by_label)553    @derived_value554    def distinct_labels(self):555        return sorted(self.examples_by_label, key=str)556    @defines_shrink_pass()557    def 
pass_to_descendant(self, chooser):558        """Attempt to replace each example with a descendant example.559        This is designed to deal with strategies that call themselves560        recursively. For example, suppose we had:561        binary_tree = st.deferred(562            lambda: st.one_of(563                st.integers(), st.tuples(binary_tree, binary_tree)))564        This pass guarantees that we can replace any binary tree with one of565        its subtrees - each of those will create an interval that the parent566        could validly be replaced with, and this pass will try doing that.567        This is pretty expensive - it takes O(len(intervals)^2) - so we run it568        late in the process when we've got the number of intervals as far down569        as possible.570        """571        label = chooser.choose(572            self.distinct_labels, lambda l: len(self.examples_by_label[l]) >= 2573        )574        ls = self.examples_by_label[label]575        i = chooser.choose(range(len(ls) - 1))576        ancestor = ls[i]577        if i + 1 == len(ls) or ls[i + 1].start >= ancestor.end:578            return579        @self.cached(label, i)580        def descendants():581            lo = i + 1582            hi = len(ls)583            while lo + 1 < hi:584                mid = (lo + hi) // 2585                if ls[mid].start >= ancestor.end:586                    hi = mid587                else:588                    lo = mid589            return [t for t in ls[i + 1 : hi] if t.length < ancestor.length]590        descendant = chooser.choose(descendants, lambda ex: ex.length > 0)591        assert ancestor.start <= descendant.start592        assert ancestor.end >= descendant.end593        assert descendant.length < ancestor.length594        self.incorporate_new_buffer(595            self.buffer[: ancestor.start]596            + self.buffer[descendant.start : descendant.end]597            + self.buffer[ancestor.end :]598        )599    def 
lower_common_block_offset(self):600        """Sometimes we find ourselves in a situation where changes to one part601        of the byte stream unlock changes to other parts. Sometimes this is602        good, but sometimes this can cause us to exhibit exponential slow603        downs!604        e.g. suppose we had the following:605        m = draw(integers(min_value=0))606        n = draw(integers(min_value=0))607        assert abs(m - n) > 1608        If this fails then we'll end up with a loop where on each iteration we609        reduce each of m and n by 2 - m can't go lower because of n, then n610        can't go lower because of m.611        This will take us O(m) iterations to complete, which is exponential in612        the data size, as we gradually zig zag our way towards zero.613        This can only happen if we're failing to reduce the size of the byte614        stream: The number of iterations that reduce the length of the byte615        stream is bounded by that length.616        So what we do is this: We keep track of which blocks are changing, and617        then if there's some non-zero common offset to them we try and minimize618        them all at once by lowering that offset.619        This may not work, and it definitely won't get us out of all possible620        exponential slow downs (an example of where it doesn't is where the621        shape of the blocks changes as a result of this bouncing behaviour),622        but it fails fast when it doesn't work and gets us out of a really623        nastily slow case when it does.624        """625        if len(self.__changed_blocks) <= 1:626            return627        current = self.shrink_target628        blocked = [current.buffer[u:v] for u, v in self.all_block_bounds()]629        changed = [630            i631            for i in sorted(self.__changed_blocks)632            if not self.shrink_target.blocks[i].trivial633        ]634        if not changed:635            return636        ints = 
[int_from_bytes(blocked[i]) for i in changed]637        offset = min(ints)638        assert offset > 0639        for i in range(len(ints)):640            ints[i] -= offset641        def reoffset(o):642            new_blocks = list(blocked)643            for i, v in zip(changed, ints):644                new_blocks[i] = int_to_bytes(v + o, len(blocked[i]))645            return self.incorporate_new_buffer(b"".join(new_blocks))646        Integer.shrink(offset, reoffset, random=self.random)647        self.clear_change_tracking()648    def clear_change_tracking(self):649        self.__last_checked_changed_at = self.shrink_target650        self.__all_changed_blocks = set()651    def mark_changed(self, i):652        self.__changed_blocks.add(i)653    @property654    def __changed_blocks(self):655        if self.__last_checked_changed_at is not self.shrink_target:656            prev_target = self.__last_checked_changed_at657            new_target = self.shrink_target658            assert prev_target is not new_target659            prev = prev_target.buffer660            new = new_target.buffer661            assert sort_key(new) < sort_key(prev)662            if (663                len(new_target.blocks) != len(prev_target.blocks)664                or new_target.blocks.endpoints != prev_target.blocks.endpoints665            ):666                self.__all_changed_blocks = set()667            else:668                blocks = new_target.blocks669                # Index of last block whose contents have been modified, found670                # by checking if the tail past this point has been modified.671                last_changed = binary_search(672                    0,673                    len(blocks),674                    lambda i: prev[blocks.start(i) :] != new[blocks.start(i) :],675                )676                # Index of the first block whose contents have been changed,677                # because we know that this predicate is true for zero (because678          
      # the prefix from the start is empty), so the result must be True679                # for the bytes from the start of this block and False for the680                # bytes from the end, hence the change is in this block.681                first_changed = binary_search(682                    0,683                    len(blocks),684                    lambda i: prev[: blocks.start(i)] == new[: blocks.start(i)],685                )686                # Between these two changed regions we now do a linear scan to687                # check if any specific block values have changed.688                for i in range(first_changed, last_changed + 1):689                    u, v = blocks.bounds(i)690                    if i not in self.__all_changed_blocks and prev[u:v] != new[u:v]:691                        self.__all_changed_blocks.add(i)692            self.__last_checked_changed_at = new_target693        assert self.__last_checked_changed_at is self.shrink_target694        return self.__all_changed_blocks695    def update_shrink_target(self, new_target):696        assert isinstance(new_target, ConjectureResult)697        if self.shrink_target is not None:698            self.shrinks += 1699            # If we are just taking a long time to shrink we don't want to700            # trigger this heuristic, so whenever we shrink successfully701            # we give ourselves a bit of breathing room to make sure we702            # would find a shrink that took that long to find the next time.703            # The case where we're taking a long time but making steady704            # progress is handled by `finish_shrinking_deadline` in engine.py705            self.max_stall = max(706                self.max_stall, (self.calls - self.calls_at_last_shrink) * 2707            )708            self.calls_at_last_shrink = self.calls709        else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
Right from setting up the prerequisites to run your first automation test, through following best practices, to diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test-automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing for free!
