Best Python code snippet using hypothesis
engine.py
Source: engine.py
...
        # is at the beginning of a block of size 4 but only has 3 bytes left,
        # it's going to overrun the end of the buffer regardless of the
        # buffer contents.
        self.block_sizes = {}

    def __tree_is_exhausted(self):
        return 0 in self.dead

    def new_buffer(self):
        assert not self.__tree_is_exhausted()

        def draw_bytes(data, n, distribution):
            return self.__rewrite_for_novelty(
                data, self.__zero_bound(data, distribution(self.random, n))
            )

        self.last_data = ConjectureData(
            max_length=self.settings.buffer_size,
            draw_bytes=draw_bytes
        )
        self.test_function(self.last_data)
        self.last_data.freeze()

    def test_function(self, data):
        self.call_count += 1
        try:
            self._test_function(data)
            data.freeze()
        except StopTest as e:
            if e.testcounter != data.testcounter:
                self.save_buffer(data.buffer)
                raise e
        except:
            self.save_buffer(data.buffer)
            raise
        finally:
            data.freeze()
            self.note_details(data)
        self.debug_data(data)
        if data.status >= Status.VALID:
            self.valid_examples += 1
        tree_node = self.tree[0]
        indices = []
        node_index = 0
        for i, b in enumerate(data.buffer):
            indices.append(node_index)
            if i in data.forced_indices:
                self.forced[node_index] = b
            try:
                self.capped[node_index] = data.capped_indices[i]
            except KeyError:
                pass
            try:
                node_index = tree_node[b]
            except KeyError:
                node_index = len(self.tree)
                self.tree.append({})
                tree_node[b] = node_index
            tree_node = self.tree[node_index]
            if node_index in self.dead:
                break
        for u, v in data.blocks:
            # This can happen if we hit a dead node when walking the buffer.
            # In that case we already have this section of the tree mapped.
            if u >= len(indices):
                break
            self.block_sizes[indices[u]] = v - u
        if data.status != Status.OVERRUN and node_index not in self.dead:
            self.dead.add(node_index)
            self.tree[node_index] = data
            for j in reversed(indices):
                if (
                    len(self.tree[j]) < self.capped.get(j, 255) + 1 and
                    j not in self.forced
                ):
                    break
                if set(self.tree[j].values()).issubset(self.dead):
                    self.dead.add(j)
                else:
                    break
        last_data_is_interesting = (
            self.last_data is not None and
            self.last_data.status == Status.INTERESTING
        )
        if (
            data.status == Status.INTERESTING and (
                not last_data_is_interesting or
                sort_key(data.buffer) < sort_key(self.last_data.buffer)
            )
        ):
            self.last_data = data
            if last_data_is_interesting:
                self.shrinks += 1
                if self.shrinks >= self.settings.max_shrinks:
                    self.exit_reason = ExitReason.max_shrinks
                    raise RunIsComplete()

    def consider_new_test_data(self, data):
        # Transition rules:
        #   1. Transition cannot decrease the status
        #   2. Any transition which increases the status is valid
        #   3. If the previous status was interesting, only shrinking
        #      transitions are allowed.
        if data.buffer == self.last_data.buffer:
            return False
        if self.last_data.status < data.status:
            return True
        if self.last_data.status > data.status:
            return False
        if data.status == Status.INVALID:
            return data.index >= self.last_data.index
        if data.status == Status.OVERRUN:
            return data.overdraw <= self.last_data.overdraw
        assert data.status == Status.VALID
        return True

    def save_buffer(self, buffer):
        if (
            self.settings.database is not None and
            self.database_key is not None
        ):
            self.settings.database.save(self.database_key, hbytes(buffer))

    def note_details(self, data):
        if data.status == Status.INTERESTING:
            self.save_buffer(data.buffer)
        runtime = max(data.finish_time - data.start_time, 0.0)
        self.status_runtimes.setdefault(data.status, []).append(runtime)
        for event in set(map(self.event_to_string, data.events)):
            self.event_call_counts[event] += 1

    def debug(self, message):
        with self.settings:
            debug_report(message)

    def debug_data(self, data):
        buffer_parts = [u"["]
        for i, (u, v) in enumerate(data.blocks):
            if i > 0:
                buffer_parts.append(u" || ")
            buffer_parts.append(
                u', '.join(int_to_text(int(i)) for i in data.buffer[u:v]))
        buffer_parts.append(u']')
        self.debug(u'%d bytes %s -> %s, %s' % (
            data.index,
            u''.join(buffer_parts),
            unicode_safe_repr(data.status),
            data.output,
        ))

    def prescreen_buffer(self, buffer):
        """Attempt to rule out buffer as a possible interesting candidate.

        Returns False if we know for sure that running this buffer will not
        produce an interesting result. Returns True if it might (because it
        explores territory we have not previously tried).

        This is purely an optimisation to try to reduce the number of tests we
        run. "return True" would be a valid but inefficient implementation.
        """
        node_index = 0
        n = len(buffer)
        for k, b in enumerate(buffer):
            if node_index in self.dead:
                return False
            try:
                # The block size at that point provides a lower bound on how
                # many more bytes are required. If the buffer does not have
                # enough bytes to fulfill that block size then we can rule out
                # this buffer.
                if k + self.block_sizes[node_index] > n:
                    return False
            except KeyError:
                pass
            try:
                b = self.forced[node_index]
            except KeyError:
                pass
            try:
                b = min(b, self.capped[node_index])
            except KeyError:
                pass
            try:
                node_index = self.tree[node_index][b]
            except KeyError:
                return True
        else:
            return False

    def incorporate_new_buffer(self, buffer):
        assert self.last_data.status == Status.INTERESTING
        if (
            self.settings.timeout > 0 and
            time.time() >= self.start_time + self.settings.timeout
        ):
            self.exit_reason = ExitReason.timeout
            raise RunIsComplete()
        buffer = hbytes(buffer[:self.last_data.index])
        assert sort_key(buffer) < sort_key(self.last_data.buffer)
        if not self.prescreen_buffer(buffer):
            return False
        assert sort_key(buffer) <= sort_key(self.last_data.buffer)
        data = ConjectureData.for_buffer(buffer)
        self.test_function(data)
        return data is self.last_data

    def run(self):
        with self.settings:
            try:
                self._run()
            except RunIsComplete:
                pass
            self.debug(
                u'Run complete after %d examples (%d valid) and %d shrinks' % (
                    self.call_count, self.valid_examples, self.shrinks,
                ))

    def _new_mutator(self):
        def draw_new(data, n, distribution):
            return distribution(self.random, n)

        def draw_existing(data, n, distribution):
            return self.last_data.buffer[data.index:data.index + n]

        def draw_smaller(data, n, distribution):
            existing = self.last_data.buffer[data.index:data.index + n]
            r = distribution(self.random, n)
            if r <= existing:
                return r
            return _draw_predecessor(self.random, existing)

        def draw_larger(data, n, distribution):
            existing = self.last_data.buffer[data.index:data.index + n]
            r = distribution(self.random, n)
            if r >= existing:
                return r
            return _draw_successor(self.random, existing)

        def reuse_existing(data, n, distribution):
            choices = data.block_starts.get(n, []) or \
                self.last_data.block_starts.get(n, [])
            if choices:
                i = self.random.choice(choices)
                return self.last_data.buffer[i:i + n]
            else:
                result = distribution(self.random, n)
                assert isinstance(result, hbytes)
                return result

        def flip_bit(data, n, distribution):
            buf = bytearray(
                self.last_data.buffer[data.index:data.index + n])
            i = self.random.randint(0, n - 1)
            k = self.random.randint(0, 7)
            buf[i] ^= (1 << k)
            return hbytes(buf)

        def draw_zero(data, n, distribution):
            return hbytes(b'\0' * n)

        def draw_max(data, n, distribution):
            return hbytes([255]) * n

        def draw_constant(data, n, distribution):
            return bytes_from_list([
                self.random.randint(0, 255)
            ] * n)

        options = [
            draw_new,
            reuse_existing, reuse_existing,
            draw_existing, draw_smaller, draw_larger,
            flip_bit,
            draw_zero, draw_max, draw_zero, draw_max,
            draw_constant,
        ]
        bits = [
            self.random.choice(options) for _ in hrange(3)
        ]

        def draw_mutated(data, n, distribution):
            if (
                data.index + n > len(self.last_data.buffer)
            ):
                result = distribution(self.random, n)
            else:
                result = self.random.choice(bits)(data, n, distribution)
            return self.__rewrite_for_novelty(
                data, self.__zero_bound(data, result))

        return draw_mutated

    def __rewrite(self, data, result):
        return self.__rewrite_for_novelty(
            data, self.__zero_bound(data, result)
        )

    def __zero_bound(self, data, result):
        """This tries to get the size of the generated data under control by
        replacing the result with zero if we are too deep or have already
        generated too much data.

        This causes us to enter "shrinking mode" there and thus reduce
        the size of the generated data.
        """
        if (
            data.depth * 2 >= MAX_DEPTH or
            (data.index + len(result)) * 2 >= self.settings.buffer_size
        ):
            if any(result):
                data.hit_zero_bound = True
            return hbytes(len(result))
        else:
            return result

    def __rewrite_for_novelty(self, data, result):
        """Take a block that is about to be added to data as the result of a
        draw_bytes call and rewrite it a small amount to ensure that the result
        will be novel: that is, not hit a part of the tree that we have fully
        explored.

        This is mostly useful for test functions which draw a small
        number of blocks.
        """
        assert isinstance(result, hbytes)
        try:
            node_index = data.__current_node_index
        except AttributeError:
            node_index = 0
            data.__current_node_index = node_index
            data.__hit_novelty = False
            data.__evaluated_to = 0
        if data.__hit_novelty:
            return result
        node = self.tree[node_index]
        for i in hrange(data.__evaluated_to, len(data.buffer)):
            node = self.tree[node_index]
            try:
                node_index = node[data.buffer[i]]
                assert node_index not in self.dead
                node = self.tree[node_index]
            except KeyError:
                data.__hit_novelty = True
                return result
        for i, b in enumerate(result):
            assert isinstance(b, int)
            try:
                new_node_index = node[b]
            except KeyError:
                data.__hit_novelty = True
                return result
            new_node = self.tree[new_node_index]
            if new_node_index in self.dead:
                if isinstance(result, hbytes):
                    result = bytearray(result)
                for c in range(256):
                    if c not in node:
                        assert c <= self.capped.get(node_index, c)
                        result[i] = c
                        data.__hit_novelty = True
                        return hbytes(result)
                    else:
                        new_node_index = node[c]
                        new_node = self.tree[new_node_index]
                        if new_node_index not in self.dead:
                            result[i] = c
                            break
                else:  # pragma: no cover
                    assert False, (
                        'Found a tree node which is live despite all its '
                        'children being dead.')
            node_index = new_node_index
            node = new_node
        assert node_index not in self.dead
        data.__current_node_index = node_index
        data.__evaluated_to = data.index + len(result)
        return hbytes(result)

    def has_existing_examples(self):
        return (
            self.settings.database is not None and
            self.database_key is not None and
            Phase.reuse in self.settings.phases
        )

    def reuse_existing_examples(self):
        """If appropriate (we have a database and have been told to use it),
        try to reload existing examples from the database.

        If there are a lot we don't try all of them. We always try the
        smallest example in the database (which is guaranteed to be the
        last failure) and the largest (which is usually the seed example
        which the last failure came from but we don't enforce that). We
        then take a random sampling of the remainder and try those. Any
        examples that are no longer interesting are cleared out.
        """
        if self.has_existing_examples():
            corpus = sorted(
                self.settings.database.fetch(self.database_key),
                key=sort_key
            )
            desired_size = max(2, ceil(0.1 * self.settings.max_examples))
            if desired_size < len(corpus):
                new_corpus = [corpus[0], corpus[-1]]
                n_boost = max(desired_size - 2, 0)
                new_corpus.extend(self.random.sample(corpus[1:-1], n_boost))
                corpus = new_corpus
                corpus.sort(key=sort_key)
            for existing in corpus:
                if self.valid_examples >= self.settings.max_examples:
                    self.exit_with(ExitReason.max_examples)
                if self.call_count >= max(
                    self.settings.max_iterations, self.settings.max_examples
                ):
                    self.exit_with(ExitReason.max_iterations)
                data = ConjectureData.for_buffer(existing)
                self.test_function(data)
                data.freeze()
                self.last_data = data
                self.consider_new_test_data(data)
                if data.status == Status.INTERESTING:
                    assert data.status == Status.INTERESTING
                    self.last_data = data
                    break
                else:
                    self.settings.database.delete(
                        self.database_key, existing)

    def exit_with(self, reason):
        self.exit_reason = reason
        raise RunIsComplete()

    def _run(self):
        self.last_data = None
        mutations = 0
        start_time = time.time()
        self.reuse_existing_examples()
        if (
            Phase.generate in self.settings.phases and not
            self.__tree_is_exhausted()
        ):
            if (
                self.last_data is None or
                self.last_data.status < Status.INTERESTING
            ):
                self.new_buffer()
            mutator = self._new_mutator()
            zero_bound_queue = []
            while (
                self.last_data.status != Status.INTERESTING and
                not self.__tree_is_exhausted()
            ):
                if self.valid_examples >= self.settings.max_examples:
                    self.exit_reason = ExitReason.max_examples
                    return
                if self.call_count >= max(
                    self.settings.max_iterations, self.settings.max_examples
                ):
                    self.exit_reason = ExitReason.max_iterations
                    return
                if (
                    self.settings.timeout > 0 and
                    time.time() >= start_time + self.settings.timeout
                ):
                    self.exit_reason = ExitReason.timeout
                    return
                if zero_bound_queue:
                    # Whenever we generate an example and it hits a bound
                    # which forces zero blocks into it, this creates a weird
                    # distortion effect by making certain parts of the data
                    # stream (especially ones to the right) much more likely
                    # to be zero. We fix this by redistributing the generated
                    # data by shuffling it randomly. This results in the
                    # zero data being spread evenly throughout the buffer.
                    # Hopefully the shrinking this triggers will cause us to
                    # naturally fail to hit the bound.
                    # If it doesn't then we will queue the new version up again
                    # (now with more zeros) and try again.
                    overdrawn = zero_bound_queue.pop()
                    buffer = bytearray(overdrawn.buffer)
                    # These will have values written to them that are different
                    # from what's in them anyway, so the value there doesn't
                    # really "count" for distributional purposes, and if we
                    # leave them in then they can cause the fraction of
                    # non-zero bytes to increase on redraw instead of decrease.
                    for i in overdrawn.forced_indices:
                        buffer[i] = 0
                    self.random.shuffle(buffer)
                    buffer = hbytes(buffer)
                    if buffer == overdrawn.buffer:
                        continue

                    def draw_bytes(data, n, distribution):
                        result = buffer[data.index:data.index + n]
                        if len(result) < n:
                            result += hbytes(n - len(result))
                        return self.__rewrite(data, result)

                    data = ConjectureData(
                        draw_bytes=draw_bytes,
                        max_length=self.settings.buffer_size,
                    )
                    self.test_function(data)
                    data.freeze()
                elif mutations >= self.settings.max_mutations:
                    mutations = 0
                    data = self.new_buffer()
                    mutator = self._new_mutator()
                else:
                    data = ConjectureData(
                        draw_bytes=mutator,
                        max_length=self.settings.buffer_size
                    )
                    self.test_function(data)
                    data.freeze()
                    prev_data = self.last_data
                    if self.consider_new_test_data(data):
                        self.last_data = data
                        if data.status > prev_data.status:
                            mutations = 0
                    else:
                        mutator = self._new_mutator()
                if getattr(data, 'hit_zero_bound', False):
                    zero_bound_queue.append(data)
                mutations += 1
        if self.__tree_is_exhausted():
            self.exit_reason = ExitReason.finished
            return
        data = self.last_data
        if data is None:
            self.exit_reason = ExitReason.finished
            return
        assert isinstance(data.output, text_type)
        if self.settings.max_shrinks <= 0:
            self.exit_reason = ExitReason.max_shrinks
            return
        if Phase.shrink not in self.settings.phases:
            self.exit_reason = ExitReason.finished
            return
        data = ConjectureData.for_buffer(self.last_data.buffer)
...
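To make the choice-tree bookkeeping in the excerpt easier to follow, here is a minimal, self-contained sketch of the same idea in plain Python. It is not Hypothesis code: the names ChoiceTree, record and is_redundant are made up for illustration, and it leaves out the forced/capped byte handling, block sizes and statuses that the real test_function and prescreen_buffer track. In the real engine, is_redundant returning True roughly corresponds to prescreen_buffer returning False, and record corresponds to the tree walk at the top of test_function.

# Illustrative sketch only (not from hypothesis): a stripped-down version of
# the prefix tree of byte choices with "dead" (fully explored) nodes.
class ChoiceTree:
    def __init__(self):
        self.tree = [{}]   # node index -> {byte value: child node index}
        self.dead = set()  # nodes with no unexplored descendants

    def record(self, buffer):
        # Walk the buffer through the tree, creating nodes as needed, then
        # mark the leaf (and any now-exhausted ancestors) as dead.
        node_index = 0
        path = []
        for b in buffer:
            path.append(node_index)
            node = self.tree[node_index]
            if b not in node:
                node[b] = len(self.tree)
                self.tree.append({})
            node_index = node[b]
        self.dead.add(node_index)
        for j in reversed(path):
            node = self.tree[j]
            # A node is dead once all 256 children exist and are dead.
            if len(node) == 256 and set(node.values()).issubset(self.dead):
                self.dead.add(j)
            else:
                break

    def is_redundant(self, buffer):
        # True if the buffer never leaves previously explored territory,
        # analogous to prescreen_buffer returning False.
        node_index = 0
        for b in buffer:
            if node_index in self.dead:
                return True
            node = self.tree[node_index]
            if b not in node:
                return False
            node_index = node[b]
        return True

tree = ChoiceTree()
tree.record(bytes([0]))
print(tree.is_redundant(bytes([0])))  # True: this exact path was already run
print(tree.is_redundant(bytes([1])))  # False: still novel territory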
