Best Python code snippet using hypothesis
data.py
Source:data.py  
...536class DataObserver:537    """Observer class for recording the behaviour of a538    ConjectureData object, primarily used for tracking539    the behaviour in the tree cache."""540    def conclude_test(self, status, interesting_origin):541        """Called when ``conclude_test`` is called on the542        observed ``ConjectureData``, with the same arguments.543        Note that this is called after ``freeze`` has completed.544        """545    def draw_bits(self, n_bits, forced, value):546        """Called when ``draw_bits`` is called on on the547        observed ``ConjectureData``.548        * ``n_bits`` is the number of bits drawn.549        *  ``forced`` is True if the corresponding550           draw was forced or ``False`` otherwise.551        * ``value`` is the result that ``draw_bits`` returned.552        """553    def kill_branch(self):554        """Mark this part of the tree as not worth re-exploring."""555@attr.s(slots=True)556class ConjectureResult:557    """Result class storing the parts of ConjectureData that we558    will care about after the original ConjectureData has outlived its559    usefulness."""560    status = attr.ib()561    interesting_origin = attr.ib()562    buffer = attr.ib()563    blocks = attr.ib()564    output = attr.ib()565    extra_information = attr.ib()566    has_discards = attr.ib()567    target_observations = attr.ib()568    tags = attr.ib()569    forced_indices = attr.ib(repr=False)570    examples = attr.ib(repr=False)571    index = attr.ib(init=False)572    def __attrs_post_init__(self):573        self.index = len(self.buffer)574        self.forced_indices = frozenset(self.forced_indices)575    def as_result(self):576        return self577# Masks for masking off the first byte of an n-bit buffer.578# The appropriate mask is stored at position n % 8.579BYTE_MASKS = [(1 << n) - 1 for n in range(8)]580BYTE_MASKS[0] = 255581class ConjectureData:582    @classmethod583    def for_buffer(self, buffer, observer=None):584        return ConjectureData(585            prefix=buffer, max_length=len(buffer), random=None, observer=observer,586        )587    def __init__(self, max_length, prefix, random, observer=None):588        if observer is None:589            observer = DataObserver()590        assert isinstance(observer, DataObserver)591        self.__bytes_drawn = 0592        self.observer = observer593        self.max_length = max_length594        self.is_find = False595        self.overdraw = 0596        self.__block_starts = defaultdict(list)597        self.__block_starts_calculated_to = 0598        self.__prefix = prefix599        self.__random = random600        assert random is not None or max_length <= len(prefix)601        self.blocks = Blocks(self)602        self.buffer = bytearray()603        self.index = 0604        self.output = ""605        self.status = Status.VALID606        self.frozen = False607        global global_test_counter608        self.testcounter = global_test_counter609        global_test_counter += 1610        self.start_time = benchmark_time()611        self.events = set()612        self.forced_indices = set()613        self.interesting_origin = None614        self.draw_times = []615        self.max_depth = 0616        self.has_discards = False617        self.__result = None618        # Observations used for targeted search.  They'll be aggregated in619        # ConjectureRunner.generate_new_examples and fed to TargetSelector.620        self.target_observations = {}621        # Tags which indicate something about which part of the search space622        # this example is in. These are used to guide generation.623        self.tags = set()624        self.labels_for_structure_stack = []625        # Normally unpopulated but we need this in the niche case626        # that self.as_result() is Overrun but we still want the627        # examples for reporting purposes.628        self.__examples = None629        # We want the top level example to have depth 0, so we start630        # at -1.631        self.depth = -1632        self.__example_record = ExampleRecord()633        self.extra_information = ExtraInformation()634        self.start_example(TOP_LABEL)635    def __repr__(self):636        return "ConjectureData(%s, %d bytes%s)" % (637            self.status.name,638            len(self.buffer),639            ", frozen" if self.frozen else "",640        )641    def as_result(self):642        """Convert the result of running this test into643        either an Overrun object or a ConjectureResult."""644        assert self.frozen645        if self.status == Status.OVERRUN:646            return Overrun647        if self.__result is None:648            self.__result = ConjectureResult(649                status=self.status,650                interesting_origin=self.interesting_origin,651                buffer=self.buffer,652                examples=self.examples,653                blocks=self.blocks,654                output=self.output,655                extra_information=self.extra_information656                if self.extra_information.has_information()657                else None,658                has_discards=self.has_discards,659                target_observations=self.target_observations,660                tags=frozenset(self.tags),661                forced_indices=self.forced_indices,662            )663            self.blocks.transfer_ownership(self.__result)664        return self.__result665    def __assert_not_frozen(self, name):666        if self.frozen:667            raise Frozen("Cannot call %s on frozen ConjectureData" % (name,))668    def note(self, value):669        self.__assert_not_frozen("note")670        if not isinstance(value, str):671            value = repr(value)672        self.output += value673    def draw(self, strategy, label=None):674        if self.is_find and not strategy.supports_find:675            raise InvalidArgument(676                (677                    "Cannot use strategy %r within a call to find (presumably "678                    "because it would be invalid after the call had ended)."679                )680                % (strategy,)681            )682        at_top_level = self.depth == 0683        if at_top_level:684            # We start this timer early, because accessing attributes on a LazyStrategy685            # can be almost arbitrarily slow.  In cases like characters() and text()686            # where we cache something expensive, this led to Flaky deadline errors!687            # See https://github.com/HypothesisWorks/hypothesis/issues/2108688            start_time = benchmark_time()689        strategy.validate()690        if strategy.is_empty:691            self.mark_invalid()692        if self.depth >= MAX_DEPTH:693            self.mark_invalid()694        if label is None:695            label = strategy.label696        self.start_example(label=label)697        try:698            if not at_top_level:699                return strategy.do_draw(self)700            else:701                try:702                    strategy.validate()703                    try:704                        return strategy.do_draw(self)705                    finally:706                        self.draw_times.append(benchmark_time() - start_time)707                except BaseException as e:708                    mark_for_escalation(e)709                    raise710        finally:711            self.stop_example()712    def start_example(self, label):713        self.__assert_not_frozen("start_example")714        self.depth += 1715        # Logically it would make sense for this to just be716        # ``self.depth = max(self.depth, self.max_depth)``, which is what it used to717        # be until we ran the code under tracemalloc and found a rather significant718        # chunk of allocation was happening here. This was presumably due to varargs719        # or the like, but we didn't investigate further given that it was easy720        # to fix with this check.721        if self.depth > self.max_depth:722            self.max_depth = self.depth723        self.__example_record.start_example(label)724        self.labels_for_structure_stack.append({label})725    def stop_example(self, discard=False):726        if self.frozen:727            return728        if discard:729            self.has_discards = True730        self.depth -= 1731        assert self.depth >= -1732        self.__example_record.stop_example(discard)733        labels_for_structure = self.labels_for_structure_stack.pop()734        if not discard:735            if self.labels_for_structure_stack:736                self.labels_for_structure_stack[-1].update(labels_for_structure)737            else:738                self.tags.update([structural_coverage(l) for l in labels_for_structure])739        if discard:740            # Once we've discarded an example, every test case starting with741            # this prefix contains discards. We prune the tree at that point so742            # as to avoid future test cases bothering with this region, on the743            # assumption that some example that you could have used instead744            # there would *not* trigger the discard. This greatly speeds up745            # test case generation in some cases, because it allows us to746            # ignore large swathes of the search space that are effectively747            # redundant.748            #749            # A scenario that can cause us problems but which we deliberately750            # have decided not to support is that if there are side effects751            # during data generation then you may end up with a scenario where752            # every good test case generates a discard because the discarded753            # section sets up important things for later. This is not terribly754            # likely and all that you see in this case is some degradation in755            # quality of testing, so we don't worry about it.756            #757            # Note that killing the branch does *not* mean we will never758            # explore below this point, and in particular we may do so during759            # shrinking. Any explicit request for a data object that starts760            # with the branch here will work just fine, but novel prefix761            # generation will avoid it, and we can use it to detect when we762            # have explored the entire tree (up to redundancy).763            self.observer.kill_branch()764    def note_event(self, event):765        self.events.add(event)766    @property767    def examples(self):768        assert self.frozen769        if self.__examples is None:770            self.__examples = Examples(record=self.__example_record, blocks=self.blocks)771        return self.__examples772    def freeze(self):773        if self.frozen:774            assert isinstance(self.buffer, bytes)775            return776        self.finish_time = benchmark_time()777        assert len(self.buffer) == self.index778        # Always finish by closing all remaining examples so that we have a779        # valid tree.780        while self.depth >= 0:781            self.stop_example()782        self.__example_record.freeze()783        self.frozen = True784        self.buffer = bytes(self.buffer)785        self.events = frozenset(self.events)786        self.observer.conclude_test(self.status, self.interesting_origin)787    def draw_bits(self, n, forced=None):788        """Return an ``n``-bit integer from the underlying source of789        bytes. If ``forced`` is set to an integer will instead790        ignore the underlying source and simulate a draw as if it had791        returned that integer."""792        self.__assert_not_frozen("draw_bits")793        if n == 0:794            return 0795        assert n > 0796        n_bytes = bits_to_bytes(n)797        self.__check_capacity(n_bytes)798        if forced is not None:799            buf = int_to_bytes(forced, n_bytes)800        elif self.__bytes_drawn < len(self.__prefix):801            index = self.__bytes_drawn802            buf = self.__prefix[index : index + n_bytes]803            if len(buf) < n_bytes:804                buf += uniform(self.__random, n_bytes - len(buf))805        else:806            buf = uniform(self.__random, n_bytes)807        buf = bytearray(buf)808        self.__bytes_drawn += n_bytes809        assert len(buf) == n_bytes810        # If we have a number of bits that is not a multiple of 8811        # we have to mask off the high bits.812        buf[0] &= BYTE_MASKS[n % 8]813        buf = bytes(buf)814        result = int_from_bytes(buf)815        self.observer.draw_bits(n, forced is not None, result)816        self.__example_record.draw_bits(n, forced)817        initial = self.index818        self.buffer.extend(buf)819        self.index = len(self.buffer)820        if forced is not None:821            self.forced_indices.update(range(initial, self.index))822        self.blocks.add_endpoint(self.index)823        assert bit_length(result) <= n824        return result825    def draw_bytes(self, n):826        """Draw n bytes from the underlying source."""827        return int_to_bytes(self.draw_bits(8 * n), n)828    def write(self, string):829        """Write ``string`` to the output buffer."""830        self.__assert_not_frozen("write")831        string = bytes(string)832        if not string:833            return834        self.draw_bits(len(string) * 8, forced=int_from_bytes(string))835        return self.buffer[-len(string) :]836    def __check_capacity(self, n):837        if self.index + n > self.max_length:838            self.mark_overrun()839    def conclude_test(self, status, interesting_origin=None):840        assert (interesting_origin is None) or (status == Status.INTERESTING)841        self.__assert_not_frozen("conclude_test")842        self.interesting_origin = interesting_origin843        self.status = status844        self.freeze()845        raise StopTest(self.testcounter)846    def mark_interesting(self, interesting_origin=None):847        self.conclude_test(Status.INTERESTING, interesting_origin)848    def mark_invalid(self):849        self.conclude_test(Status.INVALID)850    def mark_overrun(self):851        self.conclude_test(Status.OVERRUN)852def bits_to_bytes(n):853    """The number of bytes required to represent an n-bit number.854    Equivalent to (n + 7) // 8, but slightly faster. This really is855    called enough times that that matters."""...test_data_tree.py
Source:test_data_tree.py  
...146def test_can_go_from_interesting_to_valid():147    tree = DataTree()148    data = ConjectureData.for_buffer(b"", observer=tree.new_observer())149    with pytest.raises(StopTest):150        data.conclude_test(Status.INTERESTING)151    data = ConjectureData.for_buffer(b"", observer=tree.new_observer())152    with pytest.raises(StopTest):153        data.conclude_test(Status.VALID)154def test_going_from_interesting_to_invalid_is_flaky():155    tree = DataTree()156    data = ConjectureData.for_buffer(b"", observer=tree.new_observer())157    with pytest.raises(StopTest):158        data.conclude_test(Status.INTERESTING)159    data = ConjectureData.for_buffer(b"", observer=tree.new_observer())160    with pytest.raises(Flaky):161        data.conclude_test(Status.INVALID)162def test_concluding_at_prefix_is_flaky():163    tree = DataTree()164    data = ConjectureData.for_buffer(b"\1", observer=tree.new_observer())165    data.draw_bits(1)166    with pytest.raises(StopTest):167        data.conclude_test(Status.INTERESTING)168    data = ConjectureData.for_buffer(b"", observer=tree.new_observer())169    with pytest.raises(Flaky):170        data.conclude_test(Status.INVALID)171def test_concluding_with_overrun_at_prefix_is_not_flaky():172    tree = DataTree()173    data = ConjectureData.for_buffer(b"\1", observer=tree.new_observer())174    data.draw_bits(1)175    with pytest.raises(StopTest):176        data.conclude_test(Status.INTERESTING)177    data = ConjectureData.for_buffer(b"", observer=tree.new_observer())178    with pytest.raises(StopTest):179        data.conclude_test(Status.OVERRUN)180def test_changing_n_bits_is_flaky_in_prefix():181    tree = DataTree()182    data = ConjectureData.for_buffer(b"\1", observer=tree.new_observer())183    data.draw_bits(1)184    with pytest.raises(StopTest):185        data.conclude_test(Status.INTERESTING)186    data = ConjectureData.for_buffer(b"\1", observer=tree.new_observer())187    with pytest.raises(Flaky):188        data.draw_bits(2)189def test_changing_n_bits_is_flaky_in_branch():190    tree = DataTree()191    for i in [0, 1]:192        data = ConjectureData.for_buffer([i], observer=tree.new_observer())193        data.draw_bits(1)194        with pytest.raises(StopTest):195            data.conclude_test(Status.INTERESTING)196    data = ConjectureData.for_buffer(b"\1", observer=tree.new_observer())197    with pytest.raises(Flaky):198        data.draw_bits(2)199def test_extending_past_conclusion_is_flaky():200    tree = DataTree()201    data = ConjectureData.for_buffer(b"\1", observer=tree.new_observer())202    data.draw_bits(1)203    with pytest.raises(StopTest):204        data.conclude_test(Status.INTERESTING)205    data = ConjectureData.for_buffer(b"\1\0", observer=tree.new_observer())206    data.draw_bits(1)207    with pytest.raises(Flaky):208        data.draw_bits(1)209def test_changing_to_forced_is_flaky():210    tree = DataTree()211    data = ConjectureData.for_buffer(b"\1", observer=tree.new_observer())212    data.draw_bits(1)213    with pytest.raises(StopTest):214        data.conclude_test(Status.INTERESTING)215    data = ConjectureData.for_buffer(b"\1\0", observer=tree.new_observer())216    with pytest.raises(Flaky):217        data.draw_bits(1, forced=0)218def test_changing_value_of_forced_is_flaky():219    tree = DataTree()220    data = ConjectureData.for_buffer(b"\1", observer=tree.new_observer())221    data.draw_bits(1, forced=1)222    with pytest.raises(StopTest):223        data.conclude_test(Status.INTERESTING)224    data = ConjectureData.for_buffer(b"\1\0", observer=tree.new_observer())225    with pytest.raises(Flaky):226        data.draw_bits(1, forced=0)227def test_does_not_truncate_if_unseen():228    tree = DataTree()229    b = bytes([1, 2, 3, 4])230    assert tree.rewrite(b) == (b, None)231def test_truncates_if_seen():232    tree = DataTree()233    b = bytes([1, 2, 3, 4])234    data = ConjectureData.for_buffer(b, observer=tree.new_observer())235    data.draw_bits(8)236    data.draw_bits(8)237    data.freeze()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
