Best Python code snippet using hypothesis
engine.py
Source:engine.py  
1# coding=utf-82#3# This file is part of Hypothesis, which may be found at4# https://github.com/HypothesisWorks/hypothesis-python5#6# Most of this work is copyright (C) 2013-2018 David R. MacIver7# (david@drmaciver.com), but it contains contributions by others. See8# CONTRIBUTING.rst for a full list of people who may hold copyright, and9# consult the git log if you need to determine who owns an individual10# contribution.11#12# This Source Code Form is subject to the terms of the Mozilla Public License,13# v. 2.0. If a copy of the MPL was not distributed with this file, You can14# obtain one at http://mozilla.org/MPL/2.0/.15#16# END HEADER17from __future__ import division, print_function, absolute_import18import heapq19from enum import Enum20from random import Random, getrandbits21from weakref import WeakKeyDictionary22from functools import total_ordering23from collections import defaultdict24import attr25from hypothesis import Phase, Verbosity, HealthCheck26from hypothesis import settings as Settings27from hypothesis._settings import local_settings, note_deprecation28from hypothesis.reporting import debug_report29from hypothesis.internal.compat import Counter, ceil, hbytes, hrange, \30    int_to_bytes, benchmark_time, int_from_bytes, to_bytes_sequence31from hypothesis.utils.conventions import UniqueIdentifier32from hypothesis.internal.healthcheck import fail_health_check33from hypothesis.internal.conjecture.data import MAX_DEPTH, Status, \34    StopTest, ConjectureData35from hypothesis.internal.conjecture.shrinking import Length, Integer, \36    Lexical, Ordering37# Tell pytest to omit the body of this module from tracebacks38# http://doc.pytest.org/en/latest/example/simple.html#writing-well-integrated-assertion-helpers39__tracebackhide__ = True40HUNG_TEST_TIME_LIMIT = 5 * 6041MAX_SHRINKS = 50042@attr.s43class HealthCheckState(object):44    valid_examples = attr.ib(default=0)45    invalid_examples = attr.ib(default=0)46    overrun_examples = attr.ib(default=0)47    draw_times = attr.ib(default=attr.Factory(list))48class ExitReason(Enum):49    max_examples = 050    max_iterations = 151    timeout = 252    max_shrinks = 353    finished = 454    flaky = 555class RunIsComplete(Exception):56    pass57class ConjectureRunner(object):58    def __init__(59        self, test_function, settings=None, random=None,60        database_key=None,61    ):62        self._test_function = test_function63        self.settings = settings or Settings()64        self.shrinks = 065        self.call_count = 066        self.event_call_counts = Counter()67        self.valid_examples = 068        self.start_time = benchmark_time()69        self.random = random or Random(getrandbits(128))70        self.database_key = database_key71        self.status_runtimes = {}72        self.all_drawtimes = []73        self.all_runtimes = []74        self.events_to_strings = WeakKeyDictionary()75        self.target_selector = TargetSelector(self.random)76        # Tree nodes are stored in an array to prevent heavy nesting of data77        # structures. Branches are dicts mapping bytes to child nodes (which78        # will in general only be partially populated). Leaves are79        # ConjectureData objects that have been previously seen as the result80        # of following that path.81        self.tree = [{}]82        # A node is dead if there is nothing left to explore past that point.83        # Recursively, a node is dead if either it is a leaf or every byte84        # leads to a dead node when starting from here.85        self.dead = set()86        # We rewrite the byte stream at various points during parsing, to one87        # that will produce an equivalent result but is in some sense more88        # canonical. We keep track of these so that when walking the tree we89        # can identify nodes where the exact byte value doesn't matter and90        # treat all bytes there as equivalent. This significantly reduces the91        # size of the search space and removes a lot of redundant examples.92        # Maps tree indices where to the unique byte that is valid at that93        # point. Corresponds to data.write() calls.94        self.forced = {}95        # Maps tree indices to a mask that restricts bytes at that point.96        # Currently this is only updated by draw_bits, but it potentially97        # could get used elsewhere.98        self.masks = {}99        # Where a tree node consists of the beginning of a block we track the100        # size of said block. This allows us to tell when an example is too101        # short even if it goes off the unexplored region of the tree - if it102        # is at the beginning of a block of size 4 but only has 3 bytes left,103        # it's going to overrun the end of the buffer regardless of the104        # buffer contents.105        self.block_sizes = {}106        self.interesting_examples = {}107        self.covering_examples = {}108        self.shrunk_examples = set()109        self.tag_intern_table = {}110        self.health_check_state = None111        self.used_examples_from_database = False112    def __tree_is_exhausted(self):113        return 0 in self.dead114    def test_function(self, data):115        if benchmark_time() - self.start_time >= HUNG_TEST_TIME_LIMIT:116            fail_health_check(self.settings, (117                'Your test has been running for at least five minutes. This '118                'is probably not what you intended, so by default Hypothesis '119                'turns it into an error.'120            ), HealthCheck.hung_test)121        self.call_count += 1122        try:123            self._test_function(data)124            data.freeze()125        except StopTest as e:126            if e.testcounter != data.testcounter:127                self.save_buffer(data.buffer)128                raise e129        except BaseException:130            self.save_buffer(data.buffer)131            raise132        finally:133            data.freeze()134            self.note_details(data)135        self.target_selector.add(data)136        self.debug_data(data)137        tags = frozenset(data.tags)138        data.tags = self.tag_intern_table.setdefault(tags, tags)139        if data.status == Status.VALID:140            self.valid_examples += 1141            for t in data.tags:142                existing = self.covering_examples.get(t)143                if (144                    existing is None or145                    sort_key(data.buffer) < sort_key(existing.buffer)146                ):147                    self.covering_examples[t] = data148                    if self.database is not None:149                        self.database.save(self.covering_key, data.buffer)150                        if existing is not None:151                            self.database.delete(152                                self.covering_key, existing.buffer)153        tree_node = self.tree[0]154        indices = []155        node_index = 0156        for i, b in enumerate(data.buffer):157            indices.append(node_index)158            if i in data.forced_indices:159                self.forced[node_index] = b160            try:161                self.masks[node_index] = data.masked_indices[i]162            except KeyError:163                pass164            try:165                node_index = tree_node[b]166            except KeyError:167                node_index = len(self.tree)168                self.tree.append({})169                tree_node[b] = node_index170            tree_node = self.tree[node_index]171            if node_index in self.dead:172                break173        for u, v in data.blocks:174            # This can happen if we hit a dead node when walking the buffer.175            # In that case we already have this section of the tree mapped.176            if u >= len(indices):177                break178            self.block_sizes[indices[u]] = v - u179        self.dead.update(indices[self.cap:])180        if data.status != Status.OVERRUN and node_index not in self.dead:181            self.dead.add(node_index)182            self.tree[node_index] = data183            for j in reversed(indices):184                mask = self.masks.get(j, 0xff)185                assert _is_simple_mask(mask)186                max_size = mask + 1187                if (188                    len(self.tree[j]) < max_size and189                    j not in self.forced190                ):191                    break192                if set(self.tree[j].values()).issubset(self.dead):193                    self.dead.add(j)194                else:195                    break196        if data.status == Status.INTERESTING:197            key = data.interesting_origin198            changed = False199            try:200                existing = self.interesting_examples[key]201            except KeyError:202                changed = True203            else:204                if sort_key(data.buffer) < sort_key(existing.buffer):205                    self.shrinks += 1206                    self.downgrade_buffer(existing.buffer)207                    changed = True208            if changed:209                self.save_buffer(data.buffer)210                self.interesting_examples[key] = data211                self.shrunk_examples.discard(key)212            if self.shrinks >= MAX_SHRINKS:213                self.exit_with(ExitReason.max_shrinks)214        if (215            self.settings.timeout > 0 and216            benchmark_time() >= self.start_time + self.settings.timeout217        ):218            note_deprecation((219                'Your tests are hitting the settings timeout (%.2fs). '220                'This functionality will go away in a future release '221                'and you should not rely on it. Instead, try setting '222                'max_examples to be some value lower than %d (the number '223                'of examples your test successfully ran here). Or, if you '224                'would prefer your tests to run to completion, regardless '225                'of how long they take, you can set the timeout value to '226                'hypothesis.unlimited.'227            ) % (228                self.settings.timeout, self.valid_examples),229                self.settings)230            self.exit_with(ExitReason.timeout)231        if not self.interesting_examples:232            if self.valid_examples >= self.settings.max_examples:233                self.exit_with(ExitReason.max_examples)234            if self.call_count >= max(235                self.settings.max_examples * 10,236                # We have a high-ish default max iterations, so that tests237                # don't become flaky when max_examples is too low.238                1000239            ):240                self.exit_with(ExitReason.max_iterations)241        if self.__tree_is_exhausted():242            self.exit_with(ExitReason.finished)243        self.record_for_health_check(data)244    def generate_novel_prefix(self):245        prefix = bytearray()246        node = 0247        while True:248            assert len(prefix) < self.cap249            assert node not in self.dead250            mask = self.masks.get(node, 0xff)251            assert _is_simple_mask(mask)252            upper_bound = mask + 1253            try:254                c = self.forced[node]255                prefix.append(c)256                node = self.tree[node][c]257                continue258            except KeyError:259                pass260            c = self.random.randrange(0, upper_bound)261            try:262                next_node = self.tree[node][c]263                if next_node in self.dead:264                    choices = [265                        b for b in hrange(upper_bound)266                        if self.tree[node].get(b) not in self.dead267                    ]268                    assert choices269                    c = self.random.choice(choices)270                    node = self.tree[node][c]271                else:272                    node = next_node273                prefix.append(c)274            except KeyError:275                prefix.append(c)276                break277        assert node not in self.dead278        return hbytes(prefix)279    @property280    def cap(self):281        return self.settings.buffer_size // 2282    def record_for_health_check(self, data):283        # Once we've actually found a bug, there's no point in trying to run284        # health checks - they'll just mask the actually important information.285        if data.status == Status.INTERESTING:286            self.health_check_state = None287        state = self.health_check_state288        if state is None:289            return290        state.draw_times.extend(data.draw_times)291        if data.status == Status.VALID:292            state.valid_examples += 1293        elif data.status == Status.INVALID:294            state.invalid_examples += 1295        else:296            assert data.status == Status.OVERRUN297            state.overrun_examples += 1298        max_valid_draws = 10299        max_invalid_draws = 50300        max_overrun_draws = 20301        assert state.valid_examples <= max_valid_draws302        if state.valid_examples == max_valid_draws:303            self.health_check_state = None304            return305        if state.overrun_examples == max_overrun_draws:306            fail_health_check(self.settings, (307                'Examples routinely exceeded the max allowable size. '308                '(%d examples overran while generating %d valid ones)'309                '. Generating examples this large will usually lead to'310                ' bad results. You could try setting max_size parameters '311                'on your collections and turning '312                'max_leaves down on recursive() calls.') % (313                state.overrun_examples, state.valid_examples314            ), HealthCheck.data_too_large)315        if state.invalid_examples == max_invalid_draws:316            fail_health_check(self.settings, (317                'It looks like your strategy is filtering out a lot '318                'of data. Health check found %d filtered examples but '319                'only %d good ones. This will make your tests much '320                'slower, and also will probably distort the data '321                'generation quite a lot. You should adapt your '322                'strategy to filter less. This can also be caused by '323                'a low max_leaves parameter in recursive() calls') % (324                state.invalid_examples, state.valid_examples325            ), HealthCheck.filter_too_much)326        draw_time = sum(state.draw_times)327        if draw_time > 1.0:328            fail_health_check(self.settings, (329                'Data generation is extremely slow: Only produced '330                '%d valid examples in %.2f seconds (%d invalid ones '331                'and %d exceeded maximum size). Try decreasing '332                "size of the data you're generating (with e.g."333                'max_size or max_leaves parameters).'334            ) % (335                state.valid_examples, draw_time, state.invalid_examples,336                state.overrun_examples), HealthCheck.too_slow,)337    def save_buffer(self, buffer):338        if self.settings.database is not None:339            key = self.database_key340            if key is None:341                return342            self.settings.database.save(key, hbytes(buffer))343    def downgrade_buffer(self, buffer):344        if (345            self.settings.database is not None and346            self.database_key is not None347        ):348            self.settings.database.move(349                self.database_key, self.secondary_key, buffer)350    @property351    def secondary_key(self):352        return b'.'.join((self.database_key, b'secondary'))353    @property354    def covering_key(self):355        return b'.'.join((self.database_key, b'coverage'))356    def note_details(self, data):357        runtime = max(data.finish_time - data.start_time, 0.0)358        self.all_runtimes.append(runtime)359        self.all_drawtimes.extend(data.draw_times)360        self.status_runtimes.setdefault(data.status, []).append(runtime)361        for event in set(map(self.event_to_string, data.events)):362            self.event_call_counts[event] += 1363    def debug(self, message):364        with local_settings(self.settings):365            debug_report(message)366    @property367    def report_debug_info(self):368        return self.settings.verbosity >= Verbosity.debug369    def debug_data(self, data):370        if not self.report_debug_info:371            return372        stack = [[]]373        def go(ex):374            if ex.length == 0:375                return376            if len(ex.children) == 0:377                stack[-1].append(int_from_bytes(378                    data.buffer[ex.start:ex.end]379                ))380            else:381                node = []382                stack.append(node)383                for v in ex.children:384                    go(v)385                stack.pop()386                if len(node) == 1:387                    stack[-1].extend(node)388                else:389                    stack[-1].append(node)390        go(data.examples[0])391        assert len(stack) == 1392        status = repr(data.status)393        if data.status == Status.INTERESTING:394            status = '%s (%r)' % (status, data.interesting_origin,)395        self.debug('%d bytes %r -> %s, %s' % (396            data.index, stack[0], status, data.output,397        ))398    def run(self):399        with local_settings(self.settings):400            try:401                self._run()402            except RunIsComplete:403                pass404            for v in self.interesting_examples.values():405                self.debug_data(v)406            self.debug(407                u'Run complete after %d examples (%d valid) and %d shrinks'408                % (self.call_count, self.valid_examples, self.shrinks))409    def _new_mutator(self):410        target_data = [None]411        def draw_new(data, n):412            return uniform(self.random, n)413        def draw_existing(data, n):414            return target_data[0].buffer[data.index:data.index + n]415        def draw_smaller(data, n):416            existing = target_data[0].buffer[data.index:data.index + n]417            r = uniform(self.random, n)418            if r <= existing:419                return r420            return _draw_predecessor(self.random, existing)421        def draw_larger(data, n):422            existing = target_data[0].buffer[data.index:data.index + n]423            r = uniform(self.random, n)424            if r >= existing:425                return r426            return _draw_successor(self.random, existing)427        def reuse_existing(data, n):428            choices = data.block_starts.get(n, [])429            if choices:430                i = self.random.choice(choices)431                return hbytes(data.buffer[i:i + n])432            else:433                result = uniform(self.random, n)434                assert isinstance(result, hbytes)435                return result436        def flip_bit(data, n):437            buf = bytearray(438                target_data[0].buffer[data.index:data.index + n])439            i = self.random.randint(0, n - 1)440            k = self.random.randint(0, 7)441            buf[i] ^= (1 << k)442            return hbytes(buf)443        def draw_zero(data, n):444            return hbytes(b'\0' * n)445        def draw_max(data, n):446            return hbytes([255]) * n447        def draw_constant(data, n):448            return hbytes([self.random.randint(0, 255)]) * n449        def redraw_last(data, n):450            u = target_data[0].blocks[-1][0]451            if data.index + n <= u:452                return target_data[0].buffer[data.index:data.index + n]453            else:454                return uniform(self.random, n)455        options = [456            draw_new,457            redraw_last, redraw_last,458            reuse_existing, reuse_existing,459            draw_existing, draw_smaller, draw_larger,460            flip_bit,461            draw_zero, draw_max, draw_zero, draw_max,462            draw_constant,463        ]464        bits = [465            self.random.choice(options) for _ in hrange(3)466        ]467        prefix = [None]468        def mutate_from(origin):469            target_data[0] = origin470            prefix[0] = self.generate_novel_prefix()471            return draw_mutated472        def draw_mutated(data, n):473            if data.index + n > len(target_data[0].buffer):474                result = uniform(self.random, n)475            else:476                result = self.random.choice(bits)(data, n)477            p = prefix[0]478            if data.index < len(p):479                start = p[data.index:data.index + n]480                result = start + result[len(start):]481            return self.__zero_bound(data, result)482        return mutate_from483    def __rewrite(self, data, result):484        return self.__zero_bound(data, result)485    def __zero_bound(self, data, result):486        """This tries to get the size of the generated data under control by487        replacing the result with zero if we are too deep or have already488        generated too much data.489        This causes us to enter "shrinking mode" there and thus reduce490        the size of the generated data.491        """492        initial = len(result)493        if data.depth * 2 >= MAX_DEPTH or data.index >= self.cap:494            data.forced_indices.update(495                hrange(data.index, data.index + initial))496            data.hit_zero_bound = True497            result = hbytes(initial)498        elif data.index + initial >= self.cap:499            data.hit_zero_bound = True500            n = self.cap - data.index501            data.forced_indices.update(502                hrange(self.cap, data.index + initial))503            result = result[:n] + hbytes(initial - n)504        assert len(result) == initial505        return result506    @property507    def database(self):508        if self.database_key is None:509            return None510        return self.settings.database511    def has_existing_examples(self):512        return (513            self.database is not None and514            Phase.reuse in self.settings.phases515        )516    def reuse_existing_examples(self):517        """If appropriate (we have a database and have been told to use it),518        try to reload existing examples from the database.519        If there are a lot we don't try all of them. We always try the520        smallest example in the database (which is guaranteed to be the521        last failure) and the largest (which is usually the seed example522        which the last failure came from but we don't enforce that). We523        then take a random sampling of the remainder and try those. Any524        examples that are no longer interesting are cleared out.525        """526        if self.has_existing_examples():527            self.debug('Reusing examples from database')528            # We have to do some careful juggling here. We have two database529            # corpora: The primary and secondary. The primary corpus is a530            # small set of minimized examples each of which has at one point531            # demonstrated a distinct bug. We want to retry all of these.532            # We also have a secondary corpus of examples that have at some533            # point demonstrated interestingness (currently only ones that534            # were previously non-minimal examples of a bug, but this will535            # likely expand in future). These are a good source of potentially536            # interesting examples, but there are a lot of them, so we down537            # sample the secondary corpus to a more manageable size.538            corpus = sorted(539                self.settings.database.fetch(self.database_key),540                key=sort_key541            )542            desired_size = max(2, ceil(0.1 * self.settings.max_examples))543            for extra_key in [self.secondary_key, self.covering_key]:544                if len(corpus) < desired_size:545                    extra_corpus = list(546                        self.settings.database.fetch(extra_key),547                    )548                    shortfall = desired_size - len(corpus)549                    if len(extra_corpus) <= shortfall:550                        extra = extra_corpus551                    else:552                        extra = self.random.sample(extra_corpus, shortfall)553                    extra.sort(key=sort_key)554                    corpus.extend(extra)555            self.used_examples_from_database = len(corpus) > 0556            for existing in corpus:557                last_data = ConjectureData.for_buffer(existing)558                try:559                    self.test_function(last_data)560                finally:561                    if last_data.status != Status.INTERESTING:562                        self.settings.database.delete(563                            self.database_key, existing)564                        self.settings.database.delete(565                            self.secondary_key, existing)566    def exit_with(self, reason):567        self.exit_reason = reason568        raise RunIsComplete()569    def generate_new_examples(self):570        if Phase.generate not in self.settings.phases:571            return572        zero_data = self.cached_test_function(573            hbytes(self.settings.buffer_size))574        if zero_data.status == Status.OVERRUN or (575            zero_data.status == Status.VALID and576            len(zero_data.buffer) * 2 > self.settings.buffer_size577        ):578            fail_health_check(579                self.settings,580                'The smallest natural example for your test is extremely '581                'large. This makes it difficult for Hypothesis to generate '582                'good examples, especially when trying to reduce failing ones '583                'at the end. Consider reducing the size of your data if it is '584                'of a fixed size. You could also fix this by improving how '585                'your data shrinks (see https://hypothesis.readthedocs.io/en/'586                'latest/data.html#shrinking for details), or by introducing '587                'default values inside your strategy. e.g. could you replace '588                'some arguments with their defaults by using '589                'one_of(none(), some_complex_strategy)?',590                HealthCheck.large_base_example591            )592        # If the language starts with writes of length >= cap then there is593        # only one string in it: Everything after cap is forced to be zero (or594        # to be whatever value is written there). That means that once we've595        # tried the zero value, there's nothing left for us to do, so we596        # exit early here.597        for i in hrange(self.cap):598            if i not in zero_data.forced_indices:599                break600        else:601            self.exit_with(ExitReason.finished)602        self.health_check_state = HealthCheckState()603        count = 0604        while not self.interesting_examples and (605            count < 10 or self.health_check_state is not None606        ):607            prefix = self.generate_novel_prefix()608            def draw_bytes(data, n):609                if data.index < len(prefix):610                    result = prefix[data.index:data.index + n]611                    if len(result) < n:612                        result += uniform(self.random, n - len(result))613                else:614                    result = uniform(self.random, n)615                return self.__zero_bound(data, result)616            targets_found = len(self.covering_examples)617            last_data = ConjectureData(618                max_length=self.settings.buffer_size,619                draw_bytes=draw_bytes620            )621            self.test_function(last_data)622            last_data.freeze()623            if len(self.covering_examples) > targets_found:624                count = 0625            else:626                count += 1627        mutations = 0628        mutator = self._new_mutator()629        zero_bound_queue = []630        while not self.interesting_examples:631            if zero_bound_queue:632                # Whenever we generated an example and it hits a bound633                # which forces zero blocks into it, this creates a weird634                # distortion effect by making certain parts of the data635                # stream (especially ones to the right) much more likely636                # to be zero. We fix this by redistributing the generated637                # data by shuffling it randomly. This results in the638                # zero data being spread evenly throughout the buffer.639                # Hopefully the shrinking this causes will cause us to640                # naturally fail to hit the bound.641                # If it doesn't then we will queue the new version up again642                # (now with more zeros) and try again.643                overdrawn = zero_bound_queue.pop()644                buffer = bytearray(overdrawn.buffer)645                # These will have values written to them that are different646                # from what's in them anyway, so the value there doesn't647                # really "count" for distributional purposes, and if we648                # leave them in then they can cause the fraction of non649                # zero bytes to increase on redraw instead of decrease.650                for i in overdrawn.forced_indices:651                    buffer[i] = 0652                self.random.shuffle(buffer)653                buffer = hbytes(buffer)654                def draw_bytes(data, n):655                    result = buffer[data.index:data.index + n]656                    if len(result) < n:657                        result += hbytes(n - len(result))658                    return self.__rewrite(data, result)659                data = ConjectureData(660                    draw_bytes=draw_bytes,661                    max_length=self.settings.buffer_size,662                )663                self.test_function(data)664                data.freeze()665            else:666                target, origin = self.target_selector.select()667                mutations += 1668                targets_found = len(self.covering_examples)669                data = ConjectureData(670                    draw_bytes=mutator(origin),671                    max_length=self.settings.buffer_size672                )673                self.test_function(data)674                data.freeze()675                if (676                    data.status > origin.status or677                    len(self.covering_examples) > targets_found678                ):679                    mutations = 0680                elif (681                    data.status < origin.status or682                    not self.target_selector.has_tag(target, data) or683                    mutations >= 10684                ):685                    # Cap the variations of a single example and move on to686                    # an entirely fresh start.  Ten is an entirely arbitrary687                    # constant, but it's been working well for years.688                    mutations = 0689                    mutator = self._new_mutator()690            if getattr(data, 'hit_zero_bound', False):691                zero_bound_queue.append(data)692            mutations += 1693    def _run(self):694        self.start_time = benchmark_time()695        self.reuse_existing_examples()696        self.generate_new_examples()697        self.shrink_interesting_examples()698        self.exit_with(ExitReason.finished)699    def shrink_interesting_examples(self):700        """If we've found interesting examples, try to replace each of them701        with a minimal interesting example with the same interesting_origin.702        We may find one or more examples with a new interesting_origin703        during the shrink process. If so we shrink these too.704        """705        if (706            Phase.shrink not in self.settings.phases or707            not self.interesting_examples708        ):709            return710        for prev_data in sorted(711            self.interesting_examples.values(),712            key=lambda d: sort_key(d.buffer)713        ):714            assert prev_data.status == Status.INTERESTING715            data = ConjectureData.for_buffer(prev_data.buffer)716            self.test_function(data)717            if data.status != Status.INTERESTING:718                self.exit_with(ExitReason.flaky)719        self.clear_secondary_key()720        while len(self.shrunk_examples) < len(self.interesting_examples):721            target, example = min([722                (k, v) for k, v in self.interesting_examples.items()723                if k not in self.shrunk_examples],724                key=lambda kv: (sort_key(kv[1].buffer), sort_key(repr(kv[0]))),725            )726            self.debug('Shrinking %r' % (target,))727            def predicate(d):728                if d.status < Status.INTERESTING:729                    return False730                return d.interesting_origin == target731            self.shrink(example, predicate)732            self.shrunk_examples.add(target)733    def clear_secondary_key(self):734        if self.has_existing_examples():735            # If we have any smaller examples in the secondary corpus, now is736            # a good time to try them to see if they work as shrinks. They737            # probably won't, but it's worth a shot and gives us a good738            # opportunity to clear out the database.739            # It's not worth trying the primary corpus because we already740            # tried all of those in the initial phase.741            corpus = sorted(742                self.settings.database.fetch(self.secondary_key),743                key=sort_key744            )745            for c in corpus:746                primary = {747                    v.buffer for v in self.interesting_examples.values()748                }749                cap = max(map(sort_key, primary))750                if sort_key(c) > cap:751                    break752                else:753                    self.cached_test_function(c)754                    # We unconditionally remove c from the secondary key as it755                    # is either now primary or worse than our primary example756                    # of this reason for interestingness.757                    self.settings.database.delete(self.secondary_key, c)758    def shrink(self, example, predicate):759        s = self.new_shrinker(example, predicate)760        s.shrink()761        return s.shrink_target762    def new_shrinker(self, example, predicate):763        return Shrinker(self, example, predicate)764    def prescreen_buffer(self, buffer):765        """Attempt to rule out buffer as a possible interesting candidate.766        Returns False if we know for sure that running this buffer will not767        produce an interesting result. Returns True if it might (because it768        explores territory we have not previously tried).769        This is purely an optimisation to try to reduce the number of tests we770        run. "return True" would be a valid but inefficient implementation.771        """772        node_index = 0773        n = len(buffer)774        for k, b in enumerate(buffer):775            if node_index in self.dead:776                return False777            try:778                # The block size at that point provides a lower bound on how779                # many more bytes are required. If the buffer does not have780                # enough bytes to fulfill that block size then we can rule out781                # this buffer.782                if k + self.block_sizes[node_index] > n:783                    return False784            except KeyError:785                pass786            try:787                b = self.forced[node_index]788            except KeyError:789                pass790            try:791                b = b & self.masks[node_index]792            except KeyError:793                pass794            try:795                node_index = self.tree[node_index][b]796            except KeyError:797                return True798        else:799            return False800    def cached_test_function(self, buffer):801        node_index = 0802        for c in buffer:803            try:804                c = self.forced[node_index]805            except KeyError:806                pass807            try:808                c = c & self.masks[node_index]809            except KeyError:810                pass811            try:812                node_index = self.tree[node_index][c]813            except KeyError:814                break815            node = self.tree[node_index]816            if isinstance(node, ConjectureData):817                return node818        result = ConjectureData.for_buffer(buffer)819        self.test_function(result)820        return result821    def event_to_string(self, event):822        if isinstance(event, str):823            return event824        try:825            return self.events_to_strings[event]826        except KeyError:827            pass828        result = str(event)829        self.events_to_strings[event] = result830        return result831def _is_simple_mask(mask):832    """A simple mask is ``(2 ** n - 1)`` for some ``n``, so it has the effect833    of keeping the lowest ``n`` bits and discarding the rest.834    A mask in this form can produce any integer between 0 and the mask itself835    (inclusive), and the total number of these values is ``(mask + 1)``.836    """837    return (mask & (mask + 1)) == 0838def _draw_predecessor(rnd, xs):839    r = bytearray()840    any_strict = False841    for x in to_bytes_sequence(xs):842        if not any_strict:843            c = rnd.randint(0, x)844            if c < x:845                any_strict = True846        else:847            c = rnd.randint(0, 255)848        r.append(c)849    return hbytes(r)850def _draw_successor(rnd, xs):851    r = bytearray()852    any_strict = False853    for x in to_bytes_sequence(xs):854        if not any_strict:855            c = rnd.randint(x, 255)856            if c > x:857                any_strict = True858        else:859            c = rnd.randint(0, 255)860        r.append(c)861    return hbytes(r)862def sort_key(buffer):863    return (len(buffer), buffer)864def uniform(random, n):865    return int_to_bytes(random.getrandbits(n * 8), n)866class SampleSet(object):867    """Set data type with the ability to sample uniformly at random from it.868    The mechanism is that we store the set in two parts: A mapping of869    values to their index in an array. Sampling uniformly at random then870    becomes simply a matter of sampling from the array, but we can use871    the index for efficient lookup to add and remove values.872    """873    __slots__ = ('__values', '__index')874    def __init__(self):875        self.__values = []876        self.__index = {}877    def __len__(self):878        return len(self.__values)879    def __repr__(self):880        return 'SampleSet(%r)' % (self.__values,)881    def add(self, value):882        assert value not in self.__index883        # Adding simply consists of adding the value to the end of the array884        # and updating the index.885        self.__index[value] = len(self.__values)886        self.__values.append(value)887    def remove(self, value):888        # To remove a value we first remove it from the index. But this leaves889        # us with the value still in the array, so we have to fix that. We890        # can't simply remove the value from the array, as that would a) Be an891        # O(n) operation and b) Leave the index completely wrong for every892        # value after that index.893        # So what we do is we take the last element of the array and place it894        # in the position of the value we just deleted (if the value was not895        # already the last element of the array. If it was then we don't have896        # to do anything extra). This reorders the array, but that's OK because897        # we don't care about its order, we just need to sample from it.898        i = self.__index.pop(value)899        last = self.__values.pop()900        if i < len(self.__values):901            self.__values[i] = last902            self.__index[last] = i903    def choice(self, random):904        return random.choice(self.__values)905class Negated(object):906    __slots__ = ('tag',)907    def __init__(self, tag):908        self.tag = tag909NEGATED_CACHE = {}  # type: dict910def negated(tag):911    try:912        return NEGATED_CACHE[tag]913    except KeyError:914        result = Negated(tag)915        NEGATED_CACHE[tag] = result916        return result917universal = UniqueIdentifier('universal')918class TargetSelector(object):919    """Data structure for selecting targets to use for mutation.920    The goal is to do a good job of exploiting novelty in examples without921    getting too obsessed with any particular novel factor.922    Roughly speaking what we want to do is give each distinct coverage target923    equal amounts of time. However some coverage targets may be harder to fuzz924    than others, or may only appear in a very small minority of examples, so we925    don't want to let those dominate the testing.926    Targets are selected according to the following rules:927    1. We ideally want valid examples as our starting point. We ignore928       interesting examples entirely, and other than that we restrict ourselves929       to the best example status we've seen so far. If we've only seen930       OVERRUN examples we use those. If we've seen INVALID but not VALID931       examples we use those. Otherwise we use VALID examples.932    2. Among the examples we've seen with the right status, when asked to933       select a target, we select a coverage target and return that along with934       an example exhibiting that target uniformly at random.935    Coverage target selection proceeds as follows:936    1. Whenever we return an example from select, we update the usage count of937       each of its tags.938    2. Whenever we see an example, we add it to the list of examples for all of939       its tags.940    3. When selecting a tag, we select one with a minimal usage count. Among941       those of minimal usage count we select one with the fewest examples.942       Among those, we select one uniformly at random.943    This has the following desirable properties:944    1. When two coverage targets are intrinsically linked (e.g. when you have945       multiple lines in a conditional so that either all or none of them will946       be covered in a conditional) they are naturally deduplicated.947    2. Popular coverage targets will largely be ignored for considering what948       test to run - if every example exhibits a coverage target, picking an949       example because of that target is rather pointless.950    3. When we discover new coverage targets we immediately exploit them until951       we get to the point where we've spent about as much time on them as the952       existing targets.953    4. Among the interesting deduplicated coverage targets we essentially954       round-robin between them, but with a more consistent distribution than955       uniformly at random, which is important particularly for short runs.956    """957    def __init__(self, random):958        self.random = random959        self.best_status = Status.OVERRUN960        self.reset()961    def reset(self):962        self.examples_by_tags = defaultdict(list)963        self.tag_usage_counts = Counter()964        self.tags_by_score = defaultdict(SampleSet)965        self.scores_by_tag = {}966        self.scores = []967        self.mutation_counts = 0968        self.example_counts = 0969        self.non_universal_tags = set()970        self.universal_tags = None971    def add(self, data):972        if data.status == Status.INTERESTING:973            return974        if data.status < self.best_status:975            return976        if data.status > self.best_status:977            self.best_status = data.status978            self.reset()979        if self.universal_tags is None:980            self.universal_tags = set(data.tags)981        else:982            not_actually_universal = self.universal_tags - data.tags983            for t in not_actually_universal:984                self.universal_tags.remove(t)985                self.non_universal_tags.add(t)986                self.examples_by_tags[t] = list(987                    self.examples_by_tags[universal]988                )989        new_tags = data.tags - self.non_universal_tags990        for t in new_tags:991            self.non_universal_tags.add(t)992            self.examples_by_tags[negated(t)] = list(993                self.examples_by_tags[universal]994            )995        self.example_counts += 1996        for t in self.tags_for(data):997            self.examples_by_tags[t].append(data)998            self.rescore(t)999    def has_tag(self, tag, data):1000        if tag is universal:1001            return True1002        if isinstance(tag, Negated):1003            return tag.tag not in data.tags1004        return tag in data.tags1005    def tags_for(self, data):1006        yield universal1007        for t in data.tags:1008            yield t1009        for t in self.non_universal_tags:1010            if t not in data.tags:1011                yield negated(t)1012    def rescore(self, tag):1013        new_score = (1014            self.tag_usage_counts[tag], len(self.examples_by_tags[tag]))1015        try:1016            old_score = self.scores_by_tag[tag]1017        except KeyError:1018            pass1019        else:1020            self.tags_by_score[old_score].remove(tag)1021        self.scores_by_tag[tag] = new_score1022        sample = self.tags_by_score[new_score]1023        if len(sample) == 0:1024            heapq.heappush(self.scores, new_score)1025        sample.add(tag)1026    def select_tag(self):1027        while True:1028            peek = self.scores[0]1029            sample = self.tags_by_score[peek]1030            if len(sample) == 0:1031                heapq.heappop(self.scores)1032            else:1033                return sample.choice(self.random)1034    def select_example_for_tag(self, t):1035        return self.random.choice(self.examples_by_tags[t])1036    def select(self):1037        t = self.select_tag()1038        self.mutation_counts += 11039        result = self.select_example_for_tag(t)1040        assert self.has_tag(t, result)1041        for s in self.tags_for(result):1042            self.tag_usage_counts[s] += 11043            self.rescore(s)1044        return t, result1045def block_program(description):1046    """Mini-DSL for block rewriting. A sequence of commands that will be run1047    over all contiguous sequences of blocks of the description length in order.1048    Commands are:1049        * ".", keep this block unchanged1050        * "-", subtract one from this block.1051        * "0", replace this block with zero1052        * "X", delete this block1053    If a command does not apply (currently only because it's - on a zero1054    block) the block will be silently skipped over. As a side effect of1055    running a block program its score will be updated.1056    """1057    def run(self):1058        n = len(description)1059        i = 01060        while i + n <= len(self.shrink_target.blocks):1061            attempt = bytearray(self.shrink_target.buffer)1062            failed = False1063            for k, d in reversed(list(enumerate(description))):1064                j = i + k1065                u, v = self.blocks[j]1066                if d == '-':1067                    value = int_from_bytes(attempt[u:v])1068                    if value == 0:1069                        failed = True1070                        break1071                    else:1072                        attempt[u:v] = int_to_bytes(value - 1, v - u)1073                elif d == 'X':1074                    del attempt[u:v]1075                else:  # pragma: no cover1076                    assert False, 'Unrecognised command %r' % (d,)1077            if failed or not self.incorporate_new_buffer(attempt):1078                i += 11079    run.command = description1080    run.__name__ = 'block_program(%r)' % (description,)1081    return run1082class PassClassification(Enum):1083    CANDIDATE = 01084    HOPEFUL = 11085    DUBIOUS = 21086    AVOID = 31087    SPECIAL = 41088@total_ordering1089@attr.s(slots=True, cmp=False)1090class ShrinkPass(object):1091    pass_function = attr.ib()1092    index = attr.ib()1093    classification = attr.ib(default=PassClassification.CANDIDATE)1094    successes = attr.ib(default=0)1095    runs = attr.ib(default=0)1096    calls = attr.ib(default=0)1097    shrinks = attr.ib(default=0)1098    deletions = attr.ib(default=0)1099    @property1100    def failures(self):1101        return self.runs - self.successes1102    @property1103    def name(self):1104        return self.pass_function.__name__1105    def __eq__(self, other):1106        return self.index == other.index1107    def __hash__(self):1108        return hash(self.index)1109    def __lt__(self, other):1110        return self.key() < other.key()1111    def key(self):1112        # Smaller is better.1113        return (1114            self.runs,1115            self.failures,1116            self.calls,1117            self.index1118        )1119class Shrinker(object):1120    """A shrinker is a child object of a ConjectureRunner which is designed to1121    manage the associated state of a particular shrink problem.1122    Currently the only shrink problem we care about is "interesting and with a1123    particular interesting_origin", but this is abstracted into a general1124    purpose predicate for more flexibility later - e.g. we are likely to want1125    to shrink with respect to a particular coverage target later.1126    Data with a status < VALID may be assumed not to satisfy the predicate.1127    The expected usage pattern is that this is only ever called from within the1128    engine.1129    """1130    DEFAULT_PASSES = [1131        'pass_to_descendant',1132        'zero_examples',1133        'adaptive_example_deletion',1134        'reorder_examples',1135        'minimize_duplicated_blocks',1136        'minimize_individual_blocks',1137    ]1138    EMERGENCY_PASSES = [1139        block_program('-XX'),1140        block_program('XX'),1141        'example_deletion_with_block_lowering',1142        'shrink_offset_pairs',1143        'minimize_block_pairs_retaining_sum',1144    ]1145    def __init__(self, engine, initial, predicate):1146        """Create a shrinker for a particular engine, with a given starting1147        point and predicate. When shrink() is called it will attempt to find an1148        example for which predicate is True and which is strictly smaller than1149        initial.1150        Note that initial is a ConjectureData object, and predicate1151        takes ConjectureData objects.1152        """1153        self.__engine = engine1154        self.__predicate = predicate1155        self.discarding_failed = False1156        self.__shrinking_prefixes = set()1157        self.initial_size = len(initial.buffer)1158        # We add a second level of caching local to the shrinker. This is a bit1159        # of a hack. Ideally we'd be able to rely on the engine's functionality1160        # for this. Doing it this way has two benefits: Firstly, the engine1161        # does not currently cache overruns (and probably shouldn't, but could1162        # recreate them on demand if necessary), and secondly Python dicts are1163        # much faster than our pure Python tree-based lookups.1164        self.__test_function_cache = {}1165        # We keep track of the current best example on the shrink_target1166        # attribute.1167        self.shrink_target = None1168        self.update_shrink_target(initial)1169        self.shrinks = 01170        self.initial_calls = self.__engine.call_count1171        self.current_pass_depth = 01172        self.passes_by_name = {}1173        self.clear_passes()1174        for p in Shrinker.DEFAULT_PASSES:1175            self.add_new_pass(p)1176        for p in Shrinker.EMERGENCY_PASSES:1177            self.add_new_pass(p, classification=PassClassification.AVOID)1178        self.add_new_pass(1179            'lower_common_block_offset',1180            classification=PassClassification.SPECIAL1181        )1182    def clear_passes(self):1183        """Reset all passes on the shrinker, leaving it in a blank state.1184        This is mostly useful for testing.1185        """1186        # Note that we deliberately do not clear passes_by_name. This means1187        # that we can still look up and explicitly run the standard passes,1188        # they just won't be avaiable by default.1189        self.passes = []1190        self.passes_awaiting_requeue = []1191        self.pass_queues = {c: [] for c in PassClassification}1192        self.known_programs = set()1193    def add_new_pass(self, run, classification=PassClassification.CANDIDATE):1194        """Creates a shrink pass corresponding to calling ``run(self)``"""1195        if isinstance(run, str):1196            run = getattr(Shrinker, run)1197        p = ShrinkPass(1198            pass_function=run, index=len(self.passes),1199            classification=classification,1200        )1201        if hasattr(run, 'command'):1202            self.known_programs.add(run.command)1203        self.passes.append(p)1204        self.passes_awaiting_requeue.append(p)1205        self.passes_by_name[p.name] = p1206        return p1207    def shrink_pass(self, name):1208        if hasattr(Shrinker, name) and name not in self.passes_by_name:1209            self.add_new_pass(name, classification=PassClassification.SPECIAL)1210        return self.passes_by_name[name]1211    def requeue_passes(self):1212        """Move all passes from passes_awaiting_requeue to their relevant1213        queues."""1214        while self.passes_awaiting_requeue:1215            p = self.passes_awaiting_requeue.pop()1216            heapq.heappush(self.pass_queues[p.classification], p)1217    def has_queued_passes(self, classification):1218        """Checks if any shrink passes are currently enqued under this1219        classification (note that there may be passes with this classification1220        currently awaiting requeue)."""1221        return len(self.pass_queues[classification]) > 01222    def pop_queued_pass(self, classification):1223        """Pop and run a single queued pass with this classification."""1224        sp = heapq.heappop(self.pass_queues[classification])1225        self.passes_awaiting_requeue.append(sp)1226        self.run_shrink_pass(sp)1227    def run_queued_until_change(self, classification):1228        """Run passes with this classification until there are no more or one1229        of them succeeds in shrinking the target."""1230        initial = self.shrink_target1231        while (1232            self.has_queued_passes(classification) and1233            self.shrink_target is initial1234        ):1235            self.pop_queued_pass(classification)1236        return self.shrink_target is not initial1237    def run_one_queued_pass(self, classification):1238        """Run a single queud pass with this classification (if there are1239        any)."""1240        if self.has_queued_passes(classification):1241            self.pop_queued_pass(classification)1242    def run_queued_passes(self, classification):1243        """Run all queued passes with this classification."""1244        while self.has_queued_passes(classification):1245            self.pop_queued_pass(classification)1246    @property1247    def calls(self):1248        return self.__engine.call_count1249    def consider_new_buffer(self, buffer):1250        buffer = hbytes(buffer)1251        return buffer.startswith(self.buffer) or \1252            self.incorporate_new_buffer(buffer)1253    def incorporate_new_buffer(self, buffer):1254        buffer = hbytes(buffer[:self.shrink_target.index])1255        try:1256            existing = self.__test_function_cache[buffer]1257        except KeyError:1258            pass1259        else:1260            return self.incorporate_test_data(existing)1261        # Sometimes an attempt at lexicographic minimization will do the wrong1262        # thing because the buffer has changed under it (e.g. something has1263        # turned into a write, the bit size has changed). The result would be1264        # an invalid string, but it's better for us to just ignore it here as1265        # it turns out to involve quite a lot of tricky book-keeping to get1266        # this right and it's better to just handle it in one place.1267        if sort_key(buffer) >= sort_key(self.shrink_target.buffer):1268            return False1269        if self.shrink_target.buffer.startswith(buffer):1270            return False1271        if not self.__engine.prescreen_buffer(buffer):1272            return False1273        assert sort_key(buffer) <= sort_key(self.shrink_target.buffer)1274        data = ConjectureData.for_buffer(buffer)1275        self.__engine.test_function(data)1276        self.__test_function_cache[buffer] = data1277        return self.incorporate_test_data(data)1278    def incorporate_test_data(self, data):1279        self.__test_function_cache[data.buffer] = data1280        if (1281            self.__predicate(data) and1282            sort_key(data.buffer) < sort_key(self.shrink_target.buffer)1283        ):1284            self.update_shrink_target(data)1285            self.__shrinking_block_cache = {}1286            return True1287        return False1288    def cached_test_function(self, buffer):1289        buffer = hbytes(buffer)1290        try:1291            return self.__test_function_cache[buffer]1292        except KeyError:1293            pass1294        result = self.__engine.cached_test_function(buffer)1295        self.incorporate_test_data(result)1296        self.__test_function_cache[buffer] = result1297        return result1298    def debug(self, msg):1299        self.__engine.debug(msg)1300    @property1301    def random(self):1302        return self.__engine.random1303    def run_shrink_pass(self, sp):1304        """Runs the function associated with ShrinkPass sp and updates the1305        relevant metadata.1306        Note that sp may or may not be a pass currently associated with1307        this shrinker. This does not handle any requeing that is1308        required.1309        """1310        if isinstance(sp, str):1311            sp = self.shrink_pass(sp)1312        self.debug('Shrink Pass %s' % (sp.name,))1313        initial_shrinks = self.shrinks1314        initial_calls = self.calls1315        size = len(self.shrink_target.buffer)1316        try:1317            sp.pass_function(self)1318        finally:1319            calls = self.calls - initial_calls1320            shrinks = self.shrinks - initial_shrinks1321            deletions = size - len(self.shrink_target.buffer)1322            sp.calls += calls1323            sp.shrinks += shrinks1324            sp.deletions += deletions1325            sp.runs += 11326            self.debug('Shrink Pass %s completed.' % (sp.name,))1327        # Complex state machine alert! A pass run can either succeed (we made1328        # at least one shrink) or fail (we didn't). This changes the pass's1329        # current classification according to the following possible1330        # transitions:1331        #1332        # CANDIDATE -------> HOPEFUL1333        #     |                 ^1334        #     |                 |1335        #     v                 v1336        #   AVOID ---------> DUBIOUS1337        #1338        # From best to worst we want to run HOPEFUL, CANDIDATE, DUBIOUS, AVOID.1339        # We will try any one of them if we have to but we want to prioritise.1340        #1341        # When a run succeeds, a pass will follow an arrow to a better class.1342        # When it fails, it will follow an arrow to a worse one.1343        # If no such arrow is available, it stays where it is.1344        #1345        # We also have the classification SPECIAL for passes that do not get1346        # run as part of the normal process.1347        previous = sp.classification1348        # If the pass didn't actually do anything we don't reclassify it. This1349        # is for things like remove_discarded which often are inapplicable.1350        if calls > 0 and sp.classification != PassClassification.SPECIAL:1351            if shrinks == 0:1352                if sp.successes > 0:1353                    sp.classification = PassClassification.DUBIOUS1354                else:1355                    sp.classification = PassClassification.AVOID1356            else:1357                sp.successes += 11358                if sp.classification == PassClassification.AVOID:1359                    sp.classification = PassClassification.DUBIOUS1360                else:1361                    sp.classification = PassClassification.HOPEFUL1362            if previous != sp.classification:1363                self.debug('Reclassified %s from %s to %s' % (1364                    sp.name, previous.name, sp.classification.name1365                ))1366    def shrink(self):1367        """Run the full set of shrinks and update shrink_target.1368        This method is "mostly idempotent" - calling it twice is unlikely to1369        have any effect, though it has a non-zero probability of doing so.1370        """1371        # We assume that if an all-zero block of bytes is an interesting1372        # example then we're not going to do better than that.1373        # This might not technically be true: e.g. for integers() | booleans()1374        # the simplest example is actually [1, 0]. Missing this case is fairly1375        # harmless and this allows us to make various simplifying assumptions1376        # about the structure of the data (principally that we're never1377        # operating on a block of all zero bytes so can use non-zeroness as a1378        # signpost of complexity).1379        if (1380            not any(self.shrink_target.buffer) or1381            self.incorporate_new_buffer(hbytes(len(self.shrink_target.buffer)))1382        ):1383            return1384        try:1385            self.greedy_shrink()1386        finally:1387            if self.__engine.report_debug_info:1388                def s(n):1389                    return 's' if n != 1 else ''1390                total_deleted = self.initial_size - len(1391                    self.shrink_target.buffer)1392                self.debug('---------------------')1393                self.debug('Shrink pass profiling')1394                self.debug('---------------------')1395                self.debug('')1396                calls = self.__engine.call_count - self.initial_calls1397                self.debug((1398                    'Shrinking made a total of %d call%s '1399                    'of which %d shrank. This deleted %d byte%s out of %d.'1400                ) % (1401                    calls, s(calls),1402                    self.shrinks,1403                    total_deleted, s(total_deleted),1404                    self.initial_size,1405                ))1406                for useful in [True, False]:1407                    self.debug('')1408                    if useful:1409                        self.debug('Useful passes:')1410                    else:1411                        self.debug('Useless passes:')1412                    self.debug('')1413                    for p in sorted(1414                        self.passes,1415                        key=lambda t: (1416                            -t.calls, -t.runs,1417                            t.deletions, t.shrinks,1418                        ),1419                    ):1420                        if p.calls == 0:1421                            continue1422                        if (p.shrinks != 0) != useful:1423                            continue1424                        self.debug((1425                            '  * %s ran %d time%s, making %d call%s of which '1426                            '%d shrank, deleting %d byte%s.'1427                        ) % (1428                            p.name,1429                            p.runs, s(p.runs),1430                            p.calls, s(p.calls),1431                            p.shrinks,1432                            p.deletions, s(p.deletions),1433                        ))1434                self.debug('')1435    def greedy_shrink(self):1436        """Run a full set of greedy shrinks (that is, ones that will only ever1437        move to a better target) and update shrink_target appropriately.1438        This method iterates to a fixed point and so is idempontent - calling1439        it twice will have exactly the same effect as calling it once.1440        """1441        self.run_shrink_pass('alphabet_minimize')1442        while self.single_greedy_shrink_iteration():1443            self.run_shrink_pass('lower_common_block_offset')1444    def single_greedy_shrink_iteration(self):1445        """Performs a single run through each greedy shrink pass, but does not1446        loop to achieve a fixed point."""1447        initial = self.shrink_target1448        # What follows is a slightly delicate dance. What we want to do is try1449        # to ensure that:1450        #1451        # 1. If it is possible for us to be deleting data, we should be.1452        # 2. We do not end up repeating a lot of passes uselessly.1453        # 3. We do not want to run expensive or useless passes if we can1454        #    possibly avoid doing so.1455        self.requeue_passes()1456        self.run_shrink_pass('remove_discarded')1457        # First run the entire set of solid passes (ones that have previously1458        # made changes). It's important that we run all of them, not just one,1459        # as typically each pass may unlock others.1460        self.run_queued_passes(PassClassification.HOPEFUL)1461        # While our solid passes are successfully shrinking the buffer, we can1462        # just keep doing that (note that this is a stronger condition than1463        # just making shrinks - it's a much better sense of progress. We can1464        # make only O(n) length reductions but we can make exponentially many1465        # shrinks).1466        if len(self.buffer) < len(initial.buffer):1467            return True1468        # If we're stuck on length reductions then we pull in one candiate pass1469        # (if there are any).1470        # This should hopefully help us unlock any local minima that were1471        # starting to reduce the utility of the previous solid passes.1472        self.run_one_queued_pass(PassClassification.CANDIDATE)1473        # We've pulled in a new candidate pass (or have no candidate passes1474        # left) and are making shrinks with the solid passes, so lets just1475        # keep on doing that.1476        if self.shrink_target is not initial:1477            return True1478        # We're a bit stuck, so it's time to try some new passes.1479        for classification in [1480            # First we try rerunning every pass we've previously seen succeed.1481            PassClassification.DUBIOUS,1482            # If that didn't work, we pull in some new candidate passes.1483            PassClassification.CANDIDATE,1484            # If that still didn't work, we now pull out all the stops and1485            # bring in the desperation passes. These are either passes that1486            # started as CANDIDATE but we have never seen work, or ones that1487            # are so expensive that they begin life as AVOID.1488            PassClassification.AVOID1489        ]:1490            if self.run_queued_until_change(classification):1491                return True1492        assert self.shrink_target is initial1493        return False1494    @property1495    def buffer(self):1496        return self.shrink_target.buffer1497    @property1498    def blocks(self):1499        return self.shrink_target.blocks1500    def pass_to_descendant(self):1501        """Attempt to replace each example with a descendant example.1502        This is designed to deal with strategies that call themselves1503        recursively. For example, suppose we had:1504        binary_tree = st.deferred(1505            lambda: st.one_of(1506                st.integers(), st.tuples(binary_tree, binary_tree)))1507        This pass guarantees that we can replace any binary tree with one of1508        its subtrees - each of those will create an interval that the parent1509        could validly be replaced with, and this pass will try doing that.1510        This is pretty expensive - it takes O(len(intervals)^2) - so we run it1511        late in the process when we've got the number of intervals as far down1512        as possible.1513        """1514        for ex in self.each_non_trivial_example():1515            st = self.shrink_target1516            descendants = sorted(set(1517                st.buffer[d.start:d.end] for d in self.shrink_target.examples1518                if d.start >= ex.start and d.end <= ex.end and1519                d.length < ex.length and d.label == ex.label1520            ), key=sort_key)1521            for d in descendants:1522                if self.incorporate_new_buffer(1523                    self.buffer[:ex.start] + d + self.buffer[ex.end:]1524                ):1525                    break1526    def is_shrinking_block(self, i):1527        """Checks whether block i has been previously marked as a shrinking1528        block.1529        If the shrink target has changed since i was last checked, will1530        attempt to calculate if an equivalent block in a previous shrink1531        target was marked as shrinking.1532        """1533        if not self.__shrinking_prefixes:1534            return False1535        try:1536            return self.__shrinking_block_cache[i]1537        except KeyError:1538            pass1539        t = self.shrink_target1540        return self.__shrinking_block_cache.setdefault(1541            i,1542            t.buffer[:t.blocks[i][0]] in self.__shrinking_prefixes1543        )1544    def is_payload_block(self, i):1545        """A block is payload if it is entirely non-structural: We can tinker1546        with its value freely and this will not affect the shape of the input1547        language.1548        This is mostly a useful concept when we're doing lexicographic1549        minimimization on multiple blocks at once - by restricting ourself to1550        payload blocks, we expect the shape of the language to not change1551        under us (but must still guard against it doing so).1552        """1553        return not (1554            self.is_shrinking_block(i) or1555            i in self.shrink_target.forced_blocks1556        )1557    def lower_common_block_offset(self):1558        """Sometimes we find ourselves in a situation where changes to one part1559        of the byte stream unlock changes to other parts. Sometimes this is1560        good, but sometimes this can cause us to exhibit exponential slow1561        downs!1562        e.g. suppose we had the following:1563        m = draw(integers(min_value=0))1564        n = draw(integers(min_value=0))1565        assert abs(m - n) > 11566        If this fails then we'll end up with a loop where on each iteration we1567        reduce each of m and n by 2 - m can't go lower because of n, then n1568        can't go lower because of m.1569        This will take us O(m) iterations to complete, which is exponential in1570        the data size, as we gradually zig zag our way towards zero.1571        This can only happen if we're failing to reduce the size of the byte1572        stream: The number of iterations that reduce the length of the byte1573        stream is bounded by that length.1574        So what we do is this: We keep track of which blocks are changing, and1575        then if there's some non-zero common offset to them we try and minimize1576        them all at once by lowering that offset.1577        This may not work, and it definitely won't get us out of all possible1578        exponential slow downs (an example of where it doesn't is where the1579        shape of the blocks changes as a result of this bouncing behaviour),1580        but it fails fast when it doesn't work and gets us out of a really1581        nastily slow case when it does.1582        """1583        if len(self.__changed_blocks) <= 1:1584            return1585        current = self.shrink_target1586        blocked = [current.buffer[u:v] for u, v in current.blocks]1587        changed = [1588            i for i in sorted(self.__changed_blocks)1589            if any(blocked[i]) and i not in self.shrink_target.forced_blocks1590        ]1591        if not changed:1592            return1593        ints = [int_from_bytes(blocked[i]) for i in changed]1594        offset = min(ints)1595        assert offset > 01596        for i in hrange(len(ints)):1597            ints[i] -= offset1598        def reoffset(o):1599            new_blocks = list(blocked)1600            for i, v in zip(changed, ints):1601                new_blocks[i] = int_to_bytes(v + o, len(blocked[i]))1602            return self.incorporate_new_buffer(hbytes().join(new_blocks))1603        new_offset = Integer.shrink(offset, reoffset, random=self.random)1604        if new_offset == offset:1605            self.clear_change_tracking()1606    def shrink_offset_pairs(self):1607        """Lower any two blocks offset from each other the same ammount.1608        Before this shrink pass, two blocks explicitly offset from each1609        other would not get minimized properly:1610         >>> b = st.integers(0, 255)1611         >>> find(st.tuples(b, b), lambda x: x[0] == x[1] + 1)1612        (149,148)1613        This expensive (O(n^2)) pass goes through every pair of non-zero1614        blocks in the current shrink target and sees if the shrink1615        target can be improved by applying an offset to both of them.1616        """1617        current = [self.shrink_target.buffer[u:v] for u, v in self.blocks]1618        def int_from_block(i):1619            return int_from_bytes(current[i])1620        def block_len(i):1621            u, v = self.blocks[i]1622            return v - u1623        # Try reoffseting every pair1624        def reoffset_pair(pair, o):1625            n = len(self.blocks)1626            # Number of blocks may have changed, need to validate1627            valid_pair = [1628                p for p in pair if p < n and int_from_block(p) > 0 and1629                self.is_payload_block(p)1630            ]1631            if len(valid_pair) < 2:1632                return1633            m = min([int_from_block(p) for p in valid_pair])1634            new_blocks = [self.shrink_target.buffer[u:v]1635                          for u, v in self.blocks]1636            for i in valid_pair:1637                new_blocks[i] = int_to_bytes(1638                    int_from_block(i) + o - m, block_len(i))1639            buffer = hbytes().join(new_blocks)1640            return self.incorporate_new_buffer(buffer)1641        i = 01642        while i < len(self.blocks):1643            if self.is_payload_block(i) and int_from_block(i) > 0:1644                j = i + 11645                while j < len(self.shrink_target.blocks):1646                    block_val = int_from_block(j)1647                    i_block_val = int_from_block(i)1648                    if self.is_payload_block(j) \1649                       and block_val > 0 and i_block_val > 0:1650                        offset = min(int_from_block(i),1651                                     int_from_block(j))1652                        # Save current before shrinking1653                        current = [self.shrink_target.buffer[u:v]1654                                   for u, v in self.blocks]1655                        Integer.shrink(1656                            offset, lambda o: reoffset_pair((i, j), o),1657                            random=self.random1658                        )1659                    j += 11660            i += 11661    def mark_shrinking(self, blocks):1662        """Mark each of these blocks as a shrinking block: That is, lowering1663        its value lexicographically may cause less data to be drawn after."""1664        t = self.shrink_target1665        for i in blocks:1666            if self.__shrinking_block_cache.get(i) is True:1667                continue1668            self.__shrinking_block_cache[i] = True1669            prefix = t.buffer[:t.blocks[i][0]]1670            self.__shrinking_prefixes.add(prefix)1671    def clear_change_tracking(self):1672        self.__changed_blocks.clear()1673    def mark_changed(self, i):1674        self.__changed_blocks.add(i)1675    def update_shrink_target(self, new_target):1676        assert new_target.frozen1677        if self.shrink_target is not None:1678            current = self.shrink_target.buffer1679            new = new_target.buffer1680            assert sort_key(new) < sort_key(current)1681            self.shrinks += 11682            if new_target.blocks != self.shrink_target.blocks:1683                self.clear_change_tracking()1684            else:1685                for i, (u, v) in enumerate(self.shrink_target.blocks):1686                    if (1687                        i not in self.__changed_blocks and1688                        current[u:v] != new[u:v]1689                    ):1690                        self.mark_changed(i)1691        else:1692            self.__changed_blocks = set()1693        self.shrink_target = new_target1694        self.__shrinking_block_cache = {}1695    def try_shrinking_blocks(self, blocks, b):1696        """Attempts to replace each block in the blocks list with b. Returns1697        True if it succeeded (which may include some additional modifications1698        to shrink_target).1699        May call mark_shrinking with b if this causes a reduction in size.1700        In current usage it is expected that each of the blocks currently have1701        the same value, although this is not essential. Note that b must be1702        < the block at min(blocks) or this is not a valid shrink.1703        This method will attempt to do some small amount of work to delete data1704        that occurs after the end of the blocks. This is useful for cases where1705        there is some size dependency on the value of a block.1706        """1707        initial_attempt = bytearray(self.shrink_target.buffer)1708        for i, block in enumerate(blocks):1709            if block >= len(self.blocks):1710                blocks = blocks[:i]1711                break1712            u, v = self.blocks[block]1713            n = min(v - u, len(b))1714            initial_attempt[v - n:v] = b[-n:]1715        start = self.shrink_target.blocks[blocks[0]][0]1716        end = self.shrink_target.blocks[blocks[-1]][1]1717        initial_data = self.cached_test_function(initial_attempt)1718        if initial_data.status == Status.INTERESTING:1719            return initial_data is self.shrink_target1720        # If this produced something completely invalid we ditch it1721        # here rather than trying to persevere.1722        if initial_data.status < Status.VALID:1723            return False1724        # We've shrunk inside our group of blocks, so we have no way to1725        # continue. (This only happens when shrinking more than one block at1726        # a time).1727        if len(initial_data.buffer) < v:1728            return False1729        lost_data = len(self.shrink_target.buffer) - len(initial_data.buffer)1730        # If this did not in fact cause the data size to shrink we1731        # bail here because it's not worth trying to delete stuff from1732        # the remainder.1733        if lost_data <= 0:1734            return False1735        self.mark_shrinking(blocks)1736        # We now look for contiguous regions to delete that might help fix up1737        # this failed shrink. We only look for contiguous regions of the right1738        # lengths because doing anything more than that starts to get very1739        # expensive. See example_deletion_with_block_lowering for where we1740        # try to be more aggressive.1741        regions_to_delete = {(end, end + lost_data)}1742        for j in (blocks[-1] + 1, blocks[-1] + 2):1743            if j >= min(len(initial_data.blocks), len(self.blocks)):1744                continue1745            # We look for a block very shortly after the last one that has1746            # lost some of its size, and try to delete from the beginning so1747            # that it retains the same integer value. This is a bit of a hyper1748            # specific trick designed to make our integers() strategy shrink1749            # well.1750            r1, s1 = self.shrink_target.blocks[j]1751            r2, s2 = initial_data.blocks[j]1752            lost = (s1 - r1) - (s2 - r2)1753            # Apparently a coverage bug? An assert False in the body of this1754            # will reliably fail, but it shows up as uncovered.1755            if lost <= 0 or r1 != r2:  # pragma: no cover1756                continue1757            regions_to_delete.add((r1, r1 + lost))1758        for ex in self.shrink_target.examples:1759            if ex.start > start:1760                continue1761            if ex.end <= end:1762                continue1763            replacement = initial_data.examples[ex.index]1764            in_original = [1765                c for c in ex.children if c.start >= end1766            ]1767            in_replaced = [1768                c for c in replacement.children if c.start >= end1769            ]1770            if len(in_replaced) >= len(in_original) or not in_replaced:1771                continue1772            # We've found an example where some of the children went missing1773            # as a result of this change, and just replacing it with the data1774            # it would have had and removing the spillover didn't work. This1775            # means that some of its children towards the right must be1776            # important, so we try to arrange it so that it retains its1777            # rightmost children instead of its leftmost.1778            regions_to_delete.add((1779                in_original[0].start, in_original[-len(in_replaced)].start1780            ))1781        for u, v in sorted(1782            regions_to_delete, key=lambda x: x[1] - x[0], reverse=True1783        ):1784            try_with_deleted = bytearray(initial_attempt)1785            del try_with_deleted[u:v]1786            if self.incorporate_new_buffer(try_with_deleted):1787                return True1788        return False1789    def remove_discarded(self):1790        """Try removing all bytes marked as discarded.1791        This pass is primarily to deal with data that has been ignored while1792        doing rejection sampling - e.g. as a result of an integer range, or a1793        filtered strategy.1794        Such data will also be handled by the adaptive_example_deletion pass,1795        but that pass is necessarily more conservative and will try deleting1796        each interval individually. The common case is that all data drawn and1797        rejected can just be thrown away immediately in one block, so this pass1798        will be much faster than trying each one individually when it works.1799        """1800        while self.shrink_target.has_discards:1801            discarded = []1802            for ex in self.shrink_target.examples:1803                if ex.discarded and (1804                    not discarded or ex.start >= discarded[-1][-1]1805                ):1806                    discarded.append((ex.start, ex.end))1807            assert discarded1808            attempt = bytearray(self.shrink_target.buffer)1809            for u, v in reversed(discarded):1810                del attempt[u:v]1811            if not self.incorporate_new_buffer(attempt):1812                break1813    def each_non_trivial_example(self):1814        """Iterates over all non-trivial examples in the current shrink target,1815        with care taken to ensure that every example yielded is current.1816        Makes the assumption that no modifications will be made to the1817        shrink target prior to the currently yielded example. If this1818        assumption is violated this will probably raise an error, so1819        don't do that.1820        """1821        stack = [0]1822        while stack:1823            target = stack.pop()1824            if isinstance(target, tuple):1825                parent, i = target1826                parent = self.shrink_target.examples[parent]1827                example_index = parent.children[i].index1828            else:1829                example_index = target1830            ex = self.shrink_target.examples[example_index]1831            if ex.trivial:1832                continue1833            yield ex1834            ex = self.shrink_target.examples[example_index]1835            if ex.trivial:1836                continue1837            for i in range(len(ex.children)):1838                stack.append((example_index, i))1839    def example_wise_shrink(self, shrinker, **kwargs):1840        """Runs a sequence shrinker on the children of each example."""1841        for ex in self.each_non_trivial_example():1842            st = self.shrink_target1843            pieces = [1844                st.buffer[c.start:c.end]1845                for c in ex.children1846            ]1847            if not pieces:1848                pieces = [st.buffer[ex.start:ex.end]]1849            prefix = st.buffer[:ex.start]1850            suffix = st.buffer[ex.end:]1851            shrinker.shrink(1852                pieces, lambda ls: self.incorporate_new_buffer(1853                    prefix + hbytes().join(ls) + suffix,1854                ), random=self.random, **kwargs1855            )1856    def adaptive_example_deletion(self):1857        """Recursive deletion pass that tries to make the example located at1858        example_index as small as possible. This is the main point at which we1859        try to lower the size of the data.1860        First attempts to replace the example with its minimal possible version1861        using zero_example. If the example is trivial (either because of that1862        or because it was anyway) then we assume there's nothing we can1863        usefully do here and return early. Otherwise, we attempt to minimize it1864        by deleting its children.1865        If we do not make any successful changes, we recurse to the example's1866        children and attempt the same there.1867        """1868        self.example_wise_shrink(Length)1869    def zero_examples(self):1870        """Attempt to replace each example with a minimal version of itself."""1871        for ex in self.each_non_trivial_example():1872            u = ex.start1873            v = ex.end1874            attempt = self.cached_test_function(1875                self.buffer[:u] + hbytes(v - u) + self.buffer[v:]1876            )1877            # FIXME: IOU one attempt to debug this - DRMacIver1878            # This is a mysterious problem that should be impossible to trigger1879            # but isn't. I don't know what's going on, and it defeated my1880            # my attempts to reproduce or debug it. I'd *guess* it's related to1881            # nondeterminism in the test function. That should be impossible in1882            # the cases where I'm seeing it, but I haven't been able to put1883            # together a reliable reproduction of it.1884            if ex.index >= len(attempt.examples):  # pragma: no cover1885                continue1886            in_replacement = attempt.examples[ex.index]1887            used = in_replacement.length1888            if (1889                not self.__predicate(attempt) and1890                in_replacement.end < len(attempt.buffer) and1891                used < ex.length1892            ):1893                self.incorporate_new_buffer(1894                    self.buffer[:u] + hbytes(used) + self.buffer[v:]1895                )1896    def minimize_duplicated_blocks(self):1897        """Find blocks that have been duplicated in multiple places and attempt1898        to minimize all of the duplicates simultaneously.1899        This lets us handle cases where two values can't be shrunk1900        independently of each other but can easily be shrunk together.1901        For example if we had something like:1902        ls = data.draw(lists(integers()))1903        y = data.draw(integers())1904        assert y not in ls1905        Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were1906        to replace both 3s with 0, this would be a valid shrink, but if we were1907        to replace either 3 with 0 on its own the test would start passing.1908        It is also useful for when that duplication is accidental and the value1909        of the blocks doesn't matter very much because it allows us to replace1910        more values at once.1911        """1912        def canon(b):1913            i = 01914            while i < len(b) and b[i] == 0:1915                i += 11916            return b[i:]1917        counts = Counter(1918            canon(self.shrink_target.buffer[u:v])1919            for u, v in self.blocks1920        )1921        counts.pop(hbytes(), None)1922        blocks = [buffer for buffer, count in counts.items() if count > 1]1923        blocks.sort(reverse=True)1924        blocks.sort(key=lambda b: counts[b] * len(b), reverse=True)1925        for block in blocks:1926            targets = [1927                i for i, (u, v) in enumerate(self.blocks)1928                if canon(self.shrink_target.buffer[u:v]) == block1929            ]1930            # This can happen if some blocks have been lost in the previous1931            # shrinking.1932            if len(targets) <= 1:1933                continue1934            Lexical.shrink(1935                block,1936                lambda b: self.try_shrinking_blocks(targets, b),1937                random=self.random, full=False1938            )1939    def minimize_individual_blocks(self):1940        """Attempt to minimize each block in sequence.1941        This is the pass that ensures that e.g. each integer we draw is a1942        minimum value. So it's the part that guarantees that if we e.g. do1943        x = data.draw(integers())1944        assert x < 101945        then in our shrunk example, x = 10 rather than say 97.1946        """1947        i = len(self.blocks) - 11948        while i >= 0:1949            u, v = self.blocks[i]1950            Lexical.shrink(1951                self.shrink_target.buffer[u:v],1952                lambda b: self.try_shrinking_blocks((i,), b),1953                random=self.random, full=False,1954            )1955            i -= 11956    def example_deletion_with_block_lowering(self):1957        """Sometimes we get stuck where there is data that we could easily1958        delete, but it changes the number of examples generated, so we have to1959        change that at the same time.1960        We handle most of the common cases in try_shrinking_blocks which is1961        pretty good at clearing out large contiguous blocks of dead space,1962        but it fails when there is data that has to stay in particular places1963        in the list.1964        This pass exists as an emergency procedure to get us unstuck. For every1965        example and every block not inside that example it tries deleting the1966        example and modifying the block's value by one in either direction.1967        """1968        i = 01969        while i < len(self.shrink_target.blocks):1970            if not self.is_shrinking_block(i):1971                i += 11972                continue1973            u, v = self.blocks[i]1974            j = 01975            while j < len(self.shrink_target.examples):1976                n = int_from_bytes(self.shrink_target.buffer[u:v])1977                if n == 0:1978                    break1979                ex = self.shrink_target.examples[j]1980                if ex.start < v or ex.length == 0:1981                    j += 11982                    continue1983                buf = bytearray(self.shrink_target.buffer)1984                buf[u:v] = int_to_bytes(n - 1, v - u)1985                del buf[ex.start:ex.end]1986                if not self.incorporate_new_buffer(buf):1987                    j += 11988            i += 11989    def minimize_block_pairs_retaining_sum(self):1990        """This pass minimizes pairs of blocks subject to the constraint that1991        their sum when interpreted as integers remains the same. This allow us1992        to normalize a number of examples that we would otherwise struggle on.1993        e.g. consider the following:1994        m = data.draw_bits(8)1995        n = data.draw_bits(8)1996        if m + n >= 256:1997            data.mark_interesting()1998        The ideal example for this is m=1, n=255, but we will almost never1999        find that without a pass like this - we would only do so if we2000        happened to draw n=255 by chance.2001        This kind of scenario comes up reasonably often in the context of e.g.2002        triggering overflow behaviour.2003        """2004        i = 02005        while i < len(self.shrink_target.blocks):2006            if self.is_payload_block(i):2007                j = i + 12008                while j < len(self.shrink_target.blocks):2009                    u, v = self.shrink_target.blocks[i]2010                    m = int_from_bytes(self.shrink_target.buffer[u:v])2011                    if m == 0:2012                        break2013                    r, s = self.shrink_target.blocks[j]2014                    n = int_from_bytes(self.shrink_target.buffer[r:s])2015                    if (2016                        s - r == v - u and2017                        self.is_payload_block(j)2018                    ):2019                        def trial(x, y):2020                            if s > len(self.shrink_target.buffer):2021                                return False2022                            attempt = bytearray(self.shrink_target.buffer)2023                            try:2024                                attempt[u:v] = int_to_bytes(x, v - u)2025                                attempt[r:s] = int_to_bytes(y, s - r)2026                            except OverflowError:2027                                return False2028                            return self.incorporate_new_buffer(attempt)2029                        # We first attempt to move 1 from m to n. If that works2030                        # then we treat that as a sign that it's worth trying2031                        # a more expensive minimization. But if m was already 12032                        # (we know it's > 0) then there's no point continuing2033                        # because the value there is now zero.2034                        if trial(m - 1, n + 1) and m > 1:2035                            m = int_from_bytes(self.shrink_target.buffer[u:v])2036                            n = int_from_bytes(self.shrink_target.buffer[r:s])2037                            tot = m + n2038                            Integer.shrink(2039                                m, lambda x: trial(x, tot - x),2040                                random=self.random2041                            )2042                    j += 12043            i += 12044    def reorder_examples(self):2045        """This pass allows us to reorder pairs of examples which come from the2046        same strategy (or strategies that happen to pun to the same label by2047        accident, but that shouldn't happen often).2048        For example, consider the following:2049        .. code-block:: python2050            import hypothesis.strategies as st2051            from hypothesis import given2052            @given(st.text(), st.text())2053            def test_does_not_exceed_100(x, y):2054                assert x != y2055        Without the ability to reorder x and y this could fail either with2056        ``x="", ``y="0"``, or the other way around. With reordering it will2057        reliably fail with ``x=""``, ``y="0"``.2058        """2059        self.example_wise_shrink(Ordering, key=sort_key)2060    def alphabet_minimize(self):2061        """Attempts to replace most bytes in the buffer with 0 or 1. The main2062        benefit of this is that it significantly increases our cache hit rate2063        by making things that are equivalent more likely to have the same2064        representation.2065        We only run this once rather than as part of the main passes as2066        once it's done its magic it's unlikely to ever be useful again.2067        It's important that it runs first though, because it makes2068        everything that comes after it faster because of the cache hits.2069        """2070        for c in (1, 0):2071            alphabet = set(self.buffer) - set(hrange(c + 1))2072            if not alphabet:2073                continue2074            def clear_to(reduced):2075                reduced = set(reduced)2076                attempt = hbytes([2077                    b if b <= c or b in reduced else c2078                    for b in self.buffer2079                ])2080                return self.consider_new_buffer(attempt)2081            Length.shrink(2082                sorted(alphabet), clear_to,2083                random=self.random,...shrinker.py
Source:shrinker.py  
...650        self.__all_changed_blocks = set()651    def mark_changed(self, i):652        self.__changed_blocks.add(i)653    @property654    def __changed_blocks(self):655        if self.__last_checked_changed_at is not self.shrink_target:656            prev_target = self.__last_checked_changed_at657            new_target = self.shrink_target658            assert prev_target is not new_target659            prev = prev_target.buffer660            new = new_target.buffer661            assert sort_key(new) < sort_key(prev)662            if (663                len(new_target.blocks) != len(prev_target.blocks)664                or new_target.blocks.endpoints != prev_target.blocks.endpoints665            ):666                self.__all_changed_blocks = set()667            else:668                blocks = new_target.blocks...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
