How to use the __changed_blocks method in Hypothesis

Best Python code snippet using hypothesis

engine.py

Source: engine.py (GitHub)

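__changed_blocks is a private attribute of the Shrinker class in Hypothesis's internal Conjecture engine (the engine.py listing below), so it is not something a test calls directly. It is exercised indirectly: when a property fails, the shrinker records which blocks of the underlying byte stream changed between successive shrink targets, and passes such as lower_common_block_offset use that record to lower several related blocks in one step. As a minimal sketch (the test name here is made up, and the property is adapted from the lower_common_block_offset docstring further down), a failing test that exercises this machinery looks like:

from hypothesis import given, strategies as st

@given(st.integers(min_value=0), st.integers(min_value=0))
def test_values_are_far_apart(m, n):
    # Fails for e.g. m == n == 0. While shrinking the failure, both drawn
    # integers tend to change together, which is exactly the situation the
    # changed-block tracking in the engine is designed to handle efficiently.
    assert abs(m - n) > 1

if __name__ == '__main__':
    test_values_are_far_apart()  # raises AssertionError with a shrunk example
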
# coding=utf-8
#
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis-python
#
# Most of this work is copyright (C) 2013-2018 David R. MacIver
# (david@drmaciver.com), but it contains contributions by others. See
# CONTRIBUTING.rst for a full list of people who may hold copyright, and
# consult the git log if you need to determine who owns an individual
# contribution.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
#
# END HEADER

from __future__ import division, print_function, absolute_import

import heapq
from enum import Enum
from random import Random, getrandbits
from weakref import WeakKeyDictionary
from functools import total_ordering
from collections import defaultdict

import attr

from hypothesis import Phase, Verbosity, HealthCheck
from hypothesis import settings as Settings
from hypothesis._settings import local_settings, note_deprecation
from hypothesis.reporting import debug_report
from hypothesis.internal.compat import Counter, ceil, hbytes, hrange, \
    int_to_bytes, benchmark_time, int_from_bytes, to_bytes_sequence
from hypothesis.utils.conventions import UniqueIdentifier
from hypothesis.internal.healthcheck import fail_health_check
from hypothesis.internal.conjecture.data import MAX_DEPTH, Status, \
    StopTest, ConjectureData
from hypothesis.internal.conjecture.shrinking import Length, Integer, \
    Lexical, Ordering

# Tell pytest to omit the body of this module from tracebacks
# http://doc.pytest.org/en/latest/example/simple.html#writing-well-integrated-assertion-helpers
__tracebackhide__ = True

HUNG_TEST_TIME_LIMIT = 5 * 60
MAX_SHRINKS = 500


@attr.s
class HealthCheckState(object):
    valid_examples = attr.ib(default=0)
    invalid_examples = attr.ib(default=0)
    overrun_examples = attr.ib(default=0)
    draw_times = attr.ib(default=attr.Factory(list))


class ExitReason(Enum):
    max_examples = 0
    max_iterations = 1
    timeout = 2
    max_shrinks = 3
    finished = 4
    flaky = 5


class RunIsComplete(Exception):
    pass


class ConjectureRunner(object):
    def __init__(
        self, test_function, settings=None, random=None,
        database_key=None,
    ):
        self._test_function = test_function
        self.settings = settings or Settings()
        self.shrinks = 0
        self.call_count = 0
        self.event_call_counts = Counter()
        self.valid_examples = 0
        self.start_time = benchmark_time()
        self.random = random or Random(getrandbits(128))
        self.database_key = database_key
        self.status_runtimes = {}
        self.all_drawtimes = []
        self.all_runtimes = []
        self.events_to_strings = WeakKeyDictionary()
        self.target_selector = TargetSelector(self.random)
        # Tree nodes are stored in an array to prevent heavy nesting of data
        # structures. Branches are dicts mapping bytes to child nodes (which
        # will in general only be partially populated). Leaves are
        # ConjectureData objects that have been previously seen as the result
        # of following that path.
        self.tree = [{}]
        # A node is dead if there is nothing left to explore past that point.
        # Recursively, a node is dead if either it is a leaf or every byte
        # leads to a dead node when starting from here.
        self.dead = set()
        # We rewrite the byte stream at various points during parsing, to one
        # that will produce an equivalent result but is in some sense more
        # canonical.
We keep track of these so that when walking the tree we89 # can identify nodes where the exact byte value doesn't matter and90 # treat all bytes there as equivalent. This significantly reduces the91 # size of the search space and removes a lot of redundant examples.92 # Maps tree indices where to the unique byte that is valid at that93 # point. Corresponds to data.write() calls.94 self.forced = {}95 # Maps tree indices to a mask that restricts bytes at that point.96 # Currently this is only updated by draw_bits, but it potentially97 # could get used elsewhere.98 self.masks = {}99 # Where a tree node consists of the beginning of a block we track the100 # size of said block. This allows us to tell when an example is too101 # short even if it goes off the unexplored region of the tree - if it102 # is at the beginning of a block of size 4 but only has 3 bytes left,103 # it's going to overrun the end of the buffer regardless of the104 # buffer contents.105 self.block_sizes = {}106 self.interesting_examples = {}107 self.covering_examples = {}108 self.shrunk_examples = set()109 self.tag_intern_table = {}110 self.health_check_state = None111 self.used_examples_from_database = False112 def __tree_is_exhausted(self):113 return 0 in self.dead114 def test_function(self, data):115 if benchmark_time() - self.start_time >= HUNG_TEST_TIME_LIMIT:116 fail_health_check(self.settings, (117 'Your test has been running for at least five minutes. This '118 'is probably not what you intended, so by default Hypothesis '119 'turns it into an error.'120 ), HealthCheck.hung_test)121 self.call_count += 1122 try:123 self._test_function(data)124 data.freeze()125 except StopTest as e:126 if e.testcounter != data.testcounter:127 self.save_buffer(data.buffer)128 raise e129 except BaseException:130 self.save_buffer(data.buffer)131 raise132 finally:133 data.freeze()134 self.note_details(data)135 self.target_selector.add(data)136 self.debug_data(data)137 tags = frozenset(data.tags)138 data.tags = self.tag_intern_table.setdefault(tags, tags)139 if data.status == Status.VALID:140 self.valid_examples += 1141 for t in data.tags:142 existing = self.covering_examples.get(t)143 if (144 existing is None or145 sort_key(data.buffer) < sort_key(existing.buffer)146 ):147 self.covering_examples[t] = data148 if self.database is not None:149 self.database.save(self.covering_key, data.buffer)150 if existing is not None:151 self.database.delete(152 self.covering_key, existing.buffer)153 tree_node = self.tree[0]154 indices = []155 node_index = 0156 for i, b in enumerate(data.buffer):157 indices.append(node_index)158 if i in data.forced_indices:159 self.forced[node_index] = b160 try:161 self.masks[node_index] = data.masked_indices[i]162 except KeyError:163 pass164 try:165 node_index = tree_node[b]166 except KeyError:167 node_index = len(self.tree)168 self.tree.append({})169 tree_node[b] = node_index170 tree_node = self.tree[node_index]171 if node_index in self.dead:172 break173 for u, v in data.blocks:174 # This can happen if we hit a dead node when walking the buffer.175 # In that case we already have this section of the tree mapped.176 if u >= len(indices):177 break178 self.block_sizes[indices[u]] = v - u179 self.dead.update(indices[self.cap:])180 if data.status != Status.OVERRUN and node_index not in self.dead:181 self.dead.add(node_index)182 self.tree[node_index] = data183 for j in reversed(indices):184 mask = self.masks.get(j, 0xff)185 assert _is_simple_mask(mask)186 max_size = mask + 1187 if (188 len(self.tree[j]) < max_size and189 j not 
in self.forced190 ):191 break192 if set(self.tree[j].values()).issubset(self.dead):193 self.dead.add(j)194 else:195 break196 if data.status == Status.INTERESTING:197 key = data.interesting_origin198 changed = False199 try:200 existing = self.interesting_examples[key]201 except KeyError:202 changed = True203 else:204 if sort_key(data.buffer) < sort_key(existing.buffer):205 self.shrinks += 1206 self.downgrade_buffer(existing.buffer)207 changed = True208 if changed:209 self.save_buffer(data.buffer)210 self.interesting_examples[key] = data211 self.shrunk_examples.discard(key)212 if self.shrinks >= MAX_SHRINKS:213 self.exit_with(ExitReason.max_shrinks)214 if (215 self.settings.timeout > 0 and216 benchmark_time() >= self.start_time + self.settings.timeout217 ):218 note_deprecation((219 'Your tests are hitting the settings timeout (%.2fs). '220 'This functionality will go away in a future release '221 'and you should not rely on it. Instead, try setting '222 'max_examples to be some value lower than %d (the number '223 'of examples your test successfully ran here). Or, if you '224 'would prefer your tests to run to completion, regardless '225 'of how long they take, you can set the timeout value to '226 'hypothesis.unlimited.'227 ) % (228 self.settings.timeout, self.valid_examples),229 self.settings)230 self.exit_with(ExitReason.timeout)231 if not self.interesting_examples:232 if self.valid_examples >= self.settings.max_examples:233 self.exit_with(ExitReason.max_examples)234 if self.call_count >= max(235 self.settings.max_examples * 10,236 # We have a high-ish default max iterations, so that tests237 # don't become flaky when max_examples is too low.238 1000239 ):240 self.exit_with(ExitReason.max_iterations)241 if self.__tree_is_exhausted():242 self.exit_with(ExitReason.finished)243 self.record_for_health_check(data)244 def generate_novel_prefix(self):245 prefix = bytearray()246 node = 0247 while True:248 assert len(prefix) < self.cap249 assert node not in self.dead250 mask = self.masks.get(node, 0xff)251 assert _is_simple_mask(mask)252 upper_bound = mask + 1253 try:254 c = self.forced[node]255 prefix.append(c)256 node = self.tree[node][c]257 continue258 except KeyError:259 pass260 c = self.random.randrange(0, upper_bound)261 try:262 next_node = self.tree[node][c]263 if next_node in self.dead:264 choices = [265 b for b in hrange(upper_bound)266 if self.tree[node].get(b) not in self.dead267 ]268 assert choices269 c = self.random.choice(choices)270 node = self.tree[node][c]271 else:272 node = next_node273 prefix.append(c)274 except KeyError:275 prefix.append(c)276 break277 assert node not in self.dead278 return hbytes(prefix)279 @property280 def cap(self):281 return self.settings.buffer_size // 2282 def record_for_health_check(self, data):283 # Once we've actually found a bug, there's no point in trying to run284 # health checks - they'll just mask the actually important information.285 if data.status == Status.INTERESTING:286 self.health_check_state = None287 state = self.health_check_state288 if state is None:289 return290 state.draw_times.extend(data.draw_times)291 if data.status == Status.VALID:292 state.valid_examples += 1293 elif data.status == Status.INVALID:294 state.invalid_examples += 1295 else:296 assert data.status == Status.OVERRUN297 state.overrun_examples += 1298 max_valid_draws = 10299 max_invalid_draws = 50300 max_overrun_draws = 20301 assert state.valid_examples <= max_valid_draws302 if state.valid_examples == max_valid_draws:303 self.health_check_state = None304 return305 if 
state.overrun_examples == max_overrun_draws:306 fail_health_check(self.settings, (307 'Examples routinely exceeded the max allowable size. '308 '(%d examples overran while generating %d valid ones)'309 '. Generating examples this large will usually lead to'310 ' bad results. You could try setting max_size parameters '311 'on your collections and turning '312 'max_leaves down on recursive() calls.') % (313 state.overrun_examples, state.valid_examples314 ), HealthCheck.data_too_large)315 if state.invalid_examples == max_invalid_draws:316 fail_health_check(self.settings, (317 'It looks like your strategy is filtering out a lot '318 'of data. Health check found %d filtered examples but '319 'only %d good ones. This will make your tests much '320 'slower, and also will probably distort the data '321 'generation quite a lot. You should adapt your '322 'strategy to filter less. This can also be caused by '323 'a low max_leaves parameter in recursive() calls') % (324 state.invalid_examples, state.valid_examples325 ), HealthCheck.filter_too_much)326 draw_time = sum(state.draw_times)327 if draw_time > 1.0:328 fail_health_check(self.settings, (329 'Data generation is extremely slow: Only produced '330 '%d valid examples in %.2f seconds (%d invalid ones '331 'and %d exceeded maximum size). Try decreasing '332 "size of the data you're generating (with e.g."333 'max_size or max_leaves parameters).'334 ) % (335 state.valid_examples, draw_time, state.invalid_examples,336 state.overrun_examples), HealthCheck.too_slow,)337 def save_buffer(self, buffer):338 if self.settings.database is not None:339 key = self.database_key340 if key is None:341 return342 self.settings.database.save(key, hbytes(buffer))343 def downgrade_buffer(self, buffer):344 if (345 self.settings.database is not None and346 self.database_key is not None347 ):348 self.settings.database.move(349 self.database_key, self.secondary_key, buffer)350 @property351 def secondary_key(self):352 return b'.'.join((self.database_key, b'secondary'))353 @property354 def covering_key(self):355 return b'.'.join((self.database_key, b'coverage'))356 def note_details(self, data):357 runtime = max(data.finish_time - data.start_time, 0.0)358 self.all_runtimes.append(runtime)359 self.all_drawtimes.extend(data.draw_times)360 self.status_runtimes.setdefault(data.status, []).append(runtime)361 for event in set(map(self.event_to_string, data.events)):362 self.event_call_counts[event] += 1363 def debug(self, message):364 with local_settings(self.settings):365 debug_report(message)366 @property367 def report_debug_info(self):368 return self.settings.verbosity >= Verbosity.debug369 def debug_data(self, data):370 if not self.report_debug_info:371 return372 stack = [[]]373 def go(ex):374 if ex.length == 0:375 return376 if len(ex.children) == 0:377 stack[-1].append(int_from_bytes(378 data.buffer[ex.start:ex.end]379 ))380 else:381 node = []382 stack.append(node)383 for v in ex.children:384 go(v)385 stack.pop()386 if len(node) == 1:387 stack[-1].extend(node)388 else:389 stack[-1].append(node)390 go(data.examples[0])391 assert len(stack) == 1392 status = repr(data.status)393 if data.status == Status.INTERESTING:394 status = '%s (%r)' % (status, data.interesting_origin,)395 self.debug('%d bytes %r -> %s, %s' % (396 data.index, stack[0], status, data.output,397 ))398 def run(self):399 with local_settings(self.settings):400 try:401 self._run()402 except RunIsComplete:403 pass404 for v in self.interesting_examples.values():405 self.debug_data(v)406 self.debug(407 u'Run complete 
after %d examples (%d valid) and %d shrinks'408 % (self.call_count, self.valid_examples, self.shrinks))409 def _new_mutator(self):410 target_data = [None]411 def draw_new(data, n):412 return uniform(self.random, n)413 def draw_existing(data, n):414 return target_data[0].buffer[data.index:data.index + n]415 def draw_smaller(data, n):416 existing = target_data[0].buffer[data.index:data.index + n]417 r = uniform(self.random, n)418 if r <= existing:419 return r420 return _draw_predecessor(self.random, existing)421 def draw_larger(data, n):422 existing = target_data[0].buffer[data.index:data.index + n]423 r = uniform(self.random, n)424 if r >= existing:425 return r426 return _draw_successor(self.random, existing)427 def reuse_existing(data, n):428 choices = data.block_starts.get(n, [])429 if choices:430 i = self.random.choice(choices)431 return hbytes(data.buffer[i:i + n])432 else:433 result = uniform(self.random, n)434 assert isinstance(result, hbytes)435 return result436 def flip_bit(data, n):437 buf = bytearray(438 target_data[0].buffer[data.index:data.index + n])439 i = self.random.randint(0, n - 1)440 k = self.random.randint(0, 7)441 buf[i] ^= (1 << k)442 return hbytes(buf)443 def draw_zero(data, n):444 return hbytes(b'\0' * n)445 def draw_max(data, n):446 return hbytes([255]) * n447 def draw_constant(data, n):448 return hbytes([self.random.randint(0, 255)]) * n449 def redraw_last(data, n):450 u = target_data[0].blocks[-1][0]451 if data.index + n <= u:452 return target_data[0].buffer[data.index:data.index + n]453 else:454 return uniform(self.random, n)455 options = [456 draw_new,457 redraw_last, redraw_last,458 reuse_existing, reuse_existing,459 draw_existing, draw_smaller, draw_larger,460 flip_bit,461 draw_zero, draw_max, draw_zero, draw_max,462 draw_constant,463 ]464 bits = [465 self.random.choice(options) for _ in hrange(3)466 ]467 prefix = [None]468 def mutate_from(origin):469 target_data[0] = origin470 prefix[0] = self.generate_novel_prefix()471 return draw_mutated472 def draw_mutated(data, n):473 if data.index + n > len(target_data[0].buffer):474 result = uniform(self.random, n)475 else:476 result = self.random.choice(bits)(data, n)477 p = prefix[0]478 if data.index < len(p):479 start = p[data.index:data.index + n]480 result = start + result[len(start):]481 return self.__zero_bound(data, result)482 return mutate_from483 def __rewrite(self, data, result):484 return self.__zero_bound(data, result)485 def __zero_bound(self, data, result):486 """This tries to get the size of the generated data under control by487 replacing the result with zero if we are too deep or have already488 generated too much data.489 This causes us to enter "shrinking mode" there and thus reduce490 the size of the generated data.491 """492 initial = len(result)493 if data.depth * 2 >= MAX_DEPTH or data.index >= self.cap:494 data.forced_indices.update(495 hrange(data.index, data.index + initial))496 data.hit_zero_bound = True497 result = hbytes(initial)498 elif data.index + initial >= self.cap:499 data.hit_zero_bound = True500 n = self.cap - data.index501 data.forced_indices.update(502 hrange(self.cap, data.index + initial))503 result = result[:n] + hbytes(initial - n)504 assert len(result) == initial505 return result506 @property507 def database(self):508 if self.database_key is None:509 return None510 return self.settings.database511 def has_existing_examples(self):512 return (513 self.database is not None and514 Phase.reuse in self.settings.phases515 )516 def reuse_existing_examples(self):517 """If appropriate 
(we have a database and have been told to use it),518 try to reload existing examples from the database.519 If there are a lot we don't try all of them. We always try the520 smallest example in the database (which is guaranteed to be the521 last failure) and the largest (which is usually the seed example522 which the last failure came from but we don't enforce that). We523 then take a random sampling of the remainder and try those. Any524 examples that are no longer interesting are cleared out.525 """526 if self.has_existing_examples():527 self.debug('Reusing examples from database')528 # We have to do some careful juggling here. We have two database529 # corpora: The primary and secondary. The primary corpus is a530 # small set of minimized examples each of which has at one point531 # demonstrated a distinct bug. We want to retry all of these.532 # We also have a secondary corpus of examples that have at some533 # point demonstrated interestingness (currently only ones that534 # were previously non-minimal examples of a bug, but this will535 # likely expand in future). These are a good source of potentially536 # interesting examples, but there are a lot of them, so we down537 # sample the secondary corpus to a more manageable size.538 corpus = sorted(539 self.settings.database.fetch(self.database_key),540 key=sort_key541 )542 desired_size = max(2, ceil(0.1 * self.settings.max_examples))543 for extra_key in [self.secondary_key, self.covering_key]:544 if len(corpus) < desired_size:545 extra_corpus = list(546 self.settings.database.fetch(extra_key),547 )548 shortfall = desired_size - len(corpus)549 if len(extra_corpus) <= shortfall:550 extra = extra_corpus551 else:552 extra = self.random.sample(extra_corpus, shortfall)553 extra.sort(key=sort_key)554 corpus.extend(extra)555 self.used_examples_from_database = len(corpus) > 0556 for existing in corpus:557 last_data = ConjectureData.for_buffer(existing)558 try:559 self.test_function(last_data)560 finally:561 if last_data.status != Status.INTERESTING:562 self.settings.database.delete(563 self.database_key, existing)564 self.settings.database.delete(565 self.secondary_key, existing)566 def exit_with(self, reason):567 self.exit_reason = reason568 raise RunIsComplete()569 def generate_new_examples(self):570 if Phase.generate not in self.settings.phases:571 return572 zero_data = self.cached_test_function(573 hbytes(self.settings.buffer_size))574 if zero_data.status == Status.OVERRUN or (575 zero_data.status == Status.VALID and576 len(zero_data.buffer) * 2 > self.settings.buffer_size577 ):578 fail_health_check(579 self.settings,580 'The smallest natural example for your test is extremely '581 'large. This makes it difficult for Hypothesis to generate '582 'good examples, especially when trying to reduce failing ones '583 'at the end. Consider reducing the size of your data if it is '584 'of a fixed size. You could also fix this by improving how '585 'your data shrinks (see https://hypothesis.readthedocs.io/en/'586 'latest/data.html#shrinking for details), or by introducing '587 'default values inside your strategy. e.g. could you replace '588 'some arguments with their defaults by using '589 'one_of(none(), some_complex_strategy)?',590 HealthCheck.large_base_example591 )592 # If the language starts with writes of length >= cap then there is593 # only one string in it: Everything after cap is forced to be zero (or594 # to be whatever value is written there). 
That means that once we've595 # tried the zero value, there's nothing left for us to do, so we596 # exit early here.597 for i in hrange(self.cap):598 if i not in zero_data.forced_indices:599 break600 else:601 self.exit_with(ExitReason.finished)602 self.health_check_state = HealthCheckState()603 count = 0604 while not self.interesting_examples and (605 count < 10 or self.health_check_state is not None606 ):607 prefix = self.generate_novel_prefix()608 def draw_bytes(data, n):609 if data.index < len(prefix):610 result = prefix[data.index:data.index + n]611 if len(result) < n:612 result += uniform(self.random, n - len(result))613 else:614 result = uniform(self.random, n)615 return self.__zero_bound(data, result)616 targets_found = len(self.covering_examples)617 last_data = ConjectureData(618 max_length=self.settings.buffer_size,619 draw_bytes=draw_bytes620 )621 self.test_function(last_data)622 last_data.freeze()623 if len(self.covering_examples) > targets_found:624 count = 0625 else:626 count += 1627 mutations = 0628 mutator = self._new_mutator()629 zero_bound_queue = []630 while not self.interesting_examples:631 if zero_bound_queue:632 # Whenever we generated an example and it hits a bound633 # which forces zero blocks into it, this creates a weird634 # distortion effect by making certain parts of the data635 # stream (especially ones to the right) much more likely636 # to be zero. We fix this by redistributing the generated637 # data by shuffling it randomly. This results in the638 # zero data being spread evenly throughout the buffer.639 # Hopefully the shrinking this causes will cause us to640 # naturally fail to hit the bound.641 # If it doesn't then we will queue the new version up again642 # (now with more zeros) and try again.643 overdrawn = zero_bound_queue.pop()644 buffer = bytearray(overdrawn.buffer)645 # These will have values written to them that are different646 # from what's in them anyway, so the value there doesn't647 # really "count" for distributional purposes, and if we648 # leave them in then they can cause the fraction of non649 # zero bytes to increase on redraw instead of decrease.650 for i in overdrawn.forced_indices:651 buffer[i] = 0652 self.random.shuffle(buffer)653 buffer = hbytes(buffer)654 def draw_bytes(data, n):655 result = buffer[data.index:data.index + n]656 if len(result) < n:657 result += hbytes(n - len(result))658 return self.__rewrite(data, result)659 data = ConjectureData(660 draw_bytes=draw_bytes,661 max_length=self.settings.buffer_size,662 )663 self.test_function(data)664 data.freeze()665 else:666 target, origin = self.target_selector.select()667 mutations += 1668 targets_found = len(self.covering_examples)669 data = ConjectureData(670 draw_bytes=mutator(origin),671 max_length=self.settings.buffer_size672 )673 self.test_function(data)674 data.freeze()675 if (676 data.status > origin.status or677 len(self.covering_examples) > targets_found678 ):679 mutations = 0680 elif (681 data.status < origin.status or682 not self.target_selector.has_tag(target, data) or683 mutations >= 10684 ):685 # Cap the variations of a single example and move on to686 # an entirely fresh start. 
Ten is an entirely arbitrary687 # constant, but it's been working well for years.688 mutations = 0689 mutator = self._new_mutator()690 if getattr(data, 'hit_zero_bound', False):691 zero_bound_queue.append(data)692 mutations += 1693 def _run(self):694 self.start_time = benchmark_time()695 self.reuse_existing_examples()696 self.generate_new_examples()697 self.shrink_interesting_examples()698 self.exit_with(ExitReason.finished)699 def shrink_interesting_examples(self):700 """If we've found interesting examples, try to replace each of them701 with a minimal interesting example with the same interesting_origin.702 We may find one or more examples with a new interesting_origin703 during the shrink process. If so we shrink these too.704 """705 if (706 Phase.shrink not in self.settings.phases or707 not self.interesting_examples708 ):709 return710 for prev_data in sorted(711 self.interesting_examples.values(),712 key=lambda d: sort_key(d.buffer)713 ):714 assert prev_data.status == Status.INTERESTING715 data = ConjectureData.for_buffer(prev_data.buffer)716 self.test_function(data)717 if data.status != Status.INTERESTING:718 self.exit_with(ExitReason.flaky)719 self.clear_secondary_key()720 while len(self.shrunk_examples) < len(self.interesting_examples):721 target, example = min([722 (k, v) for k, v in self.interesting_examples.items()723 if k not in self.shrunk_examples],724 key=lambda kv: (sort_key(kv[1].buffer), sort_key(repr(kv[0]))),725 )726 self.debug('Shrinking %r' % (target,))727 def predicate(d):728 if d.status < Status.INTERESTING:729 return False730 return d.interesting_origin == target731 self.shrink(example, predicate)732 self.shrunk_examples.add(target)733 def clear_secondary_key(self):734 if self.has_existing_examples():735 # If we have any smaller examples in the secondary corpus, now is736 # a good time to try them to see if they work as shrinks. They737 # probably won't, but it's worth a shot and gives us a good738 # opportunity to clear out the database.739 # It's not worth trying the primary corpus because we already740 # tried all of those in the initial phase.741 corpus = sorted(742 self.settings.database.fetch(self.secondary_key),743 key=sort_key744 )745 for c in corpus:746 primary = {747 v.buffer for v in self.interesting_examples.values()748 }749 cap = max(map(sort_key, primary))750 if sort_key(c) > cap:751 break752 else:753 self.cached_test_function(c)754 # We unconditionally remove c from the secondary key as it755 # is either now primary or worse than our primary example756 # of this reason for interestingness.757 self.settings.database.delete(self.secondary_key, c)758 def shrink(self, example, predicate):759 s = self.new_shrinker(example, predicate)760 s.shrink()761 return s.shrink_target762 def new_shrinker(self, example, predicate):763 return Shrinker(self, example, predicate)764 def prescreen_buffer(self, buffer):765 """Attempt to rule out buffer as a possible interesting candidate.766 Returns False if we know for sure that running this buffer will not767 produce an interesting result. Returns True if it might (because it768 explores territory we have not previously tried).769 This is purely an optimisation to try to reduce the number of tests we770 run. "return True" would be a valid but inefficient implementation.771 """772 node_index = 0773 n = len(buffer)774 for k, b in enumerate(buffer):775 if node_index in self.dead:776 return False777 try:778 # The block size at that point provides a lower bound on how779 # many more bytes are required. 
If the buffer does not have780 # enough bytes to fulfill that block size then we can rule out781 # this buffer.782 if k + self.block_sizes[node_index] > n:783 return False784 except KeyError:785 pass786 try:787 b = self.forced[node_index]788 except KeyError:789 pass790 try:791 b = b & self.masks[node_index]792 except KeyError:793 pass794 try:795 node_index = self.tree[node_index][b]796 except KeyError:797 return True798 else:799 return False800 def cached_test_function(self, buffer):801 node_index = 0802 for c in buffer:803 try:804 c = self.forced[node_index]805 except KeyError:806 pass807 try:808 c = c & self.masks[node_index]809 except KeyError:810 pass811 try:812 node_index = self.tree[node_index][c]813 except KeyError:814 break815 node = self.tree[node_index]816 if isinstance(node, ConjectureData):817 return node818 result = ConjectureData.for_buffer(buffer)819 self.test_function(result)820 return result821 def event_to_string(self, event):822 if isinstance(event, str):823 return event824 try:825 return self.events_to_strings[event]826 except KeyError:827 pass828 result = str(event)829 self.events_to_strings[event] = result830 return result831def _is_simple_mask(mask):832 """A simple mask is ``(2 ** n - 1)`` for some ``n``, so it has the effect833 of keeping the lowest ``n`` bits and discarding the rest.834 A mask in this form can produce any integer between 0 and the mask itself835 (inclusive), and the total number of these values is ``(mask + 1)``.836 """837 return (mask & (mask + 1)) == 0838def _draw_predecessor(rnd, xs):839 r = bytearray()840 any_strict = False841 for x in to_bytes_sequence(xs):842 if not any_strict:843 c = rnd.randint(0, x)844 if c < x:845 any_strict = True846 else:847 c = rnd.randint(0, 255)848 r.append(c)849 return hbytes(r)850def _draw_successor(rnd, xs):851 r = bytearray()852 any_strict = False853 for x in to_bytes_sequence(xs):854 if not any_strict:855 c = rnd.randint(x, 255)856 if c > x:857 any_strict = True858 else:859 c = rnd.randint(0, 255)860 r.append(c)861 return hbytes(r)862def sort_key(buffer):863 return (len(buffer), buffer)864def uniform(random, n):865 return int_to_bytes(random.getrandbits(n * 8), n)866class SampleSet(object):867 """Set data type with the ability to sample uniformly at random from it.868 The mechanism is that we store the set in two parts: A mapping of869 values to their index in an array. Sampling uniformly at random then870 becomes simply a matter of sampling from the array, but we can use871 the index for efficient lookup to add and remove values.872 """873 __slots__ = ('__values', '__index')874 def __init__(self):875 self.__values = []876 self.__index = {}877 def __len__(self):878 return len(self.__values)879 def __repr__(self):880 return 'SampleSet(%r)' % (self.__values,)881 def add(self, value):882 assert value not in self.__index883 # Adding simply consists of adding the value to the end of the array884 # and updating the index.885 self.__index[value] = len(self.__values)886 self.__values.append(value)887 def remove(self, value):888 # To remove a value we first remove it from the index. But this leaves889 # us with the value still in the array, so we have to fix that. 
We890 # can't simply remove the value from the array, as that would a) Be an891 # O(n) operation and b) Leave the index completely wrong for every892 # value after that index.893 # So what we do is we take the last element of the array and place it894 # in the position of the value we just deleted (if the value was not895 # already the last element of the array. If it was then we don't have896 # to do anything extra). This reorders the array, but that's OK because897 # we don't care about its order, we just need to sample from it.898 i = self.__index.pop(value)899 last = self.__values.pop()900 if i < len(self.__values):901 self.__values[i] = last902 self.__index[last] = i903 def choice(self, random):904 return random.choice(self.__values)905class Negated(object):906 __slots__ = ('tag',)907 def __init__(self, tag):908 self.tag = tag909NEGATED_CACHE = {} # type: dict910def negated(tag):911 try:912 return NEGATED_CACHE[tag]913 except KeyError:914 result = Negated(tag)915 NEGATED_CACHE[tag] = result916 return result917universal = UniqueIdentifier('universal')918class TargetSelector(object):919 """Data structure for selecting targets to use for mutation.920 The goal is to do a good job of exploiting novelty in examples without921 getting too obsessed with any particular novel factor.922 Roughly speaking what we want to do is give each distinct coverage target923 equal amounts of time. However some coverage targets may be harder to fuzz924 than others, or may only appear in a very small minority of examples, so we925 don't want to let those dominate the testing.926 Targets are selected according to the following rules:927 1. We ideally want valid examples as our starting point. We ignore928 interesting examples entirely, and other than that we restrict ourselves929 to the best example status we've seen so far. If we've only seen930 OVERRUN examples we use those. If we've seen INVALID but not VALID931 examples we use those. Otherwise we use VALID examples.932 2. Among the examples we've seen with the right status, when asked to933 select a target, we select a coverage target and return that along with934 an example exhibiting that target uniformly at random.935 Coverage target selection proceeds as follows:936 1. Whenever we return an example from select, we update the usage count of937 each of its tags.938 2. Whenever we see an example, we add it to the list of examples for all of939 its tags.940 3. When selecting a tag, we select one with a minimal usage count. Among941 those of minimal usage count we select one with the fewest examples.942 Among those, we select one uniformly at random.943 This has the following desirable properties:944 1. When two coverage targets are intrinsically linked (e.g. when you have945 multiple lines in a conditional so that either all or none of them will946 be covered in a conditional) they are naturally deduplicated.947 2. Popular coverage targets will largely be ignored for considering what948 test to run - if every example exhibits a coverage target, picking an949 example because of that target is rather pointless.950 3. When we discover new coverage targets we immediately exploit them until951 we get to the point where we've spent about as much time on them as the952 existing targets.953 4. 
Among the interesting deduplicated coverage targets we essentially954 round-robin between them, but with a more consistent distribution than955 uniformly at random, which is important particularly for short runs.956 """957 def __init__(self, random):958 self.random = random959 self.best_status = Status.OVERRUN960 self.reset()961 def reset(self):962 self.examples_by_tags = defaultdict(list)963 self.tag_usage_counts = Counter()964 self.tags_by_score = defaultdict(SampleSet)965 self.scores_by_tag = {}966 self.scores = []967 self.mutation_counts = 0968 self.example_counts = 0969 self.non_universal_tags = set()970 self.universal_tags = None971 def add(self, data):972 if data.status == Status.INTERESTING:973 return974 if data.status < self.best_status:975 return976 if data.status > self.best_status:977 self.best_status = data.status978 self.reset()979 if self.universal_tags is None:980 self.universal_tags = set(data.tags)981 else:982 not_actually_universal = self.universal_tags - data.tags983 for t in not_actually_universal:984 self.universal_tags.remove(t)985 self.non_universal_tags.add(t)986 self.examples_by_tags[t] = list(987 self.examples_by_tags[universal]988 )989 new_tags = data.tags - self.non_universal_tags990 for t in new_tags:991 self.non_universal_tags.add(t)992 self.examples_by_tags[negated(t)] = list(993 self.examples_by_tags[universal]994 )995 self.example_counts += 1996 for t in self.tags_for(data):997 self.examples_by_tags[t].append(data)998 self.rescore(t)999 def has_tag(self, tag, data):1000 if tag is universal:1001 return True1002 if isinstance(tag, Negated):1003 return tag.tag not in data.tags1004 return tag in data.tags1005 def tags_for(self, data):1006 yield universal1007 for t in data.tags:1008 yield t1009 for t in self.non_universal_tags:1010 if t not in data.tags:1011 yield negated(t)1012 def rescore(self, tag):1013 new_score = (1014 self.tag_usage_counts[tag], len(self.examples_by_tags[tag]))1015 try:1016 old_score = self.scores_by_tag[tag]1017 except KeyError:1018 pass1019 else:1020 self.tags_by_score[old_score].remove(tag)1021 self.scores_by_tag[tag] = new_score1022 sample = self.tags_by_score[new_score]1023 if len(sample) == 0:1024 heapq.heappush(self.scores, new_score)1025 sample.add(tag)1026 def select_tag(self):1027 while True:1028 peek = self.scores[0]1029 sample = self.tags_by_score[peek]1030 if len(sample) == 0:1031 heapq.heappop(self.scores)1032 else:1033 return sample.choice(self.random)1034 def select_example_for_tag(self, t):1035 return self.random.choice(self.examples_by_tags[t])1036 def select(self):1037 t = self.select_tag()1038 self.mutation_counts += 11039 result = self.select_example_for_tag(t)1040 assert self.has_tag(t, result)1041 for s in self.tags_for(result):1042 self.tag_usage_counts[s] += 11043 self.rescore(s)1044 return t, result1045def block_program(description):1046 """Mini-DSL for block rewriting. A sequence of commands that will be run1047 over all contiguous sequences of blocks of the description length in order.1048 Commands are:1049 * ".", keep this block unchanged1050 * "-", subtract one from this block.1051 * "0", replace this block with zero1052 * "X", delete this block1053 If a command does not apply (currently only because it's - on a zero1054 block) the block will be silently skipped over. 
As a side effect of1055 running a block program its score will be updated.1056 """1057 def run(self):1058 n = len(description)1059 i = 01060 while i + n <= len(self.shrink_target.blocks):1061 attempt = bytearray(self.shrink_target.buffer)1062 failed = False1063 for k, d in reversed(list(enumerate(description))):1064 j = i + k1065 u, v = self.blocks[j]1066 if d == '-':1067 value = int_from_bytes(attempt[u:v])1068 if value == 0:1069 failed = True1070 break1071 else:1072 attempt[u:v] = int_to_bytes(value - 1, v - u)1073 elif d == 'X':1074 del attempt[u:v]1075 else: # pragma: no cover1076 assert False, 'Unrecognised command %r' % (d,)1077 if failed or not self.incorporate_new_buffer(attempt):1078 i += 11079 run.command = description1080 run.__name__ = 'block_program(%r)' % (description,)1081 return run1082class PassClassification(Enum):1083 CANDIDATE = 01084 HOPEFUL = 11085 DUBIOUS = 21086 AVOID = 31087 SPECIAL = 41088@total_ordering1089@attr.s(slots=True, cmp=False)1090class ShrinkPass(object):1091 pass_function = attr.ib()1092 index = attr.ib()1093 classification = attr.ib(default=PassClassification.CANDIDATE)1094 successes = attr.ib(default=0)1095 runs = attr.ib(default=0)1096 calls = attr.ib(default=0)1097 shrinks = attr.ib(default=0)1098 deletions = attr.ib(default=0)1099 @property1100 def failures(self):1101 return self.runs - self.successes1102 @property1103 def name(self):1104 return self.pass_function.__name__1105 def __eq__(self, other):1106 return self.index == other.index1107 def __hash__(self):1108 return hash(self.index)1109 def __lt__(self, other):1110 return self.key() < other.key()1111 def key(self):1112 # Smaller is better.1113 return (1114 self.runs,1115 self.failures,1116 self.calls,1117 self.index1118 )1119class Shrinker(object):1120 """A shrinker is a child object of a ConjectureRunner which is designed to1121 manage the associated state of a particular shrink problem.1122 Currently the only shrink problem we care about is "interesting and with a1123 particular interesting_origin", but this is abstracted into a general1124 purpose predicate for more flexibility later - e.g. we are likely to want1125 to shrink with respect to a particular coverage target later.1126 Data with a status < VALID may be assumed not to satisfy the predicate.1127 The expected usage pattern is that this is only ever called from within the1128 engine.1129 """1130 DEFAULT_PASSES = [1131 'pass_to_descendant',1132 'zero_examples',1133 'adaptive_example_deletion',1134 'reorder_examples',1135 'minimize_duplicated_blocks',1136 'minimize_individual_blocks',1137 ]1138 EMERGENCY_PASSES = [1139 block_program('-XX'),1140 block_program('XX'),1141 'example_deletion_with_block_lowering',1142 'shrink_offset_pairs',1143 'minimize_block_pairs_retaining_sum',1144 ]1145 def __init__(self, engine, initial, predicate):1146 """Create a shrinker for a particular engine, with a given starting1147 point and predicate. When shrink() is called it will attempt to find an1148 example for which predicate is True and which is strictly smaller than1149 initial.1150 Note that initial is a ConjectureData object, and predicate1151 takes ConjectureData objects.1152 """1153 self.__engine = engine1154 self.__predicate = predicate1155 self.discarding_failed = False1156 self.__shrinking_prefixes = set()1157 self.initial_size = len(initial.buffer)1158 # We add a second level of caching local to the shrinker. This is a bit1159 # of a hack. Ideally we'd be able to rely on the engine's functionality1160 # for this. 
Doing it this way has two benefits: Firstly, the engine1161 # does not currently cache overruns (and probably shouldn't, but could1162 # recreate them on demand if necessary), and secondly Python dicts are1163 # much faster than our pure Python tree-based lookups.1164 self.__test_function_cache = {}1165 # We keep track of the current best example on the shrink_target1166 # attribute.1167 self.shrink_target = None1168 self.update_shrink_target(initial)1169 self.shrinks = 01170 self.initial_calls = self.__engine.call_count1171 self.current_pass_depth = 01172 self.passes_by_name = {}1173 self.clear_passes()1174 for p in Shrinker.DEFAULT_PASSES:1175 self.add_new_pass(p)1176 for p in Shrinker.EMERGENCY_PASSES:1177 self.add_new_pass(p, classification=PassClassification.AVOID)1178 self.add_new_pass(1179 'lower_common_block_offset',1180 classification=PassClassification.SPECIAL1181 )1182 def clear_passes(self):1183 """Reset all passes on the shrinker, leaving it in a blank state.1184 This is mostly useful for testing.1185 """1186 # Note that we deliberately do not clear passes_by_name. This means1187 # that we can still look up and explicitly run the standard passes,1188 # they just won't be avaiable by default.1189 self.passes = []1190 self.passes_awaiting_requeue = []1191 self.pass_queues = {c: [] for c in PassClassification}1192 self.known_programs = set()1193 def add_new_pass(self, run, classification=PassClassification.CANDIDATE):1194 """Creates a shrink pass corresponding to calling ``run(self)``"""1195 if isinstance(run, str):1196 run = getattr(Shrinker, run)1197 p = ShrinkPass(1198 pass_function=run, index=len(self.passes),1199 classification=classification,1200 )1201 if hasattr(run, 'command'):1202 self.known_programs.add(run.command)1203 self.passes.append(p)1204 self.passes_awaiting_requeue.append(p)1205 self.passes_by_name[p.name] = p1206 return p1207 def shrink_pass(self, name):1208 if hasattr(Shrinker, name) and name not in self.passes_by_name:1209 self.add_new_pass(name, classification=PassClassification.SPECIAL)1210 return self.passes_by_name[name]1211 def requeue_passes(self):1212 """Move all passes from passes_awaiting_requeue to their relevant1213 queues."""1214 while self.passes_awaiting_requeue:1215 p = self.passes_awaiting_requeue.pop()1216 heapq.heappush(self.pass_queues[p.classification], p)1217 def has_queued_passes(self, classification):1218 """Checks if any shrink passes are currently enqued under this1219 classification (note that there may be passes with this classification1220 currently awaiting requeue)."""1221 return len(self.pass_queues[classification]) > 01222 def pop_queued_pass(self, classification):1223 """Pop and run a single queued pass with this classification."""1224 sp = heapq.heappop(self.pass_queues[classification])1225 self.passes_awaiting_requeue.append(sp)1226 self.run_shrink_pass(sp)1227 def run_queued_until_change(self, classification):1228 """Run passes with this classification until there are no more or one1229 of them succeeds in shrinking the target."""1230 initial = self.shrink_target1231 while (1232 self.has_queued_passes(classification) and1233 self.shrink_target is initial1234 ):1235 self.pop_queued_pass(classification)1236 return self.shrink_target is not initial1237 def run_one_queued_pass(self, classification):1238 """Run a single queud pass with this classification (if there are1239 any)."""1240 if self.has_queued_passes(classification):1241 self.pop_queued_pass(classification)1242 def run_queued_passes(self, classification):1243 """Run 
all queued passes with this classification."""1244 while self.has_queued_passes(classification):1245 self.pop_queued_pass(classification)1246 @property1247 def calls(self):1248 return self.__engine.call_count1249 def consider_new_buffer(self, buffer):1250 buffer = hbytes(buffer)1251 return buffer.startswith(self.buffer) or \1252 self.incorporate_new_buffer(buffer)1253 def incorporate_new_buffer(self, buffer):1254 buffer = hbytes(buffer[:self.shrink_target.index])1255 try:1256 existing = self.__test_function_cache[buffer]1257 except KeyError:1258 pass1259 else:1260 return self.incorporate_test_data(existing)1261 # Sometimes an attempt at lexicographic minimization will do the wrong1262 # thing because the buffer has changed under it (e.g. something has1263 # turned into a write, the bit size has changed). The result would be1264 # an invalid string, but it's better for us to just ignore it here as1265 # it turns out to involve quite a lot of tricky book-keeping to get1266 # this right and it's better to just handle it in one place.1267 if sort_key(buffer) >= sort_key(self.shrink_target.buffer):1268 return False1269 if self.shrink_target.buffer.startswith(buffer):1270 return False1271 if not self.__engine.prescreen_buffer(buffer):1272 return False1273 assert sort_key(buffer) <= sort_key(self.shrink_target.buffer)1274 data = ConjectureData.for_buffer(buffer)1275 self.__engine.test_function(data)1276 self.__test_function_cache[buffer] = data1277 return self.incorporate_test_data(data)1278 def incorporate_test_data(self, data):1279 self.__test_function_cache[data.buffer] = data1280 if (1281 self.__predicate(data) and1282 sort_key(data.buffer) < sort_key(self.shrink_target.buffer)1283 ):1284 self.update_shrink_target(data)1285 self.__shrinking_block_cache = {}1286 return True1287 return False1288 def cached_test_function(self, buffer):1289 buffer = hbytes(buffer)1290 try:1291 return self.__test_function_cache[buffer]1292 except KeyError:1293 pass1294 result = self.__engine.cached_test_function(buffer)1295 self.incorporate_test_data(result)1296 self.__test_function_cache[buffer] = result1297 return result1298 def debug(self, msg):1299 self.__engine.debug(msg)1300 @property1301 def random(self):1302 return self.__engine.random1303 def run_shrink_pass(self, sp):1304 """Runs the function associated with ShrinkPass sp and updates the1305 relevant metadata.1306 Note that sp may or may not be a pass currently associated with1307 this shrinker. This does not handle any requeing that is1308 required.1309 """1310 if isinstance(sp, str):1311 sp = self.shrink_pass(sp)1312 self.debug('Shrink Pass %s' % (sp.name,))1313 initial_shrinks = self.shrinks1314 initial_calls = self.calls1315 size = len(self.shrink_target.buffer)1316 try:1317 sp.pass_function(self)1318 finally:1319 calls = self.calls - initial_calls1320 shrinks = self.shrinks - initial_shrinks1321 deletions = size - len(self.shrink_target.buffer)1322 sp.calls += calls1323 sp.shrinks += shrinks1324 sp.deletions += deletions1325 sp.runs += 11326 self.debug('Shrink Pass %s completed.' % (sp.name,))1327 # Complex state machine alert! A pass run can either succeed (we made1328 # at least one shrink) or fail (we didn't). 
This changes the pass's1329 # current classification according to the following possible1330 # transitions:1331 #1332 # CANDIDATE -------> HOPEFUL1333 # | ^1334 # | |1335 # v v1336 # AVOID ---------> DUBIOUS1337 #1338 # From best to worst we want to run HOPEFUL, CANDIDATE, DUBIOUS, AVOID.1339 # We will try any one of them if we have to but we want to prioritise.1340 #1341 # When a run succeeds, a pass will follow an arrow to a better class.1342 # When it fails, it will follow an arrow to a worse one.1343 # If no such arrow is available, it stays where it is.1344 #1345 # We also have the classification SPECIAL for passes that do not get1346 # run as part of the normal process.1347 previous = sp.classification1348 # If the pass didn't actually do anything we don't reclassify it. This1349 # is for things like remove_discarded which often are inapplicable.1350 if calls > 0 and sp.classification != PassClassification.SPECIAL:1351 if shrinks == 0:1352 if sp.successes > 0:1353 sp.classification = PassClassification.DUBIOUS1354 else:1355 sp.classification = PassClassification.AVOID1356 else:1357 sp.successes += 11358 if sp.classification == PassClassification.AVOID:1359 sp.classification = PassClassification.DUBIOUS1360 else:1361 sp.classification = PassClassification.HOPEFUL1362 if previous != sp.classification:1363 self.debug('Reclassified %s from %s to %s' % (1364 sp.name, previous.name, sp.classification.name1365 ))1366 def shrink(self):1367 """Run the full set of shrinks and update shrink_target.1368 This method is "mostly idempotent" - calling it twice is unlikely to1369 have any effect, though it has a non-zero probability of doing so.1370 """1371 # We assume that if an all-zero block of bytes is an interesting1372 # example then we're not going to do better than that.1373 # This might not technically be true: e.g. for integers() | booleans()1374 # the simplest example is actually [1, 0]. Missing this case is fairly1375 # harmless and this allows us to make various simplifying assumptions1376 # about the structure of the data (principally that we're never1377 # operating on a block of all zero bytes so can use non-zeroness as a1378 # signpost of complexity).1379 if (1380 not any(self.shrink_target.buffer) or1381 self.incorporate_new_buffer(hbytes(len(self.shrink_target.buffer)))1382 ):1383 return1384 try:1385 self.greedy_shrink()1386 finally:1387 if self.__engine.report_debug_info:1388 def s(n):1389 return 's' if n != 1 else ''1390 total_deleted = self.initial_size - len(1391 self.shrink_target.buffer)1392 self.debug('---------------------')1393 self.debug('Shrink pass profiling')1394 self.debug('---------------------')1395 self.debug('')1396 calls = self.__engine.call_count - self.initial_calls1397 self.debug((1398 'Shrinking made a total of %d call%s '1399 'of which %d shrank. 
This deleted %d byte%s out of %d.'1400 ) % (1401 calls, s(calls),1402 self.shrinks,1403 total_deleted, s(total_deleted),1404 self.initial_size,1405 ))1406 for useful in [True, False]:1407 self.debug('')1408 if useful:1409 self.debug('Useful passes:')1410 else:1411 self.debug('Useless passes:')1412 self.debug('')1413 for p in sorted(1414 self.passes,1415 key=lambda t: (1416 -t.calls, -t.runs,1417 t.deletions, t.shrinks,1418 ),1419 ):1420 if p.calls == 0:1421 continue1422 if (p.shrinks != 0) != useful:1423 continue1424 self.debug((1425 ' * %s ran %d time%s, making %d call%s of which '1426 '%d shrank, deleting %d byte%s.'1427 ) % (1428 p.name,1429 p.runs, s(p.runs),1430 p.calls, s(p.calls),1431 p.shrinks,1432 p.deletions, s(p.deletions),1433 ))1434 self.debug('')1435 def greedy_shrink(self):1436 """Run a full set of greedy shrinks (that is, ones that will only ever1437 move to a better target) and update shrink_target appropriately.1438 This method iterates to a fixed point and so is idempontent - calling1439 it twice will have exactly the same effect as calling it once.1440 """1441 self.run_shrink_pass('alphabet_minimize')1442 while self.single_greedy_shrink_iteration():1443 self.run_shrink_pass('lower_common_block_offset')1444 def single_greedy_shrink_iteration(self):1445 """Performs a single run through each greedy shrink pass, but does not1446 loop to achieve a fixed point."""1447 initial = self.shrink_target1448 # What follows is a slightly delicate dance. What we want to do is try1449 # to ensure that:1450 #1451 # 1. If it is possible for us to be deleting data, we should be.1452 # 2. We do not end up repeating a lot of passes uselessly.1453 # 3. We do not want to run expensive or useless passes if we can1454 # possibly avoid doing so.1455 self.requeue_passes()1456 self.run_shrink_pass('remove_discarded')1457 # First run the entire set of solid passes (ones that have previously1458 # made changes). It's important that we run all of them, not just one,1459 # as typically each pass may unlock others.1460 self.run_queued_passes(PassClassification.HOPEFUL)1461 # While our solid passes are successfully shrinking the buffer, we can1462 # just keep doing that (note that this is a stronger condition than1463 # just making shrinks - it's a much better sense of progress. We can1464 # make only O(n) length reductions but we can make exponentially many1465 # shrinks).1466 if len(self.buffer) < len(initial.buffer):1467 return True1468 # If we're stuck on length reductions then we pull in one candiate pass1469 # (if there are any).1470 # This should hopefully help us unlock any local minima that were1471 # starting to reduce the utility of the previous solid passes.1472 self.run_one_queued_pass(PassClassification.CANDIDATE)1473 # We've pulled in a new candidate pass (or have no candidate passes1474 # left) and are making shrinks with the solid passes, so lets just1475 # keep on doing that.1476 if self.shrink_target is not initial:1477 return True1478 # We're a bit stuck, so it's time to try some new passes.1479 for classification in [1480 # First we try rerunning every pass we've previously seen succeed.1481 PassClassification.DUBIOUS,1482 # If that didn't work, we pull in some new candidate passes.1483 PassClassification.CANDIDATE,1484 # If that still didn't work, we now pull out all the stops and1485 # bring in the desperation passes. 
            # These are either passes that started as CANDIDATE but we have
            # never seen work, or ones that are so expensive that they begin
            # life as AVOID.
            PassClassification.AVOID
        ]:
            if self.run_queued_until_change(classification):
                return True

        assert self.shrink_target is initial
        return False

    @property
    def buffer(self):
        return self.shrink_target.buffer

    @property
    def blocks(self):
        return self.shrink_target.blocks

    def pass_to_descendant(self):
        """Attempt to replace each example with a descendant example.
        This is designed to deal with strategies that call themselves
        recursively. For example, suppose we had:
        binary_tree = st.deferred(
            lambda: st.one_of(
                st.integers(), st.tuples(binary_tree, binary_tree)))
        This pass guarantees that we can replace any binary tree with one of
        its subtrees - each of those will create an interval that the parent
        could validly be replaced with, and this pass will try doing that.
        This is pretty expensive - it takes O(len(intervals)^2) - so we run it
        late in the process when we've got the number of intervals as far down
        as possible.
        """
        for ex in self.each_non_trivial_example():
            st = self.shrink_target
            descendants = sorted(set(
                st.buffer[d.start:d.end] for d in self.shrink_target.examples
                if d.start >= ex.start and d.end <= ex.end and
                d.length < ex.length and d.label == ex.label
            ), key=sort_key)
            for d in descendants:
                if self.incorporate_new_buffer(
                    self.buffer[:ex.start] + d + self.buffer[ex.end:]
                ):
                    break

    def is_shrinking_block(self, i):
        """Checks whether block i has been previously marked as a shrinking
        block.
        If the shrink target has changed since i was last checked, will
        attempt to calculate if an equivalent block in a previous shrink
        target was marked as shrinking.
        """
        if not self.__shrinking_prefixes:
            return False
        try:
            return self.__shrinking_block_cache[i]
        except KeyError:
            pass
        t = self.shrink_target
        return self.__shrinking_block_cache.setdefault(
            i,
            t.buffer[:t.blocks[i][0]] in self.__shrinking_prefixes
        )

    def is_payload_block(self, i):
        """A block is payload if it is entirely non-structural: We can tinker
        with its value freely and this will not affect the shape of the input
        language.
        This is mostly a useful concept when we're doing lexicographic
        minimimization on multiple blocks at once - by restricting ourself to
        payload blocks, we expect the shape of the language to not change
        under us (but must still guard against it doing so).
        """
        return not (
            self.is_shrinking_block(i) or
            i in self.shrink_target.forced_blocks
        )

    def lower_common_block_offset(self):
        """Sometimes we find ourselves in a situation where changes to one part
        of the byte stream unlock changes to other parts. Sometimes this is
        good, but sometimes this can cause us to exhibit exponential slow
        downs!
        e.g. suppose we had the following:
        m = draw(integers(min_value=0))
        n = draw(integers(min_value=0))
        assert abs(m - n) > 1
        If this fails then we'll end up with a loop where on each iteration we
        reduce each of m and n by 2 - m can't go lower because of n, then n
        can't go lower because of m.
        This will take us O(m) iterations to complete, which is exponential in
        the data size, as we gradually zig zag our way towards zero.
        This can only happen if we're failing to reduce the size of the byte
        stream: The number of iterations that reduce the length of the byte
        stream is bounded by that length.
        So what we do is this: We keep track of which blocks are changing, and
        then if there's some non-zero common offset to them we try and minimize
        them all at once by lowering that offset.
        This may not work, and it definitely won't get us out of all possible
        exponential slow downs (an example of where it doesn't is where the
        shape of the blocks changes as a result of this bouncing behaviour),
        but it fails fast when it doesn't work and gets us out of a really
        nastily slow case when it does.
        """
        if len(self.__changed_blocks) <= 1:
            return

        current = self.shrink_target

        blocked = [current.buffer[u:v] for u, v in current.blocks]

        changed = [
            i for i in sorted(self.__changed_blocks)
            if any(blocked[i]) and i not in self.shrink_target.forced_blocks
        ]

        if not changed:
            return

        ints = [int_from_bytes(blocked[i]) for i in changed]
        offset = min(ints)
        assert offset > 0

        for i in hrange(len(ints)):
            ints[i] -= offset

        def reoffset(o):
            new_blocks = list(blocked)
            for i, v in zip(changed, ints):
                new_blocks[i] = int_to_bytes(v + o, len(blocked[i]))
            return self.incorporate_new_buffer(hbytes().join(new_blocks))

        new_offset = Integer.shrink(offset, reoffset, random=self.random)
        if new_offset == offset:
            self.clear_change_tracking()

    def shrink_offset_pairs(self):
        """Lower any two blocks offset from each other the same ammount.
        Before this shrink pass, two blocks explicitly offset from each
        other would not get minimized properly:
        >>> b = st.integers(0, 255)
        >>> find(st.tuples(b, b), lambda x: x[0] == x[1] + 1)
        (149,148)
        This expensive (O(n^2)) pass goes through every pair of non-zero
        blocks in the current shrink target and sees if the shrink
        target can be improved by applying an offset to both of them.
        """
        current = [self.shrink_target.buffer[u:v] for u, v in self.blocks]

        def int_from_block(i):
            return int_from_bytes(current[i])

        def block_len(i):
            u, v = self.blocks[i]
            return v - u

        # Try reoffseting every pair
        def reoffset_pair(pair, o):
            n = len(self.blocks)
            # Number of blocks may have changed, need to validate
            valid_pair = [
                p for p in pair if p < n and int_from_block(p) > 0 and
                self.is_payload_block(p)
            ]
            if len(valid_pair) < 2:
                return
            m = min([int_from_block(p) for p in valid_pair])
            new_blocks = [self.shrink_target.buffer[u:v]
                          for u, v in self.blocks]
            for i in valid_pair:
                new_blocks[i] = int_to_bytes(
                    int_from_block(i) + o - m, block_len(i))
            buffer = hbytes().join(new_blocks)
            return self.incorporate_new_buffer(buffer)

        i = 0
        while i < len(self.blocks):
            if self.is_payload_block(i) and int_from_block(i) > 0:
                j = i + 1
                while j < len(self.shrink_target.blocks):
                    block_val = int_from_block(j)
                    i_block_val = int_from_block(i)
                    if self.is_payload_block(j) \
                            and block_val > 0 and i_block_val > 0:
                        offset = min(int_from_block(i),
                                     int_from_block(j))
                        # Save current before shrinking
                        current = [self.shrink_target.buffer[u:v]
                                   for u, v in self.blocks]
                        Integer.shrink(
                            offset, lambda o: reoffset_pair((i, j), o),
                            random=self.random
                        )
                    j += 1
            i += 1

    def mark_shrinking(self, blocks):
        """Mark each of these blocks as a shrinking block: That is, lowering
        its value lexicographically may cause less data to be drawn after."""
        t = self.shrink_target
        for i in blocks:
            if self.__shrinking_block_cache.get(i) is True:
                continue
            self.__shrinking_block_cache[i] = True
            prefix = t.buffer[:t.blocks[i][0]]
            self.__shrinking_prefixes.add(prefix)

    def clear_change_tracking(self):
        self.__changed_blocks.clear()

    def mark_changed(self, i):
        self.__changed_blocks.add(i)

    def update_shrink_target(self, new_target):
        assert new_target.frozen
        if self.shrink_target is not None:
            current = self.shrink_target.buffer
            new = new_target.buffer
            assert sort_key(new) < sort_key(current)
            self.shrinks += 1
            if new_target.blocks != self.shrink_target.blocks:
                self.clear_change_tracking()
            else:
                for i, (u, v) in enumerate(self.shrink_target.blocks):
                    if (
                        i not in self.__changed_blocks and
                        current[u:v] != new[u:v]
                    ):
                        self.mark_changed(i)
        else:
            self.__changed_blocks = set()

        self.shrink_target = new_target
        self.__shrinking_block_cache = {}

    def try_shrinking_blocks(self, blocks, b):
        """Attempts to replace each block in the blocks list with b. Returns
        True if it succeeded (which may include some additional modifications
        to shrink_target).
        May call mark_shrinking with b if this causes a reduction in size.
        In current usage it is expected that each of the blocks currently have
        the same value, although this is not essential. Note that b must be
        < the block at min(blocks) or this is not a valid shrink.
        This method will attempt to do some small amount of work to delete data
        that occurs after the end of the blocks. This is useful for cases where
        there is some size dependency on the value of a block.
        """
        initial_attempt = bytearray(self.shrink_target.buffer)
        for i, block in enumerate(blocks):
            if block >= len(self.blocks):
                blocks = blocks[:i]
                break
            u, v = self.blocks[block]
            n = min(v - u, len(b))
            initial_attempt[v - n:v] = b[-n:]

        start = self.shrink_target.blocks[blocks[0]][0]
        end = self.shrink_target.blocks[blocks[-1]][1]

        initial_data = self.cached_test_function(initial_attempt)

        if initial_data.status == Status.INTERESTING:
            return initial_data is self.shrink_target

        # If this produced something completely invalid we ditch it
        # here rather than trying to persevere.
        if initial_data.status < Status.VALID:
            return False

        # We've shrunk inside our group of blocks, so we have no way to
        # continue.
(This only happens when shrinking more than one block at1726 # a time).1727 if len(initial_data.buffer) < v:1728 return False1729 lost_data = len(self.shrink_target.buffer) - len(initial_data.buffer)1730 # If this did not in fact cause the data size to shrink we1731 # bail here because it's not worth trying to delete stuff from1732 # the remainder.1733 if lost_data <= 0:1734 return False1735 self.mark_shrinking(blocks)1736 # We now look for contiguous regions to delete that might help fix up1737 # this failed shrink. We only look for contiguous regions of the right1738 # lengths because doing anything more than that starts to get very1739 # expensive. See example_deletion_with_block_lowering for where we1740 # try to be more aggressive.1741 regions_to_delete = {(end, end + lost_data)}1742 for j in (blocks[-1] + 1, blocks[-1] + 2):1743 if j >= min(len(initial_data.blocks), len(self.blocks)):1744 continue1745 # We look for a block very shortly after the last one that has1746 # lost some of its size, and try to delete from the beginning so1747 # that it retains the same integer value. This is a bit of a hyper1748 # specific trick designed to make our integers() strategy shrink1749 # well.1750 r1, s1 = self.shrink_target.blocks[j]1751 r2, s2 = initial_data.blocks[j]1752 lost = (s1 - r1) - (s2 - r2)1753 # Apparently a coverage bug? An assert False in the body of this1754 # will reliably fail, but it shows up as uncovered.1755 if lost <= 0 or r1 != r2: # pragma: no cover1756 continue1757 regions_to_delete.add((r1, r1 + lost))1758 for ex in self.shrink_target.examples:1759 if ex.start > start:1760 continue1761 if ex.end <= end:1762 continue1763 replacement = initial_data.examples[ex.index]1764 in_original = [1765 c for c in ex.children if c.start >= end1766 ]1767 in_replaced = [1768 c for c in replacement.children if c.start >= end1769 ]1770 if len(in_replaced) >= len(in_original) or not in_replaced:1771 continue1772 # We've found an example where some of the children went missing1773 # as a result of this change, and just replacing it with the data1774 # it would have had and removing the spillover didn't work. This1775 # means that some of its children towards the right must be1776 # important, so we try to arrange it so that it retains its1777 # rightmost children instead of its leftmost.1778 regions_to_delete.add((1779 in_original[0].start, in_original[-len(in_replaced)].start1780 ))1781 for u, v in sorted(1782 regions_to_delete, key=lambda x: x[1] - x[0], reverse=True1783 ):1784 try_with_deleted = bytearray(initial_attempt)1785 del try_with_deleted[u:v]1786 if self.incorporate_new_buffer(try_with_deleted):1787 return True1788 return False1789 def remove_discarded(self):1790 """Try removing all bytes marked as discarded.1791 This pass is primarily to deal with data that has been ignored while1792 doing rejection sampling - e.g. as a result of an integer range, or a1793 filtered strategy.1794 Such data will also be handled by the adaptive_example_deletion pass,1795 but that pass is necessarily more conservative and will try deleting1796 each interval individually. 
The common case is that all data drawn and1797 rejected can just be thrown away immediately in one block, so this pass1798 will be much faster than trying each one individually when it works.1799 """1800 while self.shrink_target.has_discards:1801 discarded = []1802 for ex in self.shrink_target.examples:1803 if ex.discarded and (1804 not discarded or ex.start >= discarded[-1][-1]1805 ):1806 discarded.append((ex.start, ex.end))1807 assert discarded1808 attempt = bytearray(self.shrink_target.buffer)1809 for u, v in reversed(discarded):1810 del attempt[u:v]1811 if not self.incorporate_new_buffer(attempt):1812 break1813 def each_non_trivial_example(self):1814 """Iterates over all non-trivial examples in the current shrink target,1815 with care taken to ensure that every example yielded is current.1816 Makes the assumption that no modifications will be made to the1817 shrink target prior to the currently yielded example. If this1818 assumption is violated this will probably raise an error, so1819 don't do that.1820 """1821 stack = [0]1822 while stack:1823 target = stack.pop()1824 if isinstance(target, tuple):1825 parent, i = target1826 parent = self.shrink_target.examples[parent]1827 example_index = parent.children[i].index1828 else:1829 example_index = target1830 ex = self.shrink_target.examples[example_index]1831 if ex.trivial:1832 continue1833 yield ex1834 ex = self.shrink_target.examples[example_index]1835 if ex.trivial:1836 continue1837 for i in range(len(ex.children)):1838 stack.append((example_index, i))1839 def example_wise_shrink(self, shrinker, **kwargs):1840 """Runs a sequence shrinker on the children of each example."""1841 for ex in self.each_non_trivial_example():1842 st = self.shrink_target1843 pieces = [1844 st.buffer[c.start:c.end]1845 for c in ex.children1846 ]1847 if not pieces:1848 pieces = [st.buffer[ex.start:ex.end]]1849 prefix = st.buffer[:ex.start]1850 suffix = st.buffer[ex.end:]1851 shrinker.shrink(1852 pieces, lambda ls: self.incorporate_new_buffer(1853 prefix + hbytes().join(ls) + suffix,1854 ), random=self.random, **kwargs1855 )1856 def adaptive_example_deletion(self):1857 """Recursive deletion pass that tries to make the example located at1858 example_index as small as possible. This is the main point at which we1859 try to lower the size of the data.1860 First attempts to replace the example with its minimal possible version1861 using zero_example. If the example is trivial (either because of that1862 or because it was anyway) then we assume there's nothing we can1863 usefully do here and return early. Otherwise, we attempt to minimize it1864 by deleting its children.1865 If we do not make any successful changes, we recurse to the example's1866 children and attempt the same there.1867 """1868 self.example_wise_shrink(Length)1869 def zero_examples(self):1870 """Attempt to replace each example with a minimal version of itself."""1871 for ex in self.each_non_trivial_example():1872 u = ex.start1873 v = ex.end1874 attempt = self.cached_test_function(1875 self.buffer[:u] + hbytes(v - u) + self.buffer[v:]1876 )1877 # FIXME: IOU one attempt to debug this - DRMacIver1878 # This is a mysterious problem that should be impossible to trigger1879 # but isn't. I don't know what's going on, and it defeated my1880 # my attempts to reproduce or debug it. I'd *guess* it's related to1881 # nondeterminism in the test function. 
That should be impossible in1882 # the cases where I'm seeing it, but I haven't been able to put1883 # together a reliable reproduction of it.1884 if ex.index >= len(attempt.examples): # pragma: no cover1885 continue1886 in_replacement = attempt.examples[ex.index]1887 used = in_replacement.length1888 if (1889 not self.__predicate(attempt) and1890 in_replacement.end < len(attempt.buffer) and1891 used < ex.length1892 ):1893 self.incorporate_new_buffer(1894 self.buffer[:u] + hbytes(used) + self.buffer[v:]1895 )1896 def minimize_duplicated_blocks(self):1897 """Find blocks that have been duplicated in multiple places and attempt1898 to minimize all of the duplicates simultaneously.1899 This lets us handle cases where two values can't be shrunk1900 independently of each other but can easily be shrunk together.1901 For example if we had something like:1902 ls = data.draw(lists(integers()))1903 y = data.draw(integers())1904 assert y not in ls1905 Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were1906 to replace both 3s with 0, this would be a valid shrink, but if we were1907 to replace either 3 with 0 on its own the test would start passing.1908 It is also useful for when that duplication is accidental and the value1909 of the blocks doesn't matter very much because it allows us to replace1910 more values at once.1911 """1912 def canon(b):1913 i = 01914 while i < len(b) and b[i] == 0:1915 i += 11916 return b[i:]1917 counts = Counter(1918 canon(self.shrink_target.buffer[u:v])1919 for u, v in self.blocks1920 )1921 counts.pop(hbytes(), None)1922 blocks = [buffer for buffer, count in counts.items() if count > 1]1923 blocks.sort(reverse=True)1924 blocks.sort(key=lambda b: counts[b] * len(b), reverse=True)1925 for block in blocks:1926 targets = [1927 i for i, (u, v) in enumerate(self.blocks)1928 if canon(self.shrink_target.buffer[u:v]) == block1929 ]1930 # This can happen if some blocks have been lost in the previous1931 # shrinking.1932 if len(targets) <= 1:1933 continue1934 Lexical.shrink(1935 block,1936 lambda b: self.try_shrinking_blocks(targets, b),1937 random=self.random, full=False1938 )1939 def minimize_individual_blocks(self):1940 """Attempt to minimize each block in sequence.1941 This is the pass that ensures that e.g. each integer we draw is a1942 minimum value. So it's the part that guarantees that if we e.g. do1943 x = data.draw(integers())1944 assert x < 101945 then in our shrunk example, x = 10 rather than say 97.1946 """1947 i = len(self.blocks) - 11948 while i >= 0:1949 u, v = self.blocks[i]1950 Lexical.shrink(1951 self.shrink_target.buffer[u:v],1952 lambda b: self.try_shrinking_blocks((i,), b),1953 random=self.random, full=False,1954 )1955 i -= 11956 def example_deletion_with_block_lowering(self):1957 """Sometimes we get stuck where there is data that we could easily1958 delete, but it changes the number of examples generated, so we have to1959 change that at the same time.1960 We handle most of the common cases in try_shrinking_blocks which is1961 pretty good at clearing out large contiguous blocks of dead space,1962 but it fails when there is data that has to stay in particular places1963 in the list.1964 This pass exists as an emergency procedure to get us unstuck. 
For every1965 example and every block not inside that example it tries deleting the1966 example and modifying the block's value by one in either direction.1967 """1968 i = 01969 while i < len(self.shrink_target.blocks):1970 if not self.is_shrinking_block(i):1971 i += 11972 continue1973 u, v = self.blocks[i]1974 j = 01975 while j < len(self.shrink_target.examples):1976 n = int_from_bytes(self.shrink_target.buffer[u:v])1977 if n == 0:1978 break1979 ex = self.shrink_target.examples[j]1980 if ex.start < v or ex.length == 0:1981 j += 11982 continue1983 buf = bytearray(self.shrink_target.buffer)1984 buf[u:v] = int_to_bytes(n - 1, v - u)1985 del buf[ex.start:ex.end]1986 if not self.incorporate_new_buffer(buf):1987 j += 11988 i += 11989 def minimize_block_pairs_retaining_sum(self):1990 """This pass minimizes pairs of blocks subject to the constraint that1991 their sum when interpreted as integers remains the same. This allow us1992 to normalize a number of examples that we would otherwise struggle on.1993 e.g. consider the following:1994 m = data.draw_bits(8)1995 n = data.draw_bits(8)1996 if m + n >= 256:1997 data.mark_interesting()1998 The ideal example for this is m=1, n=255, but we will almost never1999 find that without a pass like this - we would only do so if we2000 happened to draw n=255 by chance.2001 This kind of scenario comes up reasonably often in the context of e.g.2002 triggering overflow behaviour.2003 """2004 i = 02005 while i < len(self.shrink_target.blocks):2006 if self.is_payload_block(i):2007 j = i + 12008 while j < len(self.shrink_target.blocks):2009 u, v = self.shrink_target.blocks[i]2010 m = int_from_bytes(self.shrink_target.buffer[u:v])2011 if m == 0:2012 break2013 r, s = self.shrink_target.blocks[j]2014 n = int_from_bytes(self.shrink_target.buffer[r:s])2015 if (2016 s - r == v - u and2017 self.is_payload_block(j)2018 ):2019 def trial(x, y):2020 if s > len(self.shrink_target.buffer):2021 return False2022 attempt = bytearray(self.shrink_target.buffer)2023 try:2024 attempt[u:v] = int_to_bytes(x, v - u)2025 attempt[r:s] = int_to_bytes(y, s - r)2026 except OverflowError:2027 return False2028 return self.incorporate_new_buffer(attempt)2029 # We first attempt to move 1 from m to n. If that works2030 # then we treat that as a sign that it's worth trying2031 # a more expensive minimization. But if m was already 12032 # (we know it's > 0) then there's no point continuing2033 # because the value there is now zero.2034 if trial(m - 1, n + 1) and m > 1:2035 m = int_from_bytes(self.shrink_target.buffer[u:v])2036 n = int_from_bytes(self.shrink_target.buffer[r:s])2037 tot = m + n2038 Integer.shrink(2039 m, lambda x: trial(x, tot - x),2040 random=self.random2041 )2042 j += 12043 i += 12044 def reorder_examples(self):2045 """This pass allows us to reorder pairs of examples which come from the2046 same strategy (or strategies that happen to pun to the same label by2047 accident, but that shouldn't happen often).2048 For example, consider the following:2049 .. code-block:: python2050 import hypothesis.strategies as st2051 from hypothesis import given2052 @given(st.text(), st.text())2053 def test_does_not_exceed_100(x, y):2054 assert x != y2055 Without the ability to reorder x and y this could fail either with2056 ``x="", ``y="0"``, or the other way around. With reordering it will2057 reliably fail with ``x=""``, ``y="0"``.2058 """2059 self.example_wise_shrink(Ordering, key=sort_key)2060 def alphabet_minimize(self):2061 """Attempts to replace most bytes in the buffer with 0 or 1. 
The main2062 benefit of this is that it significantly increases our cache hit rate2063 by making things that are equivalent more likely to have the same2064 representation.2065 We only run this once rather than as part of the main passes as2066 once it's done its magic it's unlikely to ever be useful again.2067 It's important that it runs first though, because it makes2068 everything that comes after it faster because of the cache hits.2069 """2070 for c in (1, 0):2071 alphabet = set(self.buffer) - set(hrange(c + 1))2072 if not alphabet:2073 continue2074 def clear_to(reduced):2075 reduced = set(reduced)2076 attempt = hbytes([2077 b if b <= c or b in reduced else c2078 for b in self.buffer2079 ])2080 return self.consider_new_buffer(attempt)2081 Length.shrink(2082 sorted(alphabet), clear_to,2083 random=self.random,...
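The docstrings in the listing above describe these passes in terms of the byte stream, but they correspond to user-visible shrinking behaviour. As a rough illustration of the situation lower_common_block_offset is designed for, here is a small property test modelled on the m/n example from its docstring; this is a sketch for experimentation rather than part of the engine source, and the test name is invented for the example.

from hypothesis import given, strategies as st

# Mirrors the docstring example for lower_common_block_offset: neither value
# can be lowered on its own without making the test pass, but both can be
# lowered together by subtracting a common offset.
@given(st.integers(min_value=0), st.integers(min_value=0))
def test_values_stay_far_apart(m, n):
    # Fails whenever m and n are within 1 of each other, e.g. m == n == 0.
    assert abs(m - n) > 1

Run with pytest; Hypothesis should report a fully shrunk counterexample such as m=0, n=0 rather than a pair of large nearby integers.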

shrinker.py

Source:shrinker.py Github


...
        self.__all_changed_blocks = set()

    def mark_changed(self, i):
        self.__changed_blocks.add(i)

    @property
    def __changed_blocks(self):
        if self.__last_checked_changed_at is not self.shrink_target:
            prev_target = self.__last_checked_changed_at
            new_target = self.shrink_target
            assert prev_target is not new_target
            prev = prev_target.buffer
            new = new_target.buffer
            assert sort_key(new) < sort_key(prev)

            if (
                len(new_target.blocks) != len(prev_target.blocks)
                or new_target.blocks.endpoints != prev_target.blocks.endpoints
            ):
                self.__all_changed_blocks = set()
            else:
                blocks = new_target.blocks
...
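For comparison with the engine.py version above, the newer shrinker.py keeps __changed_blocks behind a property that is recomputed lazily: when the shrink target has moved on since the last check, it discards the tracked set entirely if the block layout changed, and otherwise compares the old and new buffers block by block. The helper below is a simplified standalone sketch of that comparison; the function name and the (start, end) block representation are assumptions made for illustration and are not part of the Hypothesis API.

# Simplified sketch of block-level change tracking between two buffers.
def diff_changed_blocks(old_buffer, new_buffer, old_blocks, new_blocks):
    # Blocks are (start, end) byte ranges. If the layout changed we cannot
    # compare block by block, so treat every block as changed - this mirrors
    # the shrinker resetting __all_changed_blocks above.
    if old_blocks != new_blocks:
        return set(range(len(new_blocks)))
    return {
        i
        for i, (u, v) in enumerate(new_blocks)
        if old_buffer[u:v] != new_buffer[u:v]
    }

# Example: only the second block differs between the two buffers.
assert diff_changed_blocks(
    b"\x00\x05", b"\x00\x03", [(0, 1), (1, 2)], [(0, 1), (1, 2)]
) == {1}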


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Hypothesis automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

