How to use the update_shrink_target method in Hypothesis

Python code snippet using the hypothesis library
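update_shrink_target is an internal method of Hypothesis's Shrinker class (hypothesis/internal/conjecture/shrinker.py). It is not called from user code: the engine calls it each time shrinking finds a strictly smaller failing example. The usual way to see it at work is to run a failing property-based test with debug verbosity, which makes Hypothesis print the intermediate examples the shrinker considers. A minimal sketch (the test name and the bound are illustrative, not from the source below):

from hypothesis import Verbosity, given, settings, strategies as st

# Debug verbosity makes Hypothesis print each intermediate example the
# shrinker tries, so every successful call to update_shrink_target shows
# up as a new, smaller failing input.
@settings(verbosity=Verbosity.debug)
@given(st.lists(st.integers()))
def test_sum_is_small(xs):
    assert sum(xs) < 100  # deliberately fails for large enough lists

Running this under a test runner such as pytest triggers the failure, after which Hypothesis shrinks the counterexample toward a minimal one, updating its shrink target on every improvement.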

shrinker.py

Source: shrinker.py (GitHub)

...
        self.initial_size = len(initial.buffer)
        # We keep track of the current best example on the shrink_target
        # attribute.
        self.shrink_target = None
        self.update_shrink_target(initial)
        self.shrinks = 0

        # We terminate shrinks that seem to have reached their logical
        # conclusion: If we've called the underlying test function at
        # least self.max_stall times since the last time we shrunk,
        # it's time to stop shrinking.
        self.max_stall = 200
        self.initial_calls = self.engine.call_count
        self.calls_at_last_shrink = self.initial_calls

        self.passes_by_name = {}
        self.passes = []

        # Extra DFAs that may be installed. This is used solely for
        # testing and learning purposes.
        self.extra_dfas = {}

    @derived_value
    def cached_calculations(self):
        return {}

    def cached(self, *keys):
        def accept(f):
            cache_key = (f.__name__,) + keys
            try:
                return self.cached_calculations[cache_key]
            except KeyError:
                return self.cached_calculations.setdefault(cache_key, f())

        return accept

    def add_new_pass(self, run):
        """Creates a shrink pass corresponding to calling ``run(self)``"""
        definition = SHRINK_PASS_DEFINITIONS[run]
        p = ShrinkPass(
            run_with_chooser=definition.run_with_chooser,
            shrinker=self,
            index=len(self.passes),
        )
        self.passes.append(p)
        self.passes_by_name[p.name] = p
        return p

    def shrink_pass(self, name):
        """Return the ShrinkPass object for the pass with the given name."""
        if isinstance(name, ShrinkPass):
            return name
        if name not in self.passes_by_name:
            self.add_new_pass(name)
        return self.passes_by_name[name]

    @derived_value
    def match_cache(self):
        return {}

    def matching_regions(self, dfa):
        """Returns all pairs (u, v) such that self.buffer[u:v] is accepted
        by this DFA."""
        try:
            return self.match_cache[dfa]
        except KeyError:
            pass
        results = dfa.all_matching_regions(self.buffer)
        results.sort(key=lambda t: (t[1] - t[0], t[1]))
        assert all(dfa.matches(self.buffer[u:v]) for u, v in results)
        self.match_cache[dfa] = results
        return results

    @property
    def calls(self):
        """Return the number of calls that have been made to the underlying
        test function."""
        return self.engine.call_count

    def consider_new_buffer(self, buffer):
        """Returns True if after running this buffer the result would be
        the current shrink_target."""
        buffer = bytes(buffer)
        return buffer.startswith(self.buffer) or self.incorporate_new_buffer(buffer)

    def incorporate_new_buffer(self, buffer):
        """Either runs the test function on this buffer and returns True if
        that changed the shrink_target, or determines that doing so would
        be useless and returns False without running it."""
        buffer = bytes(buffer[: self.shrink_target.index])
        # Sometimes an attempt at lexicographic minimization will do the wrong
        # thing because the buffer has changed under it (e.g. something has
        # turned into a write, the bit size has changed). The result would be
        # an invalid string, but it's better for us to just ignore it here as
        # it turns out to involve quite a lot of tricky book-keeping to get
        # this right and it's better to just handle it in one place.
        if sort_key(buffer) >= sort_key(self.shrink_target.buffer):
            return False

        if self.shrink_target.buffer.startswith(buffer):
            return False

        previous = self.shrink_target
        self.cached_test_function(buffer)
        return previous is not self.shrink_target

    def incorporate_test_data(self, data):
        """Takes a ConjectureData or Overrun object and updates the current
        shrink_target if this data represents an improvement over it."""
        if data.status < Status.VALID or data is self.shrink_target:
            return
        if (
            self.__predicate(data)
            and sort_key(data.buffer) < sort_key(self.shrink_target.buffer)
            and self.__allow_transition(self.shrink_target, data)
        ):
            self.update_shrink_target(data)

    def cached_test_function(self, buffer):
        """Returns a cached version of the underlying test function, so
        that the result is either an Overrun object (if the buffer is
        too short to be a valid test case) or a ConjectureData object
        with status >= INVALID that would result from running this buffer."""
        buffer = bytes(buffer)
        result = self.engine.cached_test_function(buffer)
        self.incorporate_test_data(result)
        if self.calls - self.calls_at_last_shrink >= self.max_stall:
            raise StopShrinking()
        return result

    def debug(self, msg):
        self.engine.debug(msg)

    @property
    def random(self):
        return self.engine.random

    def shrink(self):
        """Run the full set of shrinks and update shrink_target.

        This method is "mostly idempotent" - calling it twice is unlikely to
        have any effect, though it has a non-zero probability of doing so.
        """
        # We assume that if an all-zero block of bytes is an interesting
        # example then we're not going to do better than that.
        # This might not technically be true: e.g. for integers() | booleans()
        # the simplest example is actually [1, 0]. Missing this case is fairly
        # harmless and this allows us to make various simplifying assumptions
        # about the structure of the data (principally that we're never
        # operating on a block of all zero bytes so can use non-zeroness as a
        # signpost of complexity).
        if not any(self.shrink_target.buffer) or self.incorporate_new_buffer(
            bytes(len(self.shrink_target.buffer))
        ):
            return

        try:
            self.greedy_shrink()
        except StopShrinking:
            pass
        finally:
            if self.engine.report_debug_info:

                def s(n):
                    return "s" if n != 1 else ""

                total_deleted = self.initial_size - len(self.shrink_target.buffer)

                self.debug("---------------------")
                self.debug("Shrink pass profiling")
                self.debug("---------------------")
                self.debug("")
                calls = self.engine.call_count - self.initial_calls
                self.debug(
                    (
                        "Shrinking made a total of %d call%s "
                        "of which %d shrank. This deleted %d byte%s out of %d."
                    )
                    % (
                        calls,
                        s(calls),
                        self.shrinks,
                        total_deleted,
                        s(total_deleted),
                        self.initial_size,
                    )
                )
                for useful in [True, False]:
                    self.debug("")
                    if useful:
                        self.debug("Useful passes:")
                    else:
                        self.debug("Useless passes:")
                    self.debug("")
                    for p in sorted(
                        self.passes, key=lambda t: (-t.calls, t.deletions, t.shrinks)
                    ):
                        if p.calls == 0:
                            continue
                        if (p.shrinks != 0) != useful:
                            continue
                        self.debug(
                            (
                                "  * %s made %d call%s of which "
                                "%d shrank, deleting %d byte%s."
                            )
                            % (
                                p.name,
                                p.calls,
                                s(p.calls),
                                p.shrinks,
                                p.deletions,
                                s(p.deletions),
                            )
                        )
                self.debug("")

    def greedy_shrink(self):
        """Run a full set of greedy shrinks (that is, ones that will only ever
        move to a better target) and update shrink_target appropriately.

        This method iterates to a fixed point and so is idempotent - calling
        it twice will have exactly the same effect as calling it once.
        """
        self.fixate_shrink_passes(
            [
                block_program("X" * 5),
                block_program("X" * 4),
                block_program("X" * 3),
                block_program("X" * 2),
                block_program("X" * 1),
                "pass_to_descendant",
                "reorder_examples",
                "minimize_floats",
                "minimize_duplicated_blocks",
                block_program("-XX"),
                "minimize_individual_blocks",
                block_program("--X"),
                "redistribute_block_pairs",
                "lower_blocks_together",
            ]
            + [dfa_replacement(n) for n in SHRINKING_DFAS]
        )

    @derived_value
    def shrink_pass_choice_trees(self):
        return defaultdict(ChoiceTree)

    def fixate_shrink_passes(self, passes):
        """Run steps from each pass in ``passes`` until the current shrink target
        is a fixed point of all of them."""
        passes = list(map(self.shrink_pass, passes))

        any_ran = True
        while any_ran:
            any_ran = False

            reordering = {}

            # We run remove_discarded after every pass to do cleanup
            # keeping track of whether that actually works. Either there is
            # no discarded data and it is basically free, or it reliably works
            # and deletes data, or it doesn't work. In that latter case we turn
            # it off for the rest of this loop through the passes, but will
            # try again once all of the passes have been run.
            can_discard = self.remove_discarded()

            calls_at_loop_start = self.calls

            # We keep track of how many calls can be made by a single step
            # without making progress and use this to test how much to pad
            # out self.max_stall by as we go along.
            max_calls_per_failing_step = 1

            for sp in passes:
                if can_discard:
                    can_discard = self.remove_discarded()

                before_sp = self.shrink_target

                # Run the shrink pass until it fails to make any progress
                # max_failures times in a row. This implicitly boosts shrink
                # passes that are more likely to work.
                failures = 0
                max_failures = 20
                while failures < max_failures:
                    # We don't allow more than max_stall consecutive failures
                    # to shrink, but this means that if we're unlucky and the
                    # shrink passes are in a bad order where only the ones at
                    # the end are useful, if we're not careful this heuristic
                    # might stop us before we've tried everything. In order to
                    # avoid that happening, we make sure that there's always
                    # plenty of breathing room to make it through a single
                    # iteration of the fixate_shrink_passes loop.
                    self.max_stall = max(
                        self.max_stall,
                        2 * max_calls_per_failing_step
                        + (self.calls - calls_at_loop_start),
                    )

                    prev = self.shrink_target
                    initial_calls = self.calls
                    # It's better for us to run shrink passes in a deterministic
                    # order, to avoid repeat work, but this can cause us to create
                    # long stalls when there are a lot of steps which fail to do
                    # anything useful. In order to avoid this, once we've noticed
                    # we're in a stall (i.e. half of max_failures calls have failed
                    # to do anything) we switch to randomly jumping around. If we
                    # find a success then we'll resume deterministic order from
                    # there which, with any luck, is in a new good region.
                    if not sp.step(random_order=failures >= max_failures // 2):
                        # step returns False when there is nothing to do because
                        # the entire choice tree is exhausted. If this happens
                        # we break because we literally can't run this pass any
                        # more than we already have until something else makes
                        # progress.
                        break
                    any_ran = True

                    # Don't count steps that didn't actually try to do
                    # anything as failures. Otherwise, this call is a failure
                    # if it failed to make any changes to the shrink target.
                    if initial_calls != self.calls:
                        if prev is not self.shrink_target:
                            failures = 0
                        else:
                            max_calls_per_failing_step = max(
                                max_calls_per_failing_step, self.calls - initial_calls
                            )
                            failures += 1

                # We reorder the shrink passes so that on our next run through
                # we try good ones first. The rule is that shrink passes that
                # did nothing useful are the worst, shrink passes that reduced
                # the length are the best.
                if self.shrink_target is before_sp:
                    reordering[sp] = 1
                elif len(self.buffer) < len(before_sp.buffer):
                    reordering[sp] = -1
                else:
                    reordering[sp] = 0

            passes.sort(key=reordering.__getitem__)

    @property
    def buffer(self):
        return self.shrink_target.buffer

    @property
    def blocks(self):
        return self.shrink_target.blocks

    @property
    def examples(self):
        return self.shrink_target.examples

    def all_block_bounds(self):
        return self.shrink_target.blocks.all_bounds()

    @derived_value
    def examples_by_label(self):
        """An index of all examples grouped by their label, with
        the examples stored in their normal index order."""
        examples_by_label = defaultdict(list)
        for ex in self.examples:
            examples_by_label[ex.label].append(ex)
        return dict(examples_by_label)

    @derived_value
    def distinct_labels(self):
        return sorted(self.examples_by_label, key=str)

    @defines_shrink_pass()
    def pass_to_descendant(self, chooser):
        """Attempt to replace each example with a descendant example.

        This is designed to deal with strategies that call themselves
        recursively. For example, suppose we had:

        binary_tree = st.deferred(
            lambda: st.one_of(
                st.integers(), st.tuples(binary_tree, binary_tree)))

        This pass guarantees that we can replace any binary tree with one of
        its subtrees - each of those will create an interval that the parent
        could validly be replaced with, and this pass will try doing that.

        This is pretty expensive - it takes O(len(intervals)^2) - so we run it
        late in the process when we've got the number of intervals as far down
        as possible.
        """

        label = chooser.choose(
            self.distinct_labels, lambda l: len(self.examples_by_label[l]) >= 2
        )

        ls = self.examples_by_label[label]
        i = chooser.choose(range(len(ls) - 1))
        ancestor = ls[i]

        if i + 1 == len(ls) or ls[i + 1].start >= ancestor.end:
            return

        @self.cached(label, i)
        def descendants():
            lo = i + 1
            hi = len(ls)
            while lo + 1 < hi:
                mid = (lo + hi) // 2
                if ls[mid].start >= ancestor.end:
                    hi = mid
                else:
                    lo = mid
            return [t for t in ls[i + 1 : hi] if t.length < ancestor.length]

        descendant = chooser.choose(descendants, lambda ex: ex.length > 0)

        assert ancestor.start <= descendant.start
        assert ancestor.end >= descendant.end
        assert descendant.length < ancestor.length

        self.incorporate_new_buffer(
            self.buffer[: ancestor.start]
            + self.buffer[descendant.start : descendant.end]
            + self.buffer[ancestor.end :]
        )

    def lower_common_block_offset(self):
        """Sometimes we find ourselves in a situation where changes to one part
        of the byte stream unlock changes to other parts. Sometimes this is
        good, but sometimes this can cause us to exhibit exponential slow
        downs!

        e.g. suppose we had the following:

        m = draw(integers(min_value=0))
        n = draw(integers(min_value=0))
        assert abs(m - n) > 1

        If this fails then we'll end up with a loop where on each iteration we
        reduce each of m and n by 2 - m can't go lower because of n, then n
        can't go lower because of m.

        This will take us O(m) iterations to complete, which is exponential in
        the data size, as we gradually zig zag our way towards zero.

        This can only happen if we're failing to reduce the size of the byte
        stream: The number of iterations that reduce the length of the byte
        stream is bounded by that length.

        So what we do is this: We keep track of which blocks are changing, and
        then if there's some non-zero common offset to them we try and minimize
        them all at once by lowering that offset.

        This may not work, and it definitely won't get us out of all possible
        exponential slow downs (an example of where it doesn't is where the
        shape of the blocks changes as a result of this bouncing behaviour),
        but it fails fast when it doesn't work and gets us out of a really
        nastily slow case when it does.
        """
        if len(self.__changed_blocks) <= 1:
            return

        current = self.shrink_target

        blocked = [current.buffer[u:v] for u, v in self.all_block_bounds()]

        changed = [
            i
            for i in sorted(self.__changed_blocks)
            if not self.shrink_target.blocks[i].trivial
        ]

        if not changed:
            return

        ints = [int_from_bytes(blocked[i]) for i in changed]
        offset = min(ints)
        assert offset > 0

        for i in range(len(ints)):
            ints[i] -= offset

        def reoffset(o):
            new_blocks = list(blocked)
            for i, v in zip(changed, ints):
                new_blocks[i] = int_to_bytes(v + o, len(blocked[i]))
            return self.incorporate_new_buffer(b"".join(new_blocks))

        Integer.shrink(offset, reoffset, random=self.random)
        self.clear_change_tracking()

    def clear_change_tracking(self):
        self.__last_checked_changed_at = self.shrink_target
        self.__all_changed_blocks = set()

    def mark_changed(self, i):
        self.__changed_blocks.add(i)

    @property
    def __changed_blocks(self):
        if self.__last_checked_changed_at is not self.shrink_target:
            prev_target = self.__last_checked_changed_at
            new_target = self.shrink_target
            assert prev_target is not new_target
            prev = prev_target.buffer
            new = new_target.buffer
            assert sort_key(new) < sort_key(prev)

            if (
                len(new_target.blocks) != len(prev_target.blocks)
                or new_target.blocks.endpoints != prev_target.blocks.endpoints
            ):
                self.__all_changed_blocks = set()
            else:
                blocks = new_target.blocks

                # Index of last block whose contents have been modified, found
                # by checking if the tail past this point has been modified.
                last_changed = binary_search(
                    0,
                    len(blocks),
                    lambda i: prev[blocks.start(i) :] != new[blocks.start(i) :],
                )

                # Index of the first block whose contents have been changed,
                # because we know that this predicate is true for zero (because
                # the prefix from the start is empty), so the result must be True
                # for the bytes from the start of this block and False for the
                # bytes from the end, hence the change is in this block.
                first_changed = binary_search(
                    0,
                    len(blocks),
                    lambda i: prev[: blocks.start(i)] == new[: blocks.start(i)],
                )

                # Between these two changed regions we now do a linear scan to
                # check if any specific block values have changed.
                for i in range(first_changed, last_changed + 1):
                    u, v = blocks.bounds(i)
                    if i not in self.__all_changed_blocks and prev[u:v] != new[u:v]:
                        self.__all_changed_blocks.add(i)
            self.__last_checked_changed_at = new_target
        assert self.__last_checked_changed_at is self.shrink_target
        return self.__all_changed_blocks

    def update_shrink_target(self, new_target):
        assert isinstance(new_target, ConjectureResult)
        if self.shrink_target is not None:
            self.shrinks += 1
            # If we are just taking a long time to shrink we don't want to
            # trigger this heuristic, so whenever we shrink successfully
            # we give ourselves a bit of breathing room to make sure we
            # would find a shrink that took that long to find the next time.
            # The case where we're taking a long time but making steady
            # progress is handled by `finish_shrinking_deadline` in engine.py
            self.max_stall = max(
                self.max_stall, (self.calls - self.calls_at_last_shrink) * 2
            )
            self.calls_at_last_shrink = self.calls
        else:
...
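The acceptance rule above is compact but easy to miss: incorporate_test_data only calls update_shrink_target when the new data still satisfies the failure predicate and its buffer is strictly smaller under sort_key (shortlex order: shorter buffers first, ties broken lexicographically). Here is a standalone sketch of that pattern with hypothetical names (MiniShrinker is not part of Hypothesis; it just isolates the "only accept strict improvements" gate):

def sort_key(buffer: bytes):
    # Shortlex ordering: shorter buffers come first, ties are broken
    # lexicographically. This mirrors the ordering the shrinker minimizes.
    return (len(buffer), buffer)

class MiniShrinker:
    def __init__(self, initial: bytes, predicate):
        self.predicate = predicate  # "is this still an interesting (failing) example?"
        self.shrink_target = initial
        self.shrinks = 0

    def incorporate(self, candidate: bytes) -> bool:
        """Adopt candidate as the new target iff it is still interesting
        and strictly smaller under sort_key."""
        if self.predicate(candidate) and sort_key(candidate) < sort_key(self.shrink_target):
            # Analogous to update_shrink_target in the real shrinker.
            self.shrink_target = candidate
            self.shrinks += 1
            return True
        return False

# Example: shrink toward the smallest buffer whose byte-sum exceeds 10.
s = MiniShrinker(b"\x05\x05\x05\x05", lambda b: sum(b) > 10)
assert s.incorporate(b"\x05\x05\x05")        # shorter and still interesting
assert not s.incorporate(b"\x01\x01")        # sum is 2, no longer interesting
assert s.shrink_target == b"\x05\x05\x05"

The real shrinker layers caching, stall limits (max_stall), and transition checks on top of this, but every successful shrink ultimately passes through the same "strictly better under sort_key" gate before update_shrink_target is called.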

