How to use the shrink_pass method in hypothesis

Best Python code snippets using hypothesis

shrinker.py

Source: shrinker.py (GitHub)

...
        return self.run_step.__name__

    def __attrs_post_init__(self):
        assert self.name not in SHRINK_PASS_DEFINITIONS, self.name
        SHRINK_PASS_DEFINITIONS[self.name] = self


def defines_shrink_pass(generate_arguments):
    """A convenient decorator for defining shrink passes."""

    def accept(run_step):
        definition = ShrinkPassDefinition(
            generate_arguments=generate_arguments, run_step=run_step
        )

        def run(self):
            return self.run_shrink_pass(definition.name)

        run.__name__ = run_step.__name__
        run.is_shrink_pass = True
        return run

    return accept
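
# Editor's sketch (not part of shrinker.py): a pass defined with the
# decorator above pairs an argument generator with a single-step runner,
# mirroring minimize_individual_blocks further down this file. The pass
# name ``zero_block`` is hypothetical.
#
#     @defines_shrink_pass(lambda self: [(i,) for i in hrange(len(self.blocks))])
#     def zero_block(self, i):
#         # Steps may be stale by the time they run, so bounds-check first.
#         if i >= len(self.blocks):
#             return
#         u, v = self.blocks[i].bounds
#         self.incorporate_new_buffer(
#             self.buffer[:u] + hbytes(v - u) + self.buffer[v:]
#         )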
class Shrinker(object):
    """A shrinker is a child object of a ConjectureRunner which is designed to
    manage the associated state of a particular shrink problem. That is, we
    have some initial ConjectureData object and some property of interest
    that it satisfies, and we want to find a ConjectureData object with a
    shortlex (see sort_key above) smaller buffer that exhibits the same
    property.

    Currently the only property of interest we use is that the status is
    INTERESTING and the interesting_origin takes on some fixed value, but we
    may potentially be interested in other use cases later.
    However we assume that data with a status < VALID never satisfies the predicate.

    The shrinker keeps track of a value shrink_target which represents the
    current best known ConjectureData object satisfying the predicate.
    It refines this value by repeatedly running *shrink passes*, which are
    methods that perform a series of transformations to the current shrink_target
    and evaluate the underlying test function to find new ConjectureData
    objects. If any of these satisfy the predicate, the shrink_target
    is updated automatically. Shrinking runs until no shrink pass can
    improve the shrink_target, at which point it stops. It may also be
    terminated if the underlying engine throws RunIsComplete, but that
    is handled by the calling code rather than the Shrinker.

    =======================
    Designing Shrink Passes
    =======================

    Generally a shrink pass is just any function that calls
    cached_test_function and/or incorporate_new_buffer a number of times,
    but there are a couple of useful things to bear in mind.

    A shrink pass *makes progress* if running it changes self.shrink_target
    (i.e. it finds a shortlex smaller ConjectureData object satisfying
    the predicate). The desired end state of shrinking is to find a
    value such that no shrink pass can make progress, i.e. that we
    are at a local minimum for each shrink pass.

    In aid of this goal, the main invariant that a shrink pass must
    satisfy is that whether it makes progress must be deterministic.
    It is fine (encouraged even) for the specific progress it makes
    to be non-deterministic, but if you run a shrink pass, it makes
    no progress, and then you immediately run it again, it should
    never succeed on the second time. This allows us to stop as soon
    as we have run each shrink pass and seen no progress on any of
    them.

    This means that e.g. it's fine to try each of N deletions
    or replacements in a random order, but it's not OK to try N random
    deletions (unless you have already shrunk at least once, though we
    don't currently take advantage of this loophole).

    Shrink passes need to be written so as to be robust against
    change in the underlying shrink target. It is generally safe
    to assume that the shrink target does not change prior to the
    point of first modification - e.g. if you change no bytes at
    index ``i``, all examples whose start is ``<= i`` still exist,
    as do all blocks, and the data object is still of length
    ``>= i + 1``. This can only be violated by bad user code which
    relies on an external source of non-determinism.

    When the underlying shrink_target changes, shrink
    passes should not run substantially more test_function calls
    on success than they do on failure. Say, no more than a constant
    factor more. In particular shrink passes should not iterate to a
    fixed point.

    This means that shrink passes are often written with loops that
    are carefully designed to do the right thing in the case that no
    shrinks occurred and try to adapt to any changes to do a reasonable
    job. e.g. say we wanted to write a shrink pass that tried deleting
    each individual byte (this isn't an especially good choice,
    but it leads to a simple illustrative example), we might do it
    by iterating over the buffer like so:

    .. code-block:: python

        i = 0
        while i < len(self.shrink_target.buffer):
            if not self.incorporate_new_buffer(
                self.shrink_target.buffer[: i] +
                self.shrink_target.buffer[i + 1 :]
            ):
                i += 1

    The reason for writing the loop this way is that i is always a
    valid index into the current buffer, even if the current buffer
    changes as a result of our actions. When the buffer changes,
    we leave the index where it is rather than restarting from the
    beginning, and carry on. This means that the number of steps we
    run in this case is always bounded above by the number of steps
    we would run if nothing works.

    Another thing to bear in mind about shrink pass design is that
    they should prioritise *progress*. If you have N operations that
    you need to run, you should try to order them in such a way as
    to avoid stalling, where you have long periods of test function
    invocations where no shrinks happen. This is bad because whenever
    we shrink we reduce the amount of work the shrinker has to do
    in future, and often speed up the test function, so we ideally
    want those shrinks to happen much earlier in the process.
    Sometimes stalls are inevitable of course - e.g. if the pass
    makes no progress, then the entire thing is just one long stall,
    but it's helpful to design it so that stalls are less likely
    in typical behaviour.

    The two easiest ways to do this are:

    * Just run the N steps in random order. As long as a
      reasonably large proportion of the operations succeed, this
      guarantees the expected stall length is quite short. The
      bookkeeping for making sure this does the right thing when
      it succeeds can be quite annoying.
    * When you have any sort of nested loop, loop in such a way
      that both loop variables change each time. This prevents
      stalls which occur when one particular value for the outer
      loop is impossible to make progress on, rendering the entire
      inner loop into a stall.

    However, although progress is good, too much progress can be
    a bad sign! If you're *only* seeing successful reductions,
    that's probably a sign that you are making changes that are
    too timid. Two useful things to offset this:

    * It's worth writing shrink passes which are *adaptive*, in
      the sense that when operations seem to be working really
      well we try to bundle multiple of them together. This can
      often be used to turn what would be O(m) successful calls
      into O(log(m)).
    * It's often worth trying one or two special minimal values
      before trying anything more fine grained (e.g. replacing
      the whole thing with zero).
    """

    def derived_value(fn):
        """It's useful during shrinking to have access to derived values of
        the current shrink target.

        This decorator allows you to define these as cached properties. They
        are calculated once, then cached until the shrink target changes, then
        recalculated the next time they are used."""

        def accept(self):
            try:
                return self.__derived_values[fn.__name__]
            except KeyError:
                return self.__derived_values.setdefault(fn.__name__, fn(self))

        accept.__name__ = fn.__name__
        return property(accept)

    def __init__(self, engine, initial, predicate):
        """Create a shrinker for a particular engine, with a given starting
        point and predicate. When shrink() is called it will attempt to find an
        example for which predicate is True and which is strictly smaller than
        initial.

        Note that initial is a ConjectureData object, and predicate
        takes ConjectureData objects.
        """
        self.__engine = engine
        self.__predicate = predicate
        self.__shrinking_prefixes = set()
        self.__derived_values = {}
        self.__pending_shrink_explanation = None

        self.initial_size = len(initial.buffer)

        # We keep track of the current best example on the shrink_target
        # attribute.
        self.shrink_target = None
        self.update_shrink_target(initial)
        self.shrinks = 0

        self.initial_calls = self.__engine.call_count

        self.passes_by_name = {}
        self.passes = []

    def explain_next_call_as(self, explanation):
        self.__pending_shrink_explanation = explanation

    def clear_call_explanation(self):
        self.__pending_shrink_explanation = None

    def add_new_pass(self, run):
        """Creates a shrink pass corresponding to calling ``run(self)``"""
        definition = SHRINK_PASS_DEFINITIONS[run]
        p = ShrinkPass(
            run_with_arguments=definition.run_step,
            generate_arguments=definition.generate_arguments,
            shrinker=self,
            index=len(self.passes),
        )
        self.passes.append(p)
        self.passes_by_name[p.name] = p
        return p

    def shrink_pass(self, name):
        """Return the ShrinkPass object for the pass with the given name."""
        if name not in self.passes_by_name:
            self.add_new_pass(name)
        return self.passes_by_name[name]

    @property
    def calls(self):
        """Return the number of calls that have been made to the underlying
        test function."""
        return self.__engine.call_count

    def consider_new_buffer(self, buffer):
        """Returns True if after running this buffer the result would be
        the current shrink_target."""
        buffer = hbytes(buffer)
        return buffer.startswith(self.buffer) or self.incorporate_new_buffer(buffer)

    def incorporate_new_buffer(self, buffer):
        """Either runs the test function on this buffer and returns True if
        that changed the shrink_target, or determines that doing so would
        be useless and returns False without running it."""
        buffer = hbytes(buffer[: self.shrink_target.index])
        # Sometimes an attempt at lexicographic minimization will do the wrong
        # thing because the buffer has changed under it (e.g. something has
        # turned into a write, the bit size has changed). The result would be
        # an invalid string, but it's better for us to just ignore it here as
        # it turns out to involve quite a lot of tricky book-keeping to get
        # this right and it's better to just handle it in one place.
        if sort_key(buffer) >= sort_key(self.shrink_target.buffer):
            return False

        if self.shrink_target.buffer.startswith(buffer):
            return False

        previous = self.shrink_target
        self.cached_test_function(buffer)
        return previous is not self.shrink_target

    def incorporate_test_data(self, data):
        """Takes a ConjectureData or Overrun object and updates the current
        shrink_target if this data represents an improvement over it,
        returning True if it is."""
        if data is Overrun or data is self.shrink_target:
            return
        if self.__predicate(data) and sort_key(data.buffer) < sort_key(
            self.shrink_target.buffer
        ):
            self.update_shrink_target(data)
            self.__shrinking_block_cache = {}
            return True
        return False

    def cached_test_function(self, buffer):
        """Returns a cached version of the underlying test function, so
        that the result is either an Overrun object (if the buffer is
        too short to be a valid test case) or a ConjectureData object
        with status >= INVALID that would result from running this buffer."""
        if self.__pending_shrink_explanation is not None:
            self.debug(self.__pending_shrink_explanation)
            self.__pending_shrink_explanation = None

        buffer = hbytes(buffer)
        result = self.__engine.cached_test_function(buffer)
        self.incorporate_test_data(result)
        return result
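
    # Editor's note (not part of shrinker.py): ``sort_key``, defined
    # earlier in the real module, implements the shortlex order used by
    # incorporate_new_buffer and incorporate_test_data above. It is just:
    #
    #     def sort_key(buffer):
    #         return (len(buffer), buffer)
    #
    # so e.g. sort_key(b"\x00\xff") < sort_key(b"\x01\x00\x00") purely
    # because the first buffer is shorter.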
    def debug(self, msg):
        self.__engine.debug(msg)

    @property
    def random(self):
        return self.__engine.random

    def run_shrink_pass(self, sp):
        """Runs the function associated with ShrinkPass sp and updates the
        relevant metadata.

        Note that sp may or may not be a pass currently associated with
        this shrinker. This does not handle any requeuing that is
        required.
        """
        sp = self.shrink_pass(sp)

        self.debug("Shrink Pass %s" % (sp.name,))
        try:
            sp.runs += 1
            steps = sp.generate_steps()
            self.random.shuffle(steps)
            for s in steps:
                sp.run_step(s)
        finally:
            self.debug("Shrink Pass %s completed." % (sp.name,))

    def shrink(self):
        """Run the full set of shrinks and update shrink_target.

        This method is "mostly idempotent" - calling it twice is unlikely to
        have any effect, though it has a non-zero probability of doing so.
        """
        # We assume that if an all-zero block of bytes is an interesting
        # example then we're not going to do better than that.
        # This might not technically be true: e.g. for integers() | booleans()
        # the simplest example is actually [1, 0]. Missing this case is fairly
        # harmless and this allows us to make various simplifying assumptions
        # about the structure of the data (principally that we're never
        # operating on a block of all zero bytes so can use non-zeroness as a
        # signpost of complexity).
        if not any(self.shrink_target.buffer) or self.incorporate_new_buffer(
            hbytes(len(self.shrink_target.buffer))
        ):
            return

        try:
            self.greedy_shrink()
        finally:
            if self.__engine.report_debug_info:

                def s(n):
                    return "s" if n != 1 else ""

                total_deleted = self.initial_size - len(self.shrink_target.buffer)

                self.debug("---------------------")
                self.debug("Shrink pass profiling")
                self.debug("---------------------")
                self.debug("")
                calls = self.__engine.call_count - self.initial_calls
                self.debug(
                    (
                        "Shrinking made a total of %d call%s "
                        "of which %d shrank. This deleted %d byte%s out of %d."
                    )
                    % (
                        calls,
                        s(calls),
                        self.shrinks,
                        total_deleted,
                        s(total_deleted),
                        self.initial_size,
                    )
                )
                for useful in [True, False]:
                    self.debug("")
                    if useful:
                        self.debug("Useful passes:")
                    else:
                        self.debug("Useless passes:")
                    self.debug("")
                    for p in sorted(
                        self.passes,
                        key=lambda t: (-t.calls, -t.runs, t.deletions, t.shrinks),
                    ):
                        if p.calls == 0:
                            continue
                        if (p.shrinks != 0) != useful:
                            continue

                        self.debug(
                            (
                                " * %s ran %d time%s, making %d call%s of which "
                                "%d shrank, deleting %d byte%s."
                            )
                            % (
                                p.name,
                                p.runs,
                                s(p.runs),
                                p.calls,
                                s(p.calls),
                                p.shrinks,
                                p.deletions,
                                s(p.deletions),
                            )
                        )
                self.debug("")
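
    # Editor's sketch (not part of shrinker.py): the engine normally calls
    # shrink() for you, but a single pass can also be driven by hand, which
    # is what the tests at the bottom of this page do:
    #
    #     shrinker.run_shrink_pass("minimize_individual_blocks")
    #     shrinker.shrink()  # or run the whole schedule to a fixed point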
    def greedy_shrink(self):
        """Run a full set of greedy shrinks (that is, ones that will only ever
        move to a better target) and update shrink_target appropriately.

        This method iterates to a fixed point and so is idempotent - calling
        it twice will have exactly the same effect as calling it once.
        """
        # The two main parts of shortlex optimisation are of course making
        # the test case "short" and "lex" (that is to say, lexicographically
        # smaller). Sometimes these are intertwined and it's hard to do one
        # without the other, but as far as we can we *vastly* prefer to make
        # the test case shorter over making it lexicographically smaller.
        # The reason for this is that we can only make O(n) progress on
        # reducing the size, but at a fixed size there are O(256^n)
        # lexicographically smaller values. As a result it's easier to
        # reach a fixed point on size and also every reduction we make
        # in size reduces the amount of work we have to do lexicographically.
        # On top of that, reducing the size makes generation much faster,
        # so even if reducing the size earlier doesn't reduce the number
        # of test cases we run it may still result in significantly faster
        # tests.
        #
        # As a result of this we start by running a bunch of operations
        # that are primarily designed to reduce the size of the test.
        #
        # These fall into two categories:
        #
        # * "structured" ones respect the boundaries of draw calls and are
        #   likely to correspond to semantically meaningful transformations
        #   of the generated test case (e.g. deleting an element of a list,
        #   replacing a tree node with one of its children).
        # * "unstructured" ones will often not correspond to any meaningful
        #   transformation but may succeed by accident or because they happen
        #   to correspond to an unexpectedly meaningful operation (e.g.
        #   merging two adjacent lists).
        #
        # Generally we expect unstructured ones to succeed early on when
        # the test case has a lot of redundancy because they have plenty
        # of opportunity to work by accident, and after that they're mostly
        # unlikely to work. As a result we do a slightly odd thing where we
        # run unstructured deletions as part of the initial pass, but then
        # we hold off on running them to a fixed point until we've turned
        # the emergency passes on later.
        unstructured_deletions = [block_program("X" * i) for i in hrange(5, 0, -1)]
        structured_deletions = ["pass_to_descendant", "adaptive_example_deletion"]
        self.fixate_shrink_passes(unstructured_deletions + structured_deletions)

        # "fine" passes are ones that are primarily concerned with lexicographic
        # reduction. They may still delete data (either deliberately or by accident)
        # but the distinguishing feature is that it is possible for them to
        # succeed without the length of the test case changing.
        # As a result we only start running them after we've hit a fixed point
        # for the deletion passes at least once.
        fine = [
            "alphabet_minimize",
            "zero_examples",
            "reorder_examples",
            "minimize_floats",
            "minimize_duplicated_blocks",
            "minimize_individual_blocks",
        ]
        self.fixate_shrink_passes(structured_deletions + fine)

        # "emergency" shrink passes are ones that handle cases that we
        # can't currently handle more elegantly - either they're slightly
        # weird hacks that happen to work or they're expensive passes to
        # run. Generally we hope that the emergency passes don't do anything
        # at all. We also re-enable the unstructured deletions at this point
        # in case new ones have been unlocked since we last ran them, but
        # don't expect that to be a common occurrence.
        emergency = [block_program("-XX"), "example_deletion_with_block_lowering"]
        self.fixate_shrink_passes(
            unstructured_deletions + structured_deletions + fine + emergency
        )

    def fixate_shrink_passes(self, passes):
        """Run steps from each pass in ``passes`` until the current shrink target
        is a fixed point of all of them."""
        passes = list(map(self.shrink_pass, passes))

        initial = None
        while initial is not self.shrink_target:
            initial = self.shrink_target

            for sp in passes:
                sp.runs += 1

            # We run remove_discarded after every step to do cleanup,
            # keeping track of whether that actually works. Either there is
            # no discarded data and it is basically free, or it reliably works
            # and deletes data, or it doesn't work. In the latter case we turn
            # it off for the rest of this loop through the passes, but will
            # try again once all of the passes have been run.
            can_discard = self.remove_discarded()

            passes_with_steps = [(sp, None) for sp in passes]

            while passes_with_steps:
                to_run_next = []

                for sp, steps in passes_with_steps:
                    if steps is None:
                        steps = sp.generate_steps()

                    failures = 0
                    max_failures = 3
                    while steps and failures < max_failures:
                        prev_calls = self.calls
                        prev = self.shrink_target
                        sp.run_step(pop_random(self.random, steps))
                        if prev_calls != self.calls:
                            if can_discard:
                                can_discard = self.remove_discarded()
                            if prev is self.shrink_target:
                                failures += 1
                    if steps:
                        to_run_next.append((sp, steps))
                passes_with_steps = to_run_next

        for sp in passes:
            sp.fixed_point_at = self.shrink_target
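
    # Editor's sketch (not part of shrinker.py): a typical call mixes named
    # passes with block programs, e.g.
    #
    #     shrinker.fixate_shrink_passes(
    #         ["adaptive_example_deletion", block_program("X")]
    #     )
    #
    # and returns once none of the passes can improve the shrink target.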
    @property
    def buffer(self):
        return self.shrink_target.buffer

    @property
    def blocks(self):
        return self.shrink_target.blocks

    @property
    def examples(self):
        return self.shrink_target.examples

    def all_block_bounds(self):
        return self.shrink_target.blocks.all_bounds()

    @derived_value
    def examples_by_label(self):
        """An index of all examples grouped by their label, with
        the examples stored in their normal index order."""
        examples_by_label = defaultdict(list)
        for ex in self.examples:
            examples_by_label[ex.label].append(ex)
        return dict(examples_by_label)

    def calculate_descents(self):
        """Returns a list of all pairs (i, j) such that
        self.examples[i] is an ancestor of self.examples[j] and
        they have the same label.
        """
        result = []

        for ls in self.examples_by_label.values():
            if len(ls) <= 1:
                continue

            for i, ex in enumerate(ls[:-1]):
                hi = len(ls)
                lo = i + 1
                if ls[lo].start >= ex.end:
                    continue
                while lo + 1 < hi:
                    mid = (lo + hi) // 2
                    if ls[mid].start >= ex.end:
                        hi = mid
                    else:
                        lo = mid
                id1 = self.stable_identifier_for_example(ex)
                result.extend(
                    [
                        (id1, self.stable_identifier_for_example(ls[j]))
                        for j in hrange(i + 1, hi)
                    ]
                )
        return result

    @defines_shrink_pass(calculate_descents)
    def pass_to_descendant(self, ancestor_id, descendant_id):
        """Attempt to replace each example with a descendant example.

        This is designed to deal with strategies that call themselves
        recursively. For example, suppose we had:

        binary_tree = st.deferred(
            lambda: st.one_of(
                st.integers(), st.tuples(binary_tree, binary_tree)))

        This pass guarantees that we can replace any binary tree with one of
        its subtrees - each of those will create an interval that the parent
        could validly be replaced with, and this pass will try doing that.

        This is pretty expensive - it takes O(len(intervals)^2) - so we run it
        late in the process when we've got the number of intervals as far down
        as possible.
        """
        ancestor = self.example_for_stable_identifier(ancestor_id)
        descendant = self.example_for_stable_identifier(descendant_id)
        if descendant is None:
            return
        assert ancestor is not None
        self.incorporate_new_buffer(
            self.buffer[: ancestor.start]
            + self.buffer[descendant.start : descendant.end]
            + self.buffer[ancestor.end :]
        )

    def is_shrinking_block(self, i):
        """Checks whether block i has been previously marked as a shrinking
        block.

        If the shrink target has changed since i was last checked, will
        attempt to calculate if an equivalent block in a previous shrink
        target was marked as shrinking.
        """
        if not self.__shrinking_prefixes:
            return False
        try:
            return self.__shrinking_block_cache[i]
        except KeyError:
            pass
        t = self.shrink_target
        return self.__shrinking_block_cache.setdefault(
            i, t.buffer[: t.blocks[i].start] in self.__shrinking_prefixes
        )

    def lower_common_block_offset(self):
        """Sometimes we find ourselves in a situation where changes to one part
        of the byte stream unlock changes to other parts. Sometimes this is
        good, but sometimes this can cause us to exhibit exponential slow
        downs!

        e.g. suppose we had the following:

        m = draw(integers(min_value=0))
        n = draw(integers(min_value=0))
        assert abs(m - n) > 1

        If this fails then we'll end up with a loop where on each iteration we
        reduce each of m and n by 2 - m can't go lower because of n, then n
        can't go lower because of m.

        This will take us O(m) iterations to complete, which is exponential in
        the data size, as we gradually zig zag our way towards zero.

        This can only happen if we're failing to reduce the size of the byte
        stream: The number of iterations that reduce the length of the byte
        stream is bounded by that length.

        So what we do is this: We keep track of which blocks are changing, and
        then if there's some non-zero common offset to them we try and minimize
        them all at once by lowering that offset.

        This may not work, and it definitely won't get us out of all possible
        exponential slow downs (an example of where it doesn't is where the
        shape of the blocks changes as a result of this bouncing behaviour),
        but it fails fast when it doesn't work and gets us out of a really
        nastily slow case when it does.
        """
        if len(self.__changed_blocks) <= 1:
            return

        current = self.shrink_target

        blocked = [current.buffer[u:v] for u, v in self.all_block_bounds()]

        changed = [
            i
            for i in sorted(self.__changed_blocks)
            if not self.shrink_target.blocks[i].trivial
        ]

        if not changed:
            return

        ints = [int_from_bytes(blocked[i]) for i in changed]
        offset = min(ints)
        assert offset > 0

        for i in hrange(len(ints)):
            ints[i] -= offset

        def reoffset(o):
            new_blocks = list(blocked)
            for i, v in zip(changed, ints):
                new_blocks[i] = int_to_bytes(v + o, len(blocked[i]))
            return self.incorporate_new_buffer(hbytes().join(new_blocks))

        Integer.shrink(offset, reoffset, random=self.random)
        self.clear_change_tracking()
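
    # Editor's note (not part of shrinker.py): int_from_bytes/int_to_bytes
    # (imported from hypothesis.internal.compat) are big-endian conversions,
    # e.g.
    #
    #     int_from_bytes(b"\x01\x00") == 256
    #     int_to_bytes(256, 2) == b"\x01\x00"
    #
    # which is what lets reoffset() above shift several blocks by one shared
    # integer offset.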
    def mark_shrinking(self, blocks):
        """Mark each of these blocks as a shrinking block: That is, lowering
        its value lexicographically may cause less data to be drawn after."""
        t = self.shrink_target
        for i in blocks:
            if self.__shrinking_block_cache.get(i) is True:
                continue
            self.__shrinking_block_cache[i] = True
            prefix = t.buffer[: t.blocks[i].start]
            self.__shrinking_prefixes.add(prefix)

    def clear_change_tracking(self):
        self.__last_checked_changed_at = self.shrink_target
        self.__all_changed_blocks = set()

    def mark_changed(self, i):
        self.__changed_blocks.add(i)

    @property
    def __changed_blocks(self):
        if self.__last_checked_changed_at is not self.shrink_target:
            prev_target = self.__last_checked_changed_at
            new_target = self.shrink_target
            assert prev_target is not new_target
            prev = prev_target.buffer
            new = new_target.buffer
            assert sort_key(new) < sort_key(prev)

            if (
                len(new_target.blocks) != len(prev_target.blocks)
                or new_target.blocks.endpoints != prev_target.blocks.endpoints
            ):
                self.__all_changed_blocks = set()
            else:
                blocks = new_target.blocks

                # Index of last block whose contents have been modified, found
                # by checking if the tail past this point has been modified.
                last_changed = binary_search(
                    0,
                    len(blocks),
                    lambda i: prev[blocks.start(i) :] != new[blocks.start(i) :],
                )

                # Index of the first block whose contents have been changed,
                # because we know that this predicate is true for zero (because
                # the prefix from the start is empty), so the result must be True
                # for the bytes from the start of this block and False for the
                # bytes from the end, hence the change is in this block.
                first_changed = binary_search(
                    0,
                    len(blocks),
                    lambda i: prev[: blocks.start(i)] == new[: blocks.start(i)],
                )

                # Between these two changed regions we now do a linear scan to
                # check if any specific block values have changed.
                for i in hrange(first_changed, last_changed + 1):
                    u, v = blocks.bounds(i)
                    if i not in self.__all_changed_blocks and prev[u:v] != new[u:v]:
                        self.__all_changed_blocks.add(i)

            self.__last_checked_changed_at = new_target
        assert self.__last_checked_changed_at is self.shrink_target
        return self.__all_changed_blocks

    def update_shrink_target(self, new_target):
        assert isinstance(new_target, ConjectureResult)
        if self.shrink_target is not None:
            self.shrinks += 1
        else:
            self.__all_changed_blocks = set()
            self.__last_checked_changed_at = new_target

        self.shrink_target = new_target
        self.__shrinking_block_cache = {}
        self.__derived_values = {}

    def try_shrinking_blocks(self, blocks, b):
        """Attempts to replace each block in the blocks list with b. Returns
        True if it succeeded (which may include some additional modifications
        to shrink_target).

        May call mark_shrinking with b if this causes a reduction in size.

        In current usage it is expected that each of the blocks currently has
        the same value, although this is not essential. Note that b must be
        < the block at min(blocks) or this is not a valid shrink.

        This method will attempt to do some small amount of work to delete data
        that occurs after the end of the blocks. This is useful for cases where
        there is some size dependency on the value of a block.
        """
        initial_attempt = bytearray(self.shrink_target.buffer)
        for i, block in enumerate(blocks):
            if block >= len(self.blocks):
                blocks = blocks[:i]
                break
            u, v = self.blocks[block].bounds
            n = min(self.blocks[block].length, len(b))
            initial_attempt[v - n : v] = b[-n:]

        if not blocks:
            return False

        start = self.shrink_target.blocks[blocks[0]].start
        end = self.shrink_target.blocks[blocks[-1]].end

        initial_data = self.cached_test_function(initial_attempt)

        if initial_data.status == Status.INTERESTING:
            self.lower_common_block_offset()
            return initial_data is self.shrink_target

        # If this produced something completely invalid we ditch it
        # here rather than trying to persevere.
        if initial_data.status < Status.VALID:
            return False

        # We've shrunk inside our group of blocks, so we have no way to
        # continue. (This only happens when shrinking more than one block at
        # a time).
        if len(initial_data.buffer) < v:
            return False

        lost_data = len(self.shrink_target.buffer) - len(initial_data.buffer)

        # If this did not in fact cause the data size to shrink we
        # bail here because it's not worth trying to delete stuff from
        # the remainder.
        if lost_data <= 0:
            return False

        self.mark_shrinking(blocks)

        # We now look for contiguous regions to delete that might help fix up
        # this failed shrink. We only look for contiguous regions of the right
        # lengths because doing anything more than that starts to get very
        # expensive. See example_deletion_with_block_lowering for where we
        # try to be more aggressive.
        regions_to_delete = {(end, end + lost_data)}

        for j in (blocks[-1] + 1, blocks[-1] + 2):
            if j >= min(len(initial_data.blocks), len(self.blocks)):
                continue
            # We look for a block very shortly after the last one that has
            # lost some of its size, and try to delete from the beginning so
            # that it retains the same integer value. This is a bit of a hyper
            # specific trick designed to make our integers() strategy shrink
            # well.
            r1, s1 = self.shrink_target.blocks[j].bounds
            r2, s2 = initial_data.blocks[j].bounds
            lost = (s1 - r1) - (s2 - r2)
            # Apparently a coverage bug? An assert False in the body of this
            # will reliably fail, but it shows up as uncovered.
            if lost <= 0 or r1 != r2:  # pragma: no cover
                continue
            regions_to_delete.add((r1, r1 + lost))

        for ex in self.shrink_target.examples:
            if ex.start > start:
                continue
            if ex.end <= end:
                continue

            replacement = initial_data.examples[ex.index]

            in_original = [c for c in ex.children if c.start >= end]

            in_replaced = [c for c in replacement.children if c.start >= end]

            if len(in_replaced) >= len(in_original) or not in_replaced:
                continue

            # We've found an example where some of the children went missing
            # as a result of this change, and just replacing it with the data
            # it would have had and removing the spillover didn't work. This
            # means that some of its children towards the right must be
            # important, so we try to arrange it so that it retains its
            # rightmost children instead of its leftmost.
            regions_to_delete.add(
                (in_original[0].start, in_original[-len(in_replaced)].start)
            )

        for u, v in sorted(regions_to_delete, key=lambda x: x[1] - x[0], reverse=True):
            try_with_deleted = bytearray(initial_attempt)
            del try_with_deleted[u:v]
            if self.incorporate_new_buffer(try_with_deleted):
                return True
        return False

    def remove_discarded(self):
        """Try removing all bytes marked as discarded.

        This is primarily to deal with data that has been ignored while
        doing rejection sampling - e.g. as a result of an integer range, or a
        filtered strategy.

        Such data will also be handled by the adaptive_example_deletion pass,
        but that pass is necessarily more conservative and will try deleting
        each interval individually. The common case is that all data drawn and
        rejected can just be thrown away immediately in one block, so this pass
        will be much faster than trying each one individually when it works.

        Returns False if there is discarded data and removing it does not work,
        otherwise returns True.
        """
        while self.shrink_target.has_discards:
            discarded = []

            for ex in self.shrink_target.examples:
                if (
                    ex.length > 0
                    and ex.discarded
                    and (not discarded or ex.start >= discarded[-1][-1])
                ):
                    discarded.append((ex.start, ex.end))

            # This can happen if we have discards but they are all of
            # zero length. This shouldn't happen very often so it's
            # faster to check for it here than at the point of example
            # generation.
            if not discarded:
                break

            attempt = bytearray(self.shrink_target.buffer)
            for u, v in reversed(discarded):
                del attempt[u:v]

            if not self.incorporate_new_buffer(attempt):
                return False
        return True

    @derived_value
    def stable_identifier_arguments(self):
        return [(self.stable_identifier_for_example(ex),) for ex in self.examples]

    @defines_shrink_pass(lambda self: self.stable_identifier_arguments)
    def adaptive_example_deletion(self, example_id):
        """Attempts to delete every example from the test case.

        That is, it is logically equivalent to trying ``self.buffer[:ex.start] +
        self.buffer[ex.end:]`` for every example ``ex``. The order in which
        examples are tried is randomized, and when deletion is successful it
        will attempt to adapt to delete more than one example at a time.
        """
        example = self.example_for_stable_identifier(example_id)
        if example is None or not self.incorporate_new_buffer(
            self.buffer[: example.start] + self.buffer[example.end :]
        ):
            return

        # If we successfully deleted one example there may be a useful
        # deletable region around here.
        original = self.shrink_target
        endpoints = set()
        for ex in original.examples:
            if ex.depth <= example.depth:
                endpoints.add(ex.start)
                endpoints.add(ex.end)

        partition = sorted(endpoints)
        j = partition.index(example.start)

        def delete_region(a, b):
            assert a <= j <= b
            if a < 0 or b >= len(partition) - 1:
                return False
            return self.consider_new_buffer(
                original.buffer[: partition[a]] + original.buffer[partition[b] :]
            )

        to_right = find_integer(lambda n: delete_region(j, j + n))

        if to_right > 0:
            find_integer(lambda n: delete_region(j - n, j + to_right))
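
    # Editor's sketch (not part of shrinker.py): find_integer, imported in
    # the real module, is the adaptive workhorse used by delete_region above.
    # It finds an n with f(n) True and f(n + 1) False, assuming f(0) is True,
    # via exponential probing plus binary search, so k successful deletions
    # cost only O(log k) test function calls. A minimal re-implementation:
    #
    #     def find_integer(f):
    #         lo, hi = 0, 1
    #         while f(hi):  # exponential probe upwards
    #             lo, hi = hi, hi * 2
    #         while lo + 1 < hi:  # binary search within (lo, hi)
    #             mid = (lo + hi) // 2
    #             if f(mid):
    #                 lo = mid
    #             else:
    #                 hi = mid
    #         return lo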
    def try_zero_example(self, ex):
        u = ex.start
        v = ex.end
        attempt = self.cached_test_function(
            self.buffer[:u] + hbytes(v - u) + self.buffer[v:]
        )

        if attempt is Overrun:
            return False

        in_replacement = attempt.examples[ex.index]
        used = in_replacement.length

        if attempt is not self.shrink_target:
            if in_replacement.end < len(attempt.buffer) and used < ex.length:
                self.incorporate_new_buffer(
                    self.buffer[:u] + hbytes(used) + self.buffer[v:]
                )
        return self.examples[ex.index].trivial

    @defines_shrink_pass(lambda self: self.stable_identifier_arguments)
    def zero_examples(self, ex_id):
        """Attempt to replace each example with a minimal version of itself."""
        ex = self.example_for_stable_identifier(ex_id)

        # If the example is already trivial, assume there's nothing to do here.
        # We could attempt to use it as an adaptive replacement for other
        # similar examples, but that seems to be ineffective, resulting mostly
        # in redundant work rather than helping.
        if ex is None or ex.trivial:
            return

        if not self.try_zero_example(ex):
            return

        # If we zeroed the example we need to get the new one that replaced it.
        ex = self.examples[ex.index]

        original = self.shrink_target
        group = self.examples_by_label[ex.label]
        i = group.index(ex)
        replacement = self.buffer[ex.start : ex.end]

        # We first expand to cover the trivial region surrounding this group.
        # This avoids a situation where the adaptive phase "succeeds" a lot by
        # virtue of not doing anything and then goes into a galloping phase
        # where it does a bunch of useless work.
        def all_trivial(a, b):
            if a < 0 or b > len(group):
                return False
            return all(e.trivial for e in group[a:b])

        start, end = expand_region(all_trivial, i, i + 1)

        # If we've got multiple trivial examples of different lengths then
        # this isn't going to work as a replacement for all of them and so we
        # skip out early.
        if any(e.length != len(replacement) for e in group[start:end]):
            return

        def can_zero(a, b):
            if a < 0 or b > len(group):
                return False
            regions = []

            for e in group[a:b]:
                t = (e.start, e.end, replacement)
                if not regions or t[0] >= regions[-1][1]:
                    regions.append(t)
            return self.consider_new_buffer(replace_all(original.buffer, regions))

        expand_region(can_zero, start, end)

    @derived_value
    def blocks_by_non_zero_suffix(self):
        """Returns a list of blocks grouped by their non-zero suffix,
        as a list of (suffix, indices) pairs, skipping all groupings
        where there is only one index.

        This is only used for the arguments of minimize_duplicated_blocks.
        """
        duplicates = defaultdict(list)
        for block in self.blocks:
            duplicates[non_zero_suffix(self.buffer[block.start : block.end])].append(
                block.index
            )
        return duplicates

    @defines_shrink_pass(
        lambda self: [(b,) for b in sorted(self.blocks_by_non_zero_suffix)]
    )
    def minimize_duplicated_blocks(self, block):
        """Find blocks that have been duplicated in multiple places and attempt
        to minimize all of the duplicates simultaneously.

        This lets us handle cases where two values can't be shrunk
        independently of each other but can easily be shrunk together.
        For example if we had something like:

        ls = data.draw(lists(integers()))
        y = data.draw(integers())
        assert y not in ls

        Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were
        to replace both 3s with 0, this would be a valid shrink, but if we were
        to replace either 3 with 0 on its own the test would start passing.

        It is also useful for when that duplication is accidental and the value
        of the blocks doesn't matter very much because it allows us to replace
        more values at once.
        """
        targets = self.blocks_by_non_zero_suffix[block]
        if len(targets) <= 1:
            return
        Lexical.shrink(
            block,
            lambda b: self.try_shrinking_blocks(targets, b),
            random=self.random,
            full=False,
        )

    @defines_shrink_pass(lambda self: self.stable_identifier_arguments)
    def minimize_floats(self, ex_id):
        """Some shrinks that we employ that only really make sense for our
        specific floating point encoding that are hard to discover from any
        sort of reasonable general principle. This allows us to make
        transformations like replacing a NaN with an Infinity or replacing
        a float with its nearest integer that we would otherwise not be
        able to due to them requiring very specific transformations of
        the bit sequence.

        We only apply these transformations to blocks that "look like" our
        standard float encodings because they are only really meaningful
        there. The logic for detecting this is reasonably precise, but
        it doesn't matter if it's wrong. These are always valid
        transformations to make, they just don't necessarily correspond to
        anything particularly meaningful for non-float values.
        """
        ex = self.example_for_stable_identifier(ex_id)
        if ex is None or not (
            ex.label == DRAW_FLOAT_LABEL
            and len(ex.children) == 2
            and ex.children[0].length == 8
        ):
            return

        u = ex.children[0].start
        v = ex.children[0].end
        buf = self.shrink_target.buffer
        b = buf[u:v]
        f = lex_to_float(int_from_bytes(b))
        b2 = int_to_bytes(float_to_lex(f), 8)
        if b == b2 or self.consider_new_buffer(buf[:u] + b2 + buf[v:]):
            Float.shrink(
                f,
                lambda x: self.consider_new_buffer(
                    self.shrink_target.buffer[:u]
                    + int_to_bytes(float_to_lex(x), 8)
                    + self.shrink_target.buffer[v:]
                ),
                random=self.random,
            )

    @defines_shrink_pass(lambda self: [(i,) for i in hrange(len(self.blocks))])
    def minimize_individual_blocks(self, i):
        """Attempt to minimize each block in sequence.

        This is the pass that ensures that e.g. each integer we draw is a
        minimum value. So it's the part that guarantees that if we e.g. do

        x = data.draw(integers())
        assert x < 10

        then in our shrunk example, x = 10 rather than say 97.
        """
        if i >= len(self.blocks):
            return
        block = self.blocks[i]
        u, v = block.bounds
        i = block.index
        Lexical.shrink(
            self.shrink_target.buffer[u:v],
            lambda b: self.try_shrinking_blocks((i,), b),
            random=self.random,
            full=False,
        )
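
    # Editor's note (not part of shrinker.py): the user-visible effect of
    # minimize_individual_blocks is exactly the docstring's example - under
    #
    #     @given(st.integers())
    #     def test(x):
    #         assert x < 10
    #
    # the reported counterexample is reliably x = 10, the smallest value
    # that still fails, rather than whatever large value was found first.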
    @defines_shrink_pass(
        lambda self: [
            (i, self.stable_identifier_for_example(ex))
            for i in hrange(len(self.blocks))
            if self.is_shrinking_block(i)
            for ex in self.examples
            if ex.length > 0 and ex.start >= self.blocks[i].end
        ]
    )
    def example_deletion_with_block_lowering(self, block_i, ex_id):
        """Sometimes we get stuck where there is data that we could easily
        delete, but it changes the number of examples generated, so we have to
        change that at the same time.

        We handle most of the common cases in try_shrinking_blocks which is
        pretty good at clearing out large contiguous blocks of dead space,
        but it fails when there is data that has to stay in particular places
        in the list.

        This pass exists as an emergency procedure to get us unstuck. For every
        example and every block not inside that example it tries deleting the
        example and modifying the block's value by one in either direction.
        """
        if block_i >= len(self.blocks):
            return
        block = self.blocks[block_i]
        ex = self.example_for_stable_identifier(ex_id)
        if ex is None:
            return

        u, v = block.bounds

        n = int_from_bytes(self.shrink_target.buffer[u:v])
        if n == 0:
            return

        buf = bytearray(self.shrink_target.buffer)
        buf[u:v] = int_to_bytes(n - 1, v - u)
        del buf[ex.start : ex.end]
        self.incorporate_new_buffer(buf)

    @defines_shrink_pass(
        lambda self: [
            (self.stable_identifier_for_example(e), l)
            for e in self.examples
            for l in sorted({c.label for c in e.children}, key=str)
        ]
    )
    def reorder_examples(self, ex_id, label):
        """This pass allows us to reorder the children of each example.

        For example, consider the following:

        .. code-block:: python

            import hypothesis.strategies as st
            from hypothesis import given

            @given(st.text(), st.text())
            def test_not_equal(x, y):
                assert x != y

        Without the ability to reorder x and y this could fail either with
        ``x=""``, ``y="0"``, or the other way around. With reordering it will
        reliably fail with ``x=""``, ``y="0"``.
        """
        ex = self.example_for_stable_identifier(ex_id)
        if ex is None:
            return
        group = [c for c in ex.children if c.label == label]
        if len(group) <= 1:
            return
        st = self.shrink_target
        pieces = [st.buffer[ex.start : ex.end] for ex in group]
        endpoints = [(ex.start, ex.end) for ex in group]
        Ordering.shrink(
            pieces,
            lambda ls: self.consider_new_buffer(
                replace_all(st.buffer, [(u, v, r) for (u, v), r in zip(endpoints, ls)])
            ),
            random=self.random,
        )

    @defines_shrink_pass(lambda self: [(c,) for c in hrange(256)])
    def alphabet_minimize(self, c):
        """Attempts to minimize the "alphabet" - the set of bytes that
        are used in the representation of the current buffer. The main
        benefit of this is that it significantly increases our cache hit rate
        by making things that are equivalent more likely to have the same
        representation, but it's also generally a rather effective "fuzzing"
        step that gives us a lot of good opportunities to slip to a smaller
        representation of the same bug.
        """
        buf = self.buffer

        if c not in buf:
            return

        def can_replace_with(d):
            if d < 0:
                return False

            if self.consider_new_buffer(hbytes([d if b == c else b for b in buf])):
                if d <= 1:
                    # For small values of d if this succeeds we take this
                    # as evidence that it is worth doing a bulk replacement
                    # where we replace all values which are close
                    # to c but smaller with d as well. This helps us substantially
                    # in cases where we have a lot of "dead" bytes that don't really do
                    # much, as it allows us to replace many of them in one go rather
                    # than one at a time. An example of where this matters is
                    # test_minimize_multiple_elements_in_silly_large_int_range_min_is_not_dupe
                    # in test_shrink_quality.py
                    def replace_range(k):
                        if k > c:
                            return False

                        def should_replace_byte(b):
                            return c - k <= b <= c and d < b

                        return self.consider_new_buffer(
                            hbytes([d if should_replace_byte(b) else b for b in buf])
                        )

                    find_integer(replace_range)
                return True

        if (
            # If we cannot replace the current byte with its predecessor,
            # assume it is already minimal and continue on. This ensures
            # we make no more than one call per distinct byte value in the
            # event that no shrinks are possible here.
            not can_replace_with(c - 1)
            # We next try replacing with 0 or 1. If this works then
            # there is nothing else to do here.
            or can_replace_with(0)
            or can_replace_with(1)
            # Finally we try to replace with c - 2 before going on to the
            # binary search so that in cases which were already nearly
            # minimal we don't do log(n) extra work.
            or not can_replace_with(c - 2)
        ):
            return

        # Now binary search to find a small replacement.

        # Invariant: We cannot replace with lo, we can replace with hi.
        lo = 1
        hi = c - 2
        while lo + 1 < hi:
            mid = (lo + hi) // 2
            if can_replace_with(mid):
                hi = mid
            else:
                lo = mid
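
    # Editor's worked example (not part of shrinker.py): for buf ==
    # b"\x07\x03\x07" and c == 7, alphabet_minimize first tries
    # can_replace_with(6), i.e. b"\x06\x03\x06". Only if that attempt is
    # accepted does it go on to try replacing every 7 with 0 and then 1
    # (b"\x00\x03\x00", b"\x01\x03\x01"), and finally with c - 2 before
    # falling back to the binary search above.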
    def stable_identifier_for_example(self, example):
        """A stable identifier is one that we can reasonably reliably
        count on referring to "logically the same" example between two
        different test runs. It is currently represented as a path from
        the root."""
        return example.index

    def example_for_stable_identifier(self, identifier):
        """Returns the example in the current shrink target corresponding
        to this stable identifier, or None if no such example exists."""
        if identifier >= len(self.examples):
            return None
        return self.shrink_target.examples[identifier]

    def run_block_program(self, i, description, original, repeats=1):
        """Block programs are a mini-DSL for block rewriting, defined as a sequence
        of commands that can be run at some index into the blocks.

        Commands are:

            * "-", subtract one from this block.
            * "X", delete this block

        If a command does not apply (currently only because it's - on a zero
        block) the block will be silently skipped over.

        This method runs the block program in ``description`` at block index
        ``i`` on the ConjectureData ``original``. If ``repeats > 1`` then it
        will attempt to approximate the results of running it that many times.

        Returns True if this successfully changes the underlying shrink target,
        else False.
        """
        if i + len(description) > len(original.blocks) or i < 0:
            return False
        attempt = bytearray(original.buffer)
        for _ in hrange(repeats):
            for k, d in reversed(list(enumerate(description))):
                j = i + k
                u, v = original.blocks[j].bounds
                if v > len(attempt):
                    return False
                if d == "-":
                    value = int_from_bytes(attempt[u:v])
                    if value == 0:
                        return False
                    else:
                        attempt[u:v] = int_to_bytes(value - 1, v - u)
                elif d == "X":
                    del attempt[u:v]
                else:  # pragma: no cover
                    assert False, "Unrecognised command %r" % (d,)
        return self.incorporate_new_buffer(attempt)


def block_program(description):
    """Mini-DSL for block rewriting. A sequence of commands that will be run
    over all contiguous sequences of blocks of the description length in order.

    Commands are:

        * ".", keep this block unchanged
        * "-", subtract one from this block.
        * "0", replace this block with zero
        * "X", delete this block

    If a command does not apply (currently only because it's - on a zero
    block) the block will be silently skipped over. As a side effect of
    running a block program its score will be updated.
    """
    name = "block_program(%r)" % (description,)

    if name not in SHRINK_PASS_DEFINITIONS:
        """Defines a shrink pass that runs the block program ``description``
        at every block index."""
        n = len(description)

        def generate_arguments(self):
            return [(i,) for i in hrange(len(self.blocks))]

        def run(self, i):
            """Adaptively attempt to run the block program at the current
            index. If this successfully applies the block program ``k`` times
            then this runs in ``O(log(k))`` test function calls."""
            if i + n >= len(self.shrink_target.blocks):
                return
            # First, run the block program at the chosen index. If this fails,
            # don't do any extra work, so that failure is as cheap as possible.
            if not self.run_block_program(i, description, original=self.shrink_target):
                return

            # Because we run in a random order we will often find ourselves in the
            # middle of a region where we could run the block program. We thus start
            # by moving left to the beginning of that region where possible.
            def offset_left(k):
                return i - k * n

            i = offset_left(
                find_integer(
                    lambda k: self.run_block_program(
                        offset_left(k), description, original=self.shrink_target
                    )
                )
            )

            original = self.shrink_target

            # Now try to run the block program multiple times here.
            find_integer(
                lambda k: self.run_block_program(
                    i, description, original=original, repeats=k
                )
            )

        run.__name__ = name
        defines_shrink_pass(generate_arguments)(run)
    assert name in SHRINK_PASS_DEFINITIONS
    return name


@attr.s(slots=True, cmp=False)
class ShrinkPass(object):
    run_with_arguments = attr.ib()
    generate_arguments = attr.ib()
    index = attr.ib()
    shrinker = attr.ib()

    fixed_point_at = attr.ib(default=None)
    successes = attr.ib(default=0)
    runs = attr.ib(default=0)
    calls = attr.ib(default=0)
    shrinks = attr.ib(default=0)
    deletions = attr.ib(default=0)
...
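
Putting it together: block_program is the easiest entry point to shrink passes, since it returns the *name* of a registered pass that can be handed straight to shrink_pass or run_shrink_pass. A sketch of how it behaves, assuming a shrinker whose target currently has two one-byte blocks holding the values 5 and 9 (the values here are illustrative, not from the Hypothesis test suite):

name = block_program("X-")  # delete block i, subtract 1 from block i+1
sp = shrinker.shrink_pass(name)  # look up (and register) the ShrinkPass
shrinker.run_shrink_pass(name)   # run all of its steps in random order

# At index 0 a single step rewrites the block pair (5, 9) to just (8):
# block 0 is deleted and block 1 is lowered by one. A "-" applied to a
# zero-valued block does not apply, so such a step is silently skipped.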

test_shrinker.py

Source: test_shrinker.py (GitHub)

...
    @shrinking_from([255])
    def shrinker(data):
        data.draw_bits(8)
        data.mark_interesting()

    sp = shrinker.shrink_pass(block_program("X"))
    assert isinstance(sp, ShrinkPass)
    assert shrinker.shrink_pass(sp) is sp


def test_will_terminate_stalled_shrinks():
    # Suppress the time based slow shrinking check - we only want
    # the one that checks if we're in a stall where we've shrunk
    # as far as we're going to get.
    time.freeze()

    @shrinking_from([255] * 100)
    def shrinker(data):
        count = 0
        for _ in range(100):
            if data.draw_bits(8) != 255:
                count += 1
                if count >= 10:
                    return
        data.mark_interesting()

    shrinker.shrink()
    assert shrinker.calls <= 1 + 2 * shrinker.max_stall


def test_will_let_fixate_shrink_passes_do_a_full_run_through():
    @shrinking_from(range(50))
    def shrinker(data):
        for i in range(50):
            if data.draw_bits(8) != i:
                data.mark_invalid()
        data.mark_interesting()

    shrinker.max_stall = 5
    passes = [block_program("X" * i) for i in range(1, 11)]

    with pytest.raises(StopShrinking):
        shrinker.fixate_shrink_passes(passes)

    assert shrinker.shrink_pass(passes[-1]).calls > 0


@pytest.mark.parametrize("n_gap", [0, 1, 2, 3])
def test_can_simultaneously_lower_non_duplicated_nearby_blocks(n_gap):
    @shrinking_from([1, 1] + [0] * n_gap + [0, 2])
    def shrinker(data):
        # Block off lowering the whole buffer
        if data.draw_bits(1) == 0:
            data.mark_invalid()
        m = data.draw_bits(8)
        for _ in range(n_gap):
            data.draw_bits(8)
        n = data.draw_bits(16)
        if n == m + 1:
            data.mark_interesting()

    shrinker.fixate_shrink_passes(["lower_blocks_together"])
...
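
The tests above rely on shrinking_from, a helper from Hypothesis's own test suite (not a public API) that runs the decorated test function to an interesting result and hands back a Shrinker for it. A minimal sketch in the same style, assuming that helper is importable:

@shrinking_from([3])
def shrinker(data):
    if data.draw_bits(8) >= 2:
        data.mark_interesting()

# Lexicographic minimization should reduce the single block to the
# smallest byte value that is still interesting.
shrinker.fixate_shrink_passes(["minimize_individual_blocks"])
assert list(shrinker.shrink_target.buffer) == [2]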