Best Python code snippet using hypothesis
engine.py
Source: engine.py
...117            unicode_safe_repr(list(data.buffer[:data.index])),118            unicode_safe_repr(data.status),119            data.output,120        ))121    def incorporate_new_buffer(self, buffer):122        if buffer in self.seen:123            return False124        assert self.last_data.status == Status.INTERESTING125        if (126            self.settings.timeout > 0 and127            time.time() >= self.start_time + self.settings.timeout128        ):129            raise RunIsComplete()130        self.examples_considered += 1131        buffer = buffer[:self.last_data.index]132        if sort_key(buffer) >= sort_key(self.last_data.buffer):133            return False134        assert sort_key(buffer) <= sort_key(self.last_data.buffer)135        data = TestData.for_buffer(buffer)136        self.test_function(data)137        data.freeze()138        self.note_for_corpus(data)139        if self.consider_new_test_data(data):140            self.shrinks += 1141            self.last_data = data142            if self.shrinks >= self.settings.max_shrinks:143                raise RunIsComplete()144            self.last_data = data145            self.changed += 1146            return True147        return False148    def run(self):149        with self.settings:150            try:151                self._run()152            except RunIsComplete:153                pass154            self.debug(155                u'Run complete after %d examples (%d valid) and %d shrinks' % (156                    self.iterations, self.valid_examples, self.shrinks,157                ))158    def _new_mutator(self):159        def draw_new(data, n, distribution):160            return distribution(self.random, n)161        def draw_existing(data, n, distribution):162            return self.last_data.buffer[data.index:data.index + n]163        def draw_smaller(data, n, distribution):164            existing = self.last_data.buffer[data.index:data.index + n]165            r = 
distribution(self.random, n)166            if r <= existing:167                return r168            return _draw_predecessor(self.random, existing)169        def draw_larger(data, n, distribution):170            existing = self.last_data.buffer[data.index:data.index + n]171            r = distribution(self.random, n)172            if r >= existing:173                return r174            return _draw_successor(self.random, existing)175        def reuse_existing(data, n, distribution):176            choices = data.block_starts.get(n, []) or \177                self.last_data.block_starts.get(n, [])178            if choices:179                i = self.random.choice(choices)180                return self.last_data.buffer[i:i + n]181            else:182                return distribution(self.random, n)183        def flip_bit(data, n, distribution):184            buf = bytearray(185                self.last_data.buffer[data.index:data.index + n])186            i = self.random.randint(0, n - 1)187            k = self.random.randint(0, 7)188            buf[i] ^= (1 << k)189            return hbytes(buf)190        def draw_zero(data, n, distribution):191            return b'\0' * n192        def draw_constant(data, n, distribution):193            return bytes_from_list([194                self.random.randint(0, 255)195            ] * n)196        options = [197            draw_new,198            reuse_existing, reuse_existing,199            draw_existing, draw_smaller, draw_larger,200            flip_bit, draw_zero, draw_constant,201        ]202        bits = [203            self.random.choice(options) for _ in hrange(3)204        ]205        def draw_mutated(data, n, distribution):206            if (207                data.index + n > len(self.last_data.buffer)208            ):209                return distribution(self.random, n)210            return self.random.choice(bits)(data, n, distribution)211        return draw_mutated212    def _run(self):213        
self.last_data = None214        mutations = 0215        start_time = time.time()216        if (217            self.settings.database is not None and218            self.database_key is not None219        ):220            corpus = sorted(221                self.settings.database.fetch(self.database_key),222                key=lambda d: (len(d), d)223            )224            for existing in corpus:225                if self.valid_examples >= self.settings.max_examples:226                    return227                if self.iterations >= max(228                    self.settings.max_iterations, self.settings.max_examples229                ):230                    return231                data = TestData.for_buffer(existing)232                self.test_function(data)233                data.freeze()234                self.last_data = data235                if data.status < Status.VALID:236                    self.settings.database.delete(237                        self.database_key, existing)238                elif data.status == Status.VALID:239                    # Incremental garbage collection! 
we store a lot of240                    # examples in the DB as we shrink: Those that stay241                    # interesting get kept, those that become invalid get242                    # dropped, but those that are merely valid gradually go243                    # away over time.244                    if self.random.randint(0, 2) == 0:245                        self.settings.database.delete(246                            self.database_key, existing)247                else:248                    assert data.status == Status.INTERESTING249                    self.last_data = data250                    break251        if Phase.generate in self.settings.phases:252            if (253                self.last_data is None or254                self.last_data.status < Status.INTERESTING255            ):256                self.new_buffer()257            mutator = self._new_mutator()258            while self.last_data.status != Status.INTERESTING:259                if self.valid_examples >= self.settings.max_examples:260                    return261                if self.iterations >= max(262                    self.settings.max_iterations, self.settings.max_examples263                ):264                    return265                if (266                    self.settings.timeout > 0 and267                    time.time() >= start_time + self.settings.timeout268                ):269                    return270                if mutations >= self.settings.max_mutations:271                    mutations = 0272                    self.new_buffer()273                    mutator = self._new_mutator()274                else:275                    data = TestData(276                        draw_bytes=mutator,277                        max_length=self.settings.buffer_size278                    )279                    self.test_function(data)280                    data.freeze()281                    self.note_for_corpus(data)282                    prev_data = self.last_data283  
                  if self.consider_new_test_data(data):284                        self.last_data = data285                        if data.status > prev_data.status:286                            mutations = 0287                    else:288                        mutator = self._new_mutator()289                mutations += 1290        data = self.last_data291        if data is None:292            return293        assert isinstance(data.output, text_type)294        if self.settings.max_shrinks <= 0:295            return296        if Phase.shrink not in self.settings.phases:297            return298        if not self.last_data.buffer:299            return300        data = TestData.for_buffer(self.last_data.buffer)301        self.test_function(data)302        if data.status != Status.INTERESTING:303            return304        change_counter = -1305        while self.changed > change_counter:306            change_counter = self.changed307            failed_deletes = 0308            while self.last_data.intervals and failed_deletes < 10:309                if self.random.randint(0, 1):310                    u, v = self.random.choice(self.last_data.intervals)311                else:312                    n = len(self.last_data.buffer) - 1313                    u, v = sorted((314                        self.random.choice(self.last_data.intervals)315                    ))316                if (317                    v < len(self.last_data.buffer)318                ) and self.incorporate_new_buffer(319                    self.last_data.buffer[:u] +320                    self.last_data.buffer[v:]321                ):322                    failed_deletes = 0323                else:324                    failed_deletes += 1325            i = 0326            while i < len(self.last_data.intervals):327                u, v = self.last_data.intervals[i]328                if not self.incorporate_new_buffer(329                    self.last_data.buffer[:u] +330                    
self.last_data.buffer[v:]331                ):332                    i += 1333            i = 0334            while i + 1 < len(self.last_data.buffer):335                if not self.incorporate_new_buffer(336                    self.last_data.buffer[:i] +337                    self.last_data.buffer[i + 1:]338                ):339                    i += 1340            i = 0341            while i < len(self.last_data.blocks):342                u, v = self.last_data.blocks[i]343                buf = self.last_data.buffer344                block = buf[u:v]345                n = v - u346                all_blocks = sorted(set([bytes(n)] + [347                    buf[a:a + n]348                    for a in self.last_data.block_starts[n]349                ]))350                better_blocks = all_blocks[:all_blocks.index(block)]351                for b in better_blocks:352                    if self.incorporate_new_buffer(353                        buf[:u] + b + buf[v:]354                    ):355                        break356                i += 1357            block_counter = -1358            while block_counter < self.changed:359                block_counter = self.changed360                blocks = [361                    k for k, count in362                    Counter(363                        self.last_data.buffer[u:v]364                        for u, v in self.last_data.blocks).items()365                    if count > 1366                ]367                for block in blocks:368                    parts = [369                        self.last_data.buffer[r:s]370                        for r, s in self.last_data.blocks371                    ]372                    def replace(b):373                        return b''.join(374                            bytes(b if c == block else c) for c in parts375                        )376                    minimize(377                        block,378                        lambda b: 
self.incorporate_new_buffer(replace(b)),379                        self.random380                    )381            i = 0382            while i < len(self.last_data.blocks):383                u, v = self.last_data.blocks[i]384                minimize(385                    self.last_data.buffer[u:v],386                    lambda b: self.incorporate_new_buffer(387                        self.last_data.buffer[:u] + b +388                        self.last_data.buffer[v:],389                    ), self.random390                )391                i += 1392            i = 0393            alternatives = None394            while i < len(self.last_data.intervals):395                if alternatives is None:396                    alternatives = sorted(set(397                        self.last_data.buffer[u:v]398                        for u, v in self.last_data.intervals), key=len)399                u, v = self.last_data.intervals[i]400                for a in alternatives:401                    buf = self.last_data.buffer402                    if (403                        len(a) < v - u or404                        (len(a) == (v - u) and a < buf[u:v])405                    ):406                        if self.incorporate_new_buffer(buf[:u] + a + buf[v:]):407                            alternatives = None408                            break409                i += 1410def _draw_predecessor(rnd, xs):411    r = bytearray()412    any_strict = False413    for x in to_bytes_sequence(xs):414        if not any_strict:415            c = rnd.randint(0, x)416            if c < x:417                any_strict = True418        else:419            c = rnd.randint(0, 255)420        r.append(c)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
