How to use the greedy_shrink method in hypothesis

Best Python code snippet using hypothesis

engine.py

Source: engine.py (GitHub)



...
            not any(self.shrink_target.buffer) or
            self.incorporate_new_buffer(hbytes(len(self.shrink_target.buffer)))
        ):
            return
        self.greedy_shrink()
        self.escape_local_minimum()

    def greedy_shrink(self):
        """Run a full set of greedy shrinks (that is, ones that will only ever
        move to a better target) and update shrink_target appropriately.

        This method iterates to a fixed point and so is idempotent - calling
        it twice will have exactly the same effect as calling it once.
        """
        # This will automatically be run during the normal loop, but it's worth
        # running once before the coarse passes so they don't spend time on
        # useless data.
        self.remove_discarded()

        # We only run this once because delta-debugging is essentially useless
        # and often actively harmful when run on examples which are "close to
        # shrunk" - the attempts to delete long runs mostly just don't work,
        # and we spend a huge amount of time trying them anyway. The
        # greedy_interval_deletion pass will achieve the same end result but
        # is much cheaper when run on close to shrunk examples.
        self.delta_interval_deletion()

        run_expensive_shrinks = False

        prev = None
        while prev is not self.shrink_target:
            prev = self.shrink_target

            self.remove_discarded()
            self.zero_intervals()
            self.minimize_duplicated_blocks()
            self.minimize_individual_blocks()
            self.reorder_blocks()
            self.greedy_interval_deletion()
            self.lower_dependent_block_pairs()

            # Passes after this point are expensive: Prior to here they should
            # all involve no more than about n log(n) shrinks, but after here
            # they may be quadratic or worse. Running all of the passes until
            # they make no changes is important for correctness, but nothing
            # says we have to run all of them on each run! So if the fast
            # passes still seem to be making useful changes, we restart the
            # loop here and give them another go.

            # To avoid the case where the expensive shrinks unlock a trivial
            # change in one of the previous passes causing this to become much
            # more expensive by doubling the number of times we have to run
            # them to get to run the expensive passes again, we make this
            # decision "sticky" - once it's been useful to run the expensive
            # changes at least once, we always run them.
            if prev is self.shrink_target:
                run_expensive_shrinks = True

            if not run_expensive_shrinks:
                continue

            self.interval_deletion_with_block_lowering()
            self.pass_to_interval()
            self.reorder_bytes()

    def zero_intervals(self):
        """Attempt to replace each interval with its minimal possible value.

        This is intended as a fast-track to minimize whole sub-examples that
        don't matter as rapidly as possible. For example, suppose we had
        something like the following:

        ls = data.draw(st.lists(st.lists(st.integers())))
        assert len(ls) >= 10

        Then each of the elements of ls needs to be minimized, and we can do
        that by deleting individual values from them, but we'd much rather do
        it fast rather than slow - deleting elements one at a time takes
        sum(map(len, ls)) shrinks, and ideally we'd do this in len(ls) shrinks
        as we try to replace each element with [].

        This pass does that by identifying the size of the "natural smallest"
        element here. It first tries replacing an entire interval with zero.
        This will sometimes work (e.g. when the interval is a block), but often
        what will happen is that there will be leftover zeros that spill over
        into the next example and ruin things - e.g. here if ls[0] is non-empty
        and we replace it with all zero, some of the extra zeros will be
        interpreted as terminating ls and will shrink it down to a one element
        list, causing the test to pass.

        So what we do instead is that once we've evaluated that shrink, we use
        the size of the intervals there to find other possible sizes that we
        could try replacing the interval with. In this case we'd spot that
        there is a one-byte interval starting at the right place for ls[i] and
        try to replace it with that. This will successfully replace ls[i] with
        [] as desired.
        """
        i = 0
        while i < len(self.shrink_target.intervals):
            u, v = self.shrink_target.intervals[i]
            buf = self.shrink_target.buffer
            prev = self.shrink_target
            if any(buf[u:v]):
                attempt = self.cached_test_function(
                    buf[:u] + hbytes(v - u) + buf[v:]
                )
                if (
                    attempt.status == Status.VALID and
                    len(attempt.buffer) < len(self.shrink_target.buffer)
                ):
                    ends = [s for r, s in attempt.intervals if r == u]
                    ends.reverse()
                    for s in ends:
                        if s < v and self.incorporate_new_buffer(
                            buf[:u] + hbytes(s - u) + buf[v:]
                        ):
                            break
            if self.shrink_target is prev:
                i += 1

    def pass_to_interval(self):
        """Attempt to replace each interval with a subinterval.

        This is designed to deal with strategies that call themselves
        recursively. For example, suppose we had:

        binary_tree = st.deferred(
            lambda: st.one_of(
                st.integers(), st.tuples(binary_tree, binary_tree)))

        This pass guarantees that we can replace any binary tree with one of
        its subtrees - each of those will create an interval that the parent
        could validly be replaced with, and this pass will try doing that.

        This is pretty expensive - it takes O(len(intervals)^2) - so we run it
        late in the process when we've got the number of intervals as far down
        as possible.
        """
        i = 0
        while i < len(self.shrink_target.intervals):
            u, v = self.shrink_target.intervals[i]
            for r, s in reversed(self.shrink_target.intervals):
                if u <= r <= s <= v and s - r < v - u:
                    buf = self.shrink_target.buffer
                    if self.incorporate_new_buffer(
                        buf[:u] + buf[r:s] + buf[v:]
                    ):
                        break
            else:
                i += 1

    def escape_local_minimum(self):
        """Attempt to restart the shrink process from a larger initial value in
        a way that allows us to escape a local minimum that the main greedy
        shrink process will get stuck in.

        The idea is that when we've completed the shrink process, we try
        starting it again from something reasonably near to the shrunk example
        that is likely to exhibit the same behaviour.

        We search for an example that is selected randomly among ones that are
        "structurally similar" to the original. If we don't find one we bail
        out fairly quickly as this will usually not work. If we do, we restart
        the shrink process from there. If this results in us finding a better
        final example, we do this again until it stops working.

        This is especially useful for things where the tendency to move
        complexity to the right works against us - often a generic instance of
        the problem is easy to shrink, but trying to reduce the size of a
        minimized example further is hard. For example, suppose we had
        something like:

        x = data.draw(lists(integers()))
        y = data.draw(lists(integers(), min_size=len(x), max_size=len(x)))
        assert not (any(x) and any(y))

        Then this could shrink to something like [0, 1], [0, 1].

        Attempting to shrink this further by deleting an element of x would
        result in losing the last element of y, and the test would start
        passing. But if we were to replace this with [a, b], [c, d] with
        c != 0 then deleting a or b would work.
        """
        count = 0
        while count < 10:
            count += 1
            self.debug('Retrying from random restart')
            attempt_buf = bytearray(self.shrink_target.buffer)

            # We use the shrinking information to identify the
            # structural locations in the byte stream - if lowering
            # the block would result in changing the size of the
            # example, changing it here is too likely to break whatever
            # it was that caused the behaviour we're trying to shrink.
            # Everything non-structural, we redraw uniformly at random.
            for i, (u, v) in enumerate(self.shrink_target.blocks):
                if i not in self.shrink_target.shrinking_blocks:
                    attempt_buf[u:v] = uniform(self.__engine.random, v - u)
            attempt = self.cached_test_function(attempt_buf)
            if self.__predicate(attempt):
                prev = self.shrink_target
                self.shrink_target = attempt
                self.greedy_shrink()
                if (
                    sort_key(self.shrink_target.buffer) <
                    sort_key(prev.buffer)
                ):
                    # We have successfully shrunk the example past where
                    # we started from. Now we begin the whole process
                    # again from the new, smaller, example.
                    count = 0
                else:
                    self.shrink_target = prev

    def try_shrinking_blocks(self, blocks, b):
        """Attempts to replace each block in the blocks list with b. Returns
        True if it succeeded (which may include some additional modifications
        to shrink_target). If it fails, it may update...
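Note that greedy_shrink here is an internal method of the shrinker in this version of engine.py, not public Hypothesis API: it runs automatically whenever a failing example is found and Hypothesis minimizes it. A minimal sketch of how it gets exercised indirectly, mirroring the ls example from the zero_intervals docstring (the test name and the deliberately failing property are illustrative):

from hypothesis import given
from hypothesis import strategies as st

# Deliberately failing property: almost every generated list violates it,
# so the shrinker (and hence greedy_shrink) runs and reports a minimal
# counterexample - here the empty list.
@given(st.lists(st.lists(st.integers())))
def test_always_at_least_ten_elements(ls):
    assert len(ls) >= 10

Running this under pytest prints a falsifying example of ls=[], which is the kind of fixed point the greedy passes above iterate towards.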


fab_phmm.py

Source: fab_phmm.py (GitHub)



...
                      shrink=True)
    while True:
        model.fit(xseqs, yseqs, max_iter=max_iter, verbose=verbose,
                  verbose_level=verbose_level, n_threads=n_threads)
        models.append(copy.deepcopy(model))
        if not model.greedy_shrink(xseqs, yseqs):
            break
    if sorted:
        models.sort(key=lambda m: -m._last_score)
    return models


class FABPHMM(PHMM):

    def __init__(self, n_match_states=1, n_xins_states=2, n_yins_states=2,
                 n_simbols=4, initprob=None, transprob=None, emitprob=None,
                 symmetric_emission=False, shrink_threshold=1e-2,
                 stop_threshold=1e-5, link_hstates=False, shrink=True,
                 propdim_count_nonzero=True, link_match=True):
        super(FABPHMM, self).__init__(n_match_states=n_match_states,
                                      n_xins_states=n_xins_states,
                                      n_yins_states=n_yins_states,
                                      n_simbols=n_simbols,
                                      initprob=initprob,
                                      transprob=transprob,
                                      emitprob=emitprob,
                                      stop_threshold=stop_threshold,
                                      link_hstates=link_hstates,
                                      link_match=link_match)
        # implement symmetric one for the first program
        self._symmetric_emission = symmetric_emission
        self._shrink_threshold = shrink_threshold
        self._shrink = shrink
        self._propdim_count_nonzero = propdim_count_nonzero
        self._qsum_emit = None
        self._qsum_trans = None
        self._done_shrink = None
        self._last_shrinked = False
        self.monitor = None

        if symmetric_emission:
            raise NotImplementedError("Symmetric emission is not implemented")
            # not implemented
            # self._dim_match_emit = n_simbols * (n_simbols + 1) // 2 - 1
            # self._dim_ins_emit = n_simbols // 2  # ?? how can I deal with it??
            # It is weird because the emission parameter dimensions of insertion
            # and deletion should be symmetric, but in this case we have to
            # compute the dimensions separately.
        else:
            self._dim_match_emit = n_simbols ** 2 - 1
            self._dim_ins_emit = n_simbols - 1

    def _gen_dims(self):
        # assume symmetric output prob distribution
        n_hstates = self._n_hstates
        dim_emit = np.zeros(n_hstates)
        dim_trans = np.sum(self._transprob != 0, axis=1) - 1
        dim_init = np.sum(self._initprob != 0) - 1
        for k in range(n_hstates):
            hstate_prop = self._hstate_properties[k]
            dim_emit[k] = self._dim_match_emit if hstate_prop == 0 else self._dim_ins_emit
        return dim_init, dim_trans, dim_emit

    def _gen_logdelta(self, log_emitprob_frame, dims_trans, dims_emit):
        term_trans = - dims_trans / (2 * self._qsum_trans + EPS)
        term_emit = - dims_emit / (2 * self._qsum_emit + EPS)
        logdelta_1 = term_trans + term_emit
        logdelta_2 = term_emit
        xshape, yshape, n_hstates = log_emitprob_frame.shape
        pseudo_prob = np.full((xshape, yshape, n_hstates), logdelta_1)
        pseudo_prob[-1, -1, :] = logdelta_2
        return pseudo_prob

    def _get_state_to_delete(self):
        if self._n_match_states == 1 and self._n_xins_states == 1 and self._n_yins_states == 1:
            return -1, -1
        dim_init, dims_trans, dims_emit = self._gen_dims()
        qscore = self._qsum_emit / (dims_trans + dims_emit)
        qscore /= np.sum(qscore)
        sep_mx = self._n_match_states
        sep_xy = self._n_match_states + self._n_xins_states
        qscore_match = qscore[:sep_mx]
        qscore_xins = qscore[sep_mx:sep_xy]
        qscore_yins = qscore[sep_xy:]

        def min_val_index(qsum, origin):
            argmin = np.argmin(qsum)
            min_val = qsum[argmin]
            min_index = argmin + origin
            return min_val, min_index

        cands = []
        if self._n_match_states > 1:
            cands.append(min_val_index(qscore_match, 0))
        if self._n_xins_states > 1:
            cands.append(min_val_index(qscore_xins, sep_mx))
        if self._n_yins_states > 1:
            cands.append(min_val_index(qscore_yins, sep_xy))
        val, state = min(cands, key=lambda k: k[0])
        return val, state

    def _shrinkage_operation(self, force=False):
        val, state = self._get_state_to_delete()
        if state == -1:
            return 0
        if force:
            self._delete_state(state)
            return 1
        n_deleted = 0
        while val < self._shrink_threshold:
            self._delete_state(state)
            n_deleted += 1
            val, state = self._get_state_to_delete()
            if state == -1:
                break
        return n_deleted

    def _init_sufficient_statistics(self):
        sstats = super(FABPHMM, self)._init_sufficient_statistics()
        sstats["qsum_emit"] = np.zeros(self._n_hstates)
        sstats["qsum_trans"] = np.zeros(self._n_hstates)
        return sstats

    def score(self, xseqs, yseqs, type="fic"):
        if type == "fic":
            raise NotImplementedError("type fic is not implemented")
        elif type == "ll":
            return super(FABPHMM, self).score(xseqs, yseqs)
        else:
            raise ValueError("invalid type")

    def _accumulate_sufficient_statistics(self, sstats, free_energy, gamma, xi, xseq, yseq):
        # free energy is accumulated as sstats["score"]
        super(FABPHMM, self)._accumulate_sufficient_statistics(sstats, free_energy, gamma, xi, xseq, yseq)
        sum_gamma = np.sum(gamma, axis=(0, 1))
        sstats["qsum_emit"] += sum_gamma
        sstats["qsum_trans"] += sum_gamma - gamma[-1, -1, :]

    def _gen_random_qsum(self, xseqs, yseqs):
        n_seqs = len(xseqs)
        qsum_emit = np.zeros(self._n_hstates)
        for i in range(n_seqs):
            xlen, ylen = xseqs[i].shape[0], yseqs[i].shape[0]
            frame = np.random.rand(xlen + 1, ylen + 1, self._n_hstates)
            frame /= frame.sum(axis=2)[:, :, np.newaxis]
            qsum_emit += np.sum(frame, axis=(0, 1))
            qsum_trans = qsum_emit - frame[-1, -1, :]
        return qsum_emit, qsum_trans

    def _update_params(self, sstats, fic):
        super(FABPHMM, self)._update_params(sstats)
        self._qsum_emit = sstats["qsum_emit"]
        self._qsum_trans = sstats["qsum_trans"]
        self._last_score = fic
        self._check_probs()

    def _calculate_fic(self, sstats, n_seq):
        dim_init, dims_trans, dims_emit = self._gen_dims()
        fic = sstats["score"] \
            - dim_init / 2 * np.log(n_seq) \
            - np.sum(dims_trans / 2 * (np.log(sstats["qsum_trans"]) - 1)) \
            - np.sum(dims_emit / 2 * (np.log(sstats["qsum_emit"]) - 1))
        return fic

    def greedy_shrink(self):
        return self._shrinkage_operation(force=True)

    def _delete_state(self, deleted_state):
        preserved_hstates = np.delete(np.arange(self._n_hstates), deleted_state)
        deleted_hstateprop = self._hstate_properties[deleted_state]
        if deleted_hstateprop == 0:
            self._n_match_states -= 1
        elif deleted_hstateprop == 1:
            self._n_xins_states -= 1
        elif deleted_hstateprop == 2:
            self._n_yins_states -= 1
        else:
            raise ValueError("invalid hstate prop")
        assert(self._n_match_states > 0)
        assert(self._n_xins_states > 0)
...
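In this project greedy_shrink is a public method of FABPHMM: per the class body above, it force-deletes the lowest-scoring hidden state via _shrinkage_operation(force=True) and returns the number of states removed (0 once only the minimal one-state-per-kind configuration remains). A hypothetical usage sketch, assuming the import path fab_phmm.fab_phmm and integer-encoded sequences with values 0..3 (matching n_simbols=4); note that the driver loop at the top of the snippet passes (xseqs, yseqs) to greedy_shrink, while the class shown defines it without arguments, so the no-argument form is used here:

import copy
import numpy as np
from fab_phmm.fab_phmm import FABPHMM  # import path is an assumption

# Toy integer-encoded sequence pairs (values 0..3, matching n_simbols=4).
xseqs = [np.array([0, 1, 2, 3]), np.array([2, 2, 1])]
yseqs = [np.array([0, 1, 3]), np.array([2, 1, 1, 0])]

model = FABPHMM(n_match_states=2, n_xins_states=2, n_yins_states=2)
models = []
while True:
    model.fit(xseqs, yseqs, max_iter=100)  # FAB-EM fit, as in the driver above
    models.append(copy.deepcopy(model))
    # Delete the weakest hidden state; stop once nothing can be removed.
    if not model.greedy_shrink():
        break

# Keep the model with the best final score, as the driver's sort does.
best = max(models, key=lambda m: m._last_score)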


