Best Python code snippet using hypothesis
engine.py
Source:engine.py  
...
            self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug("exit_with(%s)" % (reason.name,))
        self.exit_reason = reason
        raise RunIsComplete()

    def should_generate_more(self):
        """Return True if the generation phase should run further test cases.

        Returns False once the example/call budget is used up, or once a bug
        has been found and either only one bug is to be reported or the
        post-bug search heuristic below has been exhausted.
        """
        # End the generation phase where we would have ended it if no bugs had
        # been found.  This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False
        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # If we've found a bug and won't report more than one, stop looking.
        elif not self.settings.report_multiple_bugs:
            return False
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )

    def generate_new_examples(self):
        """Run the generation phase.

        Does nothing if ``Phase.generate`` is not enabled or if interesting
        examples are already known. Otherwise it first runs the all-zero
        buffer (failing a health check if that base example is too large),
        then keeps generating novel prefixes and running the test function
        on them while ``should_generate_more()`` is true, running target
        optimisation at most once along the way.
        """
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return
        self.debug("Generating new examples")
        assert self.should_generate_more()
        zero_data = self.cached_test_function(bytes(BUFFER_SIZE))
        if zero_data.status > Status.OVERRUN:
            self.__data_cache.pin(zero_data.buffer)
        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID and len(zero_data.buffer) * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural example for your test is extremely "
                "large. This makes it difficult for Hypothesis to generate "
                "good examples, especially when trying to reduce failing ones "
                "at the end. Consider reducing the size of your data if it is "
                "of a fixed size. You could also fix this by improving how "
                "your data shrinks (see https://hypothesis.readthedocs.io/en/"
                "latest/data.html#shrinking for details), or by introducing "
                "default values inside your strategy. e.g. could you replace "
                "some arguments with their defaults by using "
                "one_of(none(), some_complex_strategy)?",
                HealthCheck.large_base_example,
            )
        self.health_check_state = HealthCheckState()
        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0
        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        small_example_cap = clamp(10, self.settings.max_examples // 10, 50)
        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1)
        ran_optimisations = False
        while self.should_generate_more():
            prefix = self.generate_novel_prefix()
            assert len(prefix) <= BUFFER_SIZE
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + bytes(BUFFER_SIZE - len(prefix))
                )
                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue
                consecutive_zero_extend_is_invalid = 0
                minimal_extension = len(minimal_example.buffer) - len(prefix)
                max_length = min(len(prefix) + minimal_extension * 10, BUFFER_SIZE)
                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that has discovered.
                try:
                    trial_data = self.new_conjecture_data(
                        prefix=prefix, max_length=max_length
                    )
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass
                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                if trial_data.observer.killed:
                    continue
                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break
                prefix = trial_data.buffer
            else:
                max_length = BUFFER_SIZE
            data = self.new_conjecture_data(prefix=prefix, max_length=max_length)
            self.test_function(data)
            self.generate_mutations_from(data)
            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self.optimise_targets()

    def generate_mutations_from(self, data):
        """Try a few mutated variants of ``data`` (body truncated in this excerpt)."""
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0
            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long
                # we should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.examples.mutator_groups
                if not groups:
                    break
                group = self.random.choice(groups)
                ex1, ex2 = [
                    data.examples[i] for i in sorted(self.random.sample(group, 2))
                ]
                assert ex1.end <= ex2.start
...
erc721_pbt.py
Source:erc721_pbt.py  
1import os2import brownie3from brownie.test import strategy4from brownie.exceptions import VirtualMachineError5from hypothesis.strategies import sampled_from6class Options:7    VERIFY_EVENTS = os.getenv("PBT_VERIFY_EVENTS", "no") == "yes"8    VERIFY_RETURN_VALUES = os.getenv("PBT_VERIFY_RETURN_VALUES", "no") == "yes"9    DEBUG = os.getenv("PBT_DEBUG", "no") == "yes"10    STATEFUL_STEP_COUNT = int(os.getenv("PBT_STATEFUL_STEP_COUNT", 10))11    MAX_EXAMPLES = int(os.getenv("PBT_MAX_EXAMPLES", 100))12    SEED = int(os.getenv("PBT_SEED", 0))13    ACCOUNTS = int(os.getenv("PBT_ACCOUNTS", 4))14    TOKENS = int(os.getenv("PBT_TOKENS", 6))15class StateMachine:16    st_owner = strategy("uint256", min_value=0, max_value=Options.ACCOUNTS - 1)17    st_sender = strategy("uint256", min_value=0, max_value=Options.ACCOUNTS - 1)18    st_receiver = strategy(19        "uint256", min_value=0, max_value=Options.ACCOUNTS - 120    )21    st_token = strategy("uint256", min_value=1, max_value=Options.TOKENS)22    st_bool = strategy("bool" )23    def __init__(self, wallets, contract, DEBUG=None):24        self.wallets = wallets25        self.addr2idx = { addr: i for i,addr in enumerate(wallets)}26        self.addr2idx[0] = -127        self.addr2idx[brownie.convert.to_address('0x0000000000000000000000000000000000000000')] = -128        self.tokens = range(1, Options.TOKENS + 1)29        self.contract = contract30    def setup(self):31        # tokenId -> owner address - must match contract's ownerOf(tokenId)32        self.owner = {tokenId: self.contract.address for tokenId in self.tokens}33        # address -> number of tokens - must match contract's balanceOf(address)34        self.balance = {addr: 0 for addr in range(Options.ACCOUNTS)}35        # tokenId -> approved address - must match contract's getApproved(address)36        self.approved = {tokenId: -1 for tokenId in self.tokens}37        # address -> list of approved operators - for each address x in operators[address]38        # 
isApprovedForAll(address,x) must return true39        self.operators = {addr: set() for addr in range(Options.ACCOUNTS)}40        # Callback for initial setup (contract-dependent)41        self.onSetup()42        if Options.DEBUG:43            print("setup()")44            self.dumpState()45    def teardown(self):46        if Options.DEBUG:47            print("teardown()")48            self.dumpState()49    def canTransfer(self, sender, owner, tokenId):50        return owner == self.owner[tokenId] and (51            sender == owner52            or sender == self.approved[tokenId]53            or sender in self.operators[owner]54        )55    def rule_transferFrom(self, st_owner, st_receiver, st_token, st_sender):56        if Options.DEBUG:57            print(58                "transferFrom(owner {},receiver {},token {} [sender: {}])".format(59                    st_owner, st_receiver, st_token, st_sender60                )61            )62        if self.canTransfer(st_sender, st_owner, st_token):63            tx = self.contract.transferFrom(64                self.wallets[st_owner],65                self.wallets[st_receiver],66                st_token,67                {"from": self.wallets[st_sender]},68            )69            self.owner[st_token] = st_receiver70            self.approved[st_token] = -171            self.balance[st_owner] = self.balance[st_owner] - 172            self.balance[st_receiver] = self.balance[st_receiver] + 173            self.verifyOwner(st_token)74            self.verifyApproved(st_token)75            self.verifyBalance(st_owner)76            self.verifyBalance(st_receiver)77            self.verifyEvent(78                tx,79                "Transfer",80                {81                    "_from": self.wallets[st_owner],82                    "_to": self.wallets[st_receiver],83                    "_tokenId": st_token84                },85            )86        else:87            with brownie.reverts():88                
self.contract.transferFrom(89                    self.wallets[st_owner],90                    self.wallets[st_receiver],91                    st_token,92                    {"from": self.wallets[st_sender]},93                )94    def rule_safeTransferFrom(self, st_owner, st_receiver, st_token, st_sender):95        if Options.DEBUG:96            print(97                "safeTransferFrom({},{},{} [sender: {}])".format(98                    st_owner, st_receiver, st_token, st_sender99                )100            )101        if self.canTransfer(st_sender, st_owner, st_token):102            tx = self.contract.safeTransferFrom(103                self.wallets[st_owner],104                self.wallets[st_receiver],105                st_token,106                {"from": self.wallets[st_sender]},107            )108            self.owner[st_token] = st_receiver109            self.approved[st_token] = -1110            self.balance[st_owner] = self.balance[st_owner] - 1111            self.balance[st_receiver] = self.balance[st_receiver] + 1112            self.verifyOwner(st_token)113            self.verifyApproved(st_token)114            self.verifyBalance(st_owner)115            self.verifyBalance(st_receiver)116            self.verifyEvent(117                tx,118                "Transfer",119                {120                    "_from": self.wallets[st_owner],121                    "_to": self.wallets[st_receiver],122                    "_tokenId": st_token123                },124            )125        else:126            with brownie.reverts():127                self.contract.safeTransferFrom(128                    self.wallets[st_owner],129                    self.wallets[st_receiver],130                    st_token,131                    {"from": self.wallets[st_sender]},132                )133    def rule_approve(self, st_sender, st_token, st_receiver):134        if Options.DEBUG:135            print(136                "approve({},{}) [sender: {}])".format(137  
                  st_receiver, st_token, st_sender138                )139            )140        if st_receiver != self.owner[st_token] and \141           (self.owner[st_token] == st_sender 142           or st_sender in self.operators[self.owner[st_token]]):143            with normal():144                tx = self.contract.approve(145                    self.wallets[st_receiver], st_token,{"from": self.wallets[st_sender]}146                )147                self.approved[st_token] = st_receiver148                self.verifyApproved(st_token)149                self.verifyEvent(150                    tx,151                    "Approval",152                    {153                        "_owner": self.wallets[self.owner[st_token]], 154                        "_tokenId": st_token, 155                        "_approved": self.wallets[st_receiver]156                    }157                )158        else:159            with brownie.reverts():160                self.contract.approve(161                    self.wallets[st_receiver], st_token,{"from": self.wallets[st_sender]}162                )163    def rule_setApprovalForAll(self, st_sender, st_receiver, st_bool):164        if Options.DEBUG:165            print(166                "setApprovedForAll({}, {}) [sender: {}])".format(167                    st_receiver, st_bool, st_sender168                )169            )170        if st_receiver != st_sender:171            with normal():172                tx = self.contract.setApprovalForAll(173                        self.wallets[st_receiver], 174                        st_bool, {"from":self.wallets[st_sender]})175                if st_bool:176                   self.operators[st_sender].add(st_receiver)177                elif st_receiver in self.operators[st_sender]:178                   self.operators[st_sender].remove(st_receiver)179                self.verifySetApprovedForAll(st_sender, st_receiver)180                self.verifyEvent(181                    tx,182    
                "ApprovalForAll",183                    {184                        "_owner": self.wallets[st_sender], 185                        "_operator": self.wallets[st_receiver], 186                        "_approved": st_bool187                    }188                )189        else:190            with brownie.reverts():191                tx = self.contract.setApprovalForAll(192                        self.wallets[st_receiver], 193                        st_bool, {"from":self.wallets[st_sender]})194    def verifyOwner(self, tokenId):195        self.verifyValue(196            "ownerOf({})".format(tokenId),197            self.owner[tokenId],198            self.addr2idx[self.contract.ownerOf(tokenId)]199        )200    def verifyBalance(self, wIdx):201        self.verifyValue(202            "balanceOf({})".format(wIdx),203            self.balance[wIdx],204            self.contract.balanceOf(self.wallets[wIdx]),205        )206    def verifyApproved(self, token):207        self.verifyValue(208            "getApproved({})".format(token),209            self.approved[token],210            self.addr2idx[self.contract.getApproved(token)]211        )212    def verifySetApprovedForAll(self, sender, receiver):213        self.verifyValue(214            "isApprovedForAll({}, {})".format(sender, receiver),215            receiver in self.operators[sender],216            self.contract.isApprovedForAll(self.wallets[sender], self.wallets[receiver])217        )218    def verifyEvent(self, tx, eventName, data):219        if Options.VERIFY_EVENTS:220            if not eventName in tx.events:221                raise AssertionError(222                    "{}: event was not fired".format(eventName)223                )224            ev = tx.events[eventName]225            for k in data:226                if not k in ev:227                    raise AssertionError(228                        "{}.{}: absent event data".format(eventName, k)229                    )230                
self.verifyValue("{}.{}".format(eventName, k), data[k], ev[k])231    def verifyReturnValue(self, tx, expected):232        if Options.VERIFY_RETURN_VALUES:233            self.verifyValue("return value", expected, tx.return_value)234    def verifyValue(self, msg, expected, actual):235        if expected != actual:236            self.value_failure = True237            raise AssertionError(238                "{} : expected value {}, actual value was {}".format(239                    msg, expected, actual240                )241            )242    def dumpMap(self, desc, map):243        print(desc + " {")244        for k in map:245            print("{} ----> {}".format(k, map[k]))246        print("}")247    def dumpState(self):248        print("= STATE =")249        self.dumpMap("owner", self.owner)250        self.dumpMap("balance", self.balance)251        self.dumpMap("approved", self.approved)252        self.dumpMap("operators", self.operators)253def patch_hypothesis_for_seed_handling(seed):254    import hypothesis255    h_run_state_machine = hypothesis.stateful.run_state_machine_as_test256    def run_state_machine(state_machine_factory, settings=None):257        state_machine_factory._hypothesis_internal_use_seed = seed258        h_run_state_machine(state_machine_factory, settings)259    hypothesis.stateful.run_state_machine_as_test = run_state_machine260def patch_hypothesis_for_fuzz_behavior():261    import hypothesis262    # Replacement for method should_generate_more in hypothesis.internal.conjecture.engine.ConjectureRunner263    def should_generate_more_patch(self):264        # End the generation phase where we would have ended it if no bugs had265        # been found.  
This reproduces the exit logic in `self.test_function`,266        # but with the important distinction that this clause will move on to267        # the shrinking phase having found one or more bugs, while the other268        # will exit having found zero bugs.269        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(270            self.settings.max_examples * 10, 1000271        ):  # pragma: no cover272            return False273        # If we haven't found a bug, keep looking - if we hit any limits on274        # the number of tests to run that will raise an exception and stop275        # the run.276        if not self.interesting_examples:277            return True278        # If we've found a bug and won't report more than one, stop looking.279        elif not self.settings.report_multiple_bugs:280            return False281        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count282        # PATCH IS HERE283        return self.valid_examples <= self.settings.max_examples \284            and self.call_count <= 2 * self.settings.max_examples285    hypothesis.internal.conjecture.engine.ConjectureRunner.should_generate_more = should_generate_more_patch 286def patch_brownie_for_assertion_detection():287    from brownie.test.managers.runner import RevertContextManager288    from brownie.exceptions import VirtualMachineError289    f = RevertContextManager.__exit__290    def alt_exit(self, exc_type, exc_value, traceback):291        if exc_type is VirtualMachineError:292            exc_value.__traceback__.tb_next = None293            if exc_value.revert_type != "revert":294                return False295        return f(self, exc_type, exc_value, traceback)296    RevertContextManager.__exit__ = alt_exit297def register_hypothesis_profiles():298    import hypothesis299    from hypothesis import settings, Verbosity, Phase300    derandomize = True301    if Options.SEED != 0:302        
patch_hypothesis_for_seed_handling(Options.SEED)303        derandomize = False304    patch_hypothesis_for_fuzz_behavior()305    patch_brownie_for_assertion_detection()306    settings.register_profile(307        "generate",308        stateful_step_count=Options.STATEFUL_STEP_COUNT,309        max_examples=Options.MAX_EXAMPLES,310        phases=[Phase.generate],311        report_multiple_bugs=True,312        derandomize=derandomize,313        print_blob=True,314    )315    settings.register_profile(316        "shrinking",317        stateful_step_count=Options.STATEFUL_STEP_COUNT,318        max_examples=Options.MAX_EXAMPLES,319        phases=[Phase.generate, Phase.shrink],320        report_multiple_bugs=True,321        derandomize=derandomize,322        print_blob=True,323    )324class NoRevertContextManager:325    def __init__(self):326        pass327    def __enter__(self):328        pass329    def __exit__(self, exc_type, exc_value, traceback):330        if exc_type is None:331            return True332        import traceback333        if exc_type is VirtualMachineError:334            exc_value.__traceback__.tb_next = None335        elif exc_type is AssertionError:336            exc_value.__traceback__.tb_next = None337        return False338def normal():...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
