Best Python code snippet using hypothesis
engine.py
Source:engine.py  
...1057            not any(self.shrink_target.buffer) or1058            self.incorporate_new_buffer(hbytes(len(self.shrink_target.buffer)))1059        ):1060            return1061        self.greedy_shrink()1062        self.escape_local_minimum()1063    def greedy_shrink(self):1064        """Run a full set of greedy shrinks (that is, ones that will only ever1065        move to a better target) and update shrink_target appropriately.1066        This method iterates to a fixed point and so is idempontent - calling1067        it twice will have exactly the same effect as calling it once.1068        """1069        # This will automatically be run during the normal loop, but it's worth1070        # running once before the coarse passes so they don't spend time on1071        # useless data.1072        self.remove_discarded()1073        # We only run this once because delta-debugging is essentially useless1074        # and often actively harmful when run on examples which are "close to1075        # shrunk" - the attempts to delete long runs mostly just don't work,1076        # and we spend a huge amount of time trying them anyway. The1077        # greedy_interval_deletion pass will achieve the same end result but1078        # is much cheaper when run on close to shrunk examples.1079        self.delta_interval_deletion()1080        run_expensive_shrinks = False1081        prev = None1082        while prev is not self.shrink_target:1083            prev = self.shrink_target1084            self.remove_discarded()1085            self.zero_intervals()1086            self.minimize_duplicated_blocks()1087            self.minimize_individual_blocks()1088            self.reorder_blocks()1089            self.greedy_interval_deletion()1090            self.lower_dependent_block_pairs()1091            # Passes after this point are expensive: Prior to here they should1092            # all involve no more than about n log(n) shrinks, but after here1093            # they may be quadratic or worse. Running all of the passes until1094            # they make no changes is important for correctness, but nothing1095            # says we have to run all of them on each run! So if the fast1096            # passes still seem to be making useful changes, we restart the1097            # loop here and give them another go.1098            # To avoid the case where the expensive shrinks unlock a trivial1099            # change in one of the previous passes causing this to become much1100            # more expensive by doubling the number of times we have to run1101            # them to get to run the expensive passes again, we make this1102            # decision "sticky" - once it's been useful to run the expensive1103            # changes at least once, we always run them.1104            if prev is self.shrink_target:1105                run_expensive_shrinks = True1106            if not run_expensive_shrinks:1107                continue1108            self.interval_deletion_with_block_lowering()1109            self.pass_to_interval()1110            self.reorder_bytes()1111    def zero_intervals(self):1112        """Attempt to replace each interval with its minimal possible value.1113        This is intended as a fast-track to minimize whole sub-examples that1114        don't matter as rapidly as possible. For example, suppose we had1115        something like the following:1116        ls = data.draw(st.lists(st.lists(st.integers())))1117        assert len(ls) >= 101118        Then each of the elements of ls need to be minimized, and we can do1119        that by deleting individual values from them, but we'd much rather do1120        it fast rather than slow - deleting elements one at a time takes1121        sum(map(len, ls)) shrinks, and ideally we'd do this in len(ls) shrinks1122        as we try to replace each element with [].1123        This pass does that by identifying the size of the "natural smallest"1124        element here. It first tries replacing an entire interval with zero.1125        This will sometimes work (e.g. when the interval is a block), but often1126        what will happen is that there will be leftover zeros that spill over1127        into the next example and ruin things - e.g. here if ls[0] is non-empty1128        and we replace it with all zero, some of the extra zeros will be1129        interpreted as terminating ls and will shrink it down to a one element1130        list, causing the test to pass.1131        So what we do instead is that once we've evaluated that shrink, we use1132        the size of the intervals there to find other possible sizes that we1133        could try replacing the interval with. In this case we'd spot that1134        there is a one-byte interval starting at right place for ls[i] and try1135        to replace it with that. This will successfully replace ls[i] with []1136        as desired.1137        """1138        i = 01139        while i < len(self.shrink_target.intervals):1140            u, v = self.shrink_target.intervals[i]1141            buf = self.shrink_target.buffer1142            prev = self.shrink_target1143            if any(buf[u:v]):1144                attempt = self.cached_test_function(1145                    buf[:u] + hbytes(v - u) + buf[v:]1146                )1147                if (1148                    attempt.status == Status.VALID and1149                    len(attempt.buffer) < len(self.shrink_target.buffer)1150                ):1151                    ends = [s for r, s in attempt.intervals if r == u]1152                    ends.reverse()1153                    for s in ends:1154                        if s < v and self.incorporate_new_buffer(1155                            buf[:u] + hbytes(s - u) + buf[v:]1156                        ):1157                            break1158            if self.shrink_target is prev:1159                i += 11160    def pass_to_interval(self):1161        """Attempt to replace each interval with a subinterval.1162        This is designed to deal with strategies that call themselves1163        recursively. For example, suppose we had:1164        binary_tree = st.deferred(1165            lambda: st.one_of(1166                st.integers(), st.tuples(binary_tree, binary_tree)))1167        This pass guarantees that we can replace any binary tree with one of1168        its subtrees - each of those will create an interval that the parent1169        could validly be replaced with, and this pass will try doing that.1170        This is pretty expensive - it takes O(len(intervals)^2) - so we run it1171        late in the process when we've got the number of intervals as far down1172        as possible.1173        """1174        i = 01175        while i < len(self.shrink_target.intervals):1176            u, v = self.shrink_target.intervals[i]1177            for r, s in reversed(self.shrink_target.intervals):1178                if u <= r <= s <= v and s - r < v - u:1179                    buf = self.shrink_target.buffer1180                    if self.incorporate_new_buffer(1181                        buf[:u] + buf[r:s] + buf[v:]1182                    ):1183                        break1184            else:1185                i += 11186    def escape_local_minimum(self):1187        """Attempt to restart the shrink process from a larger initial value in1188        a way that allows us to escape a local minimum that the main greedy1189        shrink process will get stuck in.1190        The idea is that when we've completed the shrink process, we try1191        starting it again from something reasonably near to the shrunk example1192        that is likely to exhibit the same behaviour.1193        We search for an example that is selected randomly among ones that are1194        "structurally similar" to the original. If we don't find one we bail1195        out fairly quickly as this will usually not work. If we do, we restart1196        the shrink process from there. If this results in us finding a better1197        final example, we do this again until it stops working.1198        This is especially useful for things where the tendency to move1199        complexity to the right works against us - often a generic instance of1200        the problem is easy to shrink, but trying to reduce the size of a1201        minimized example further is hard. For example suppose we had something1202        like:1203        x = data.draw(lists(integers()))1204        y = data.draw(lists(integers(), min_size=len(x), max_size=len(x)))1205        assert not (any(x) and any(y))1206        Then this could shrink to something like [0, 1], [0, 1].1207        Attempting to shrink this further by deleting an element of x would1208        result in losing the last element of y, and the test would start1209        passing. But if we were to replace this with [a, b], [c, d] with c != 01210        then deleting a or b would work.1211        """1212        count = 01213        while count < 10:1214            count += 11215            self.debug('Retrying from random restart')1216            attempt_buf = bytearray(self.shrink_target.buffer)1217            # We use the shrinking information to identify the1218            # structural locations in the byte stream - if lowering1219            # the block would result in changing the size of the1220            # example, changing it here is too likely to break whatever1221            # it was caused the behaviour we're trying to shrink.1222            # Everything non-structural, we redraw uniformly at random.1223            for i, (u, v) in enumerate(self.shrink_target.blocks):1224                if i not in self.shrink_target.shrinking_blocks:1225                    attempt_buf[u:v] = uniform(self.__engine.random, v - u)1226            attempt = self.cached_test_function(attempt_buf)1227            if self.__predicate(attempt):1228                prev = self.shrink_target1229                self.shrink_target = attempt1230                self.greedy_shrink()1231                if (1232                    sort_key(self.shrink_target.buffer) <1233                    sort_key(prev.buffer)1234                ):1235                    # We have successfully shrunk the example past where1236                    # we started from. Now we begin the whole processs1237                    # again from the new, smaller, example.1238                    count = 01239                else:1240                    self.shrink_target = prev1241    def try_shrinking_blocks(self, blocks, b):1242        """Attempts to replace each block in the blocks list with b. Returns1243        True if it succeeded (which may include some additional modifications1244        to shrink_target). If it fails, it may update...fab_phmm.py
Source:fab_phmm.py  
...109                    shrink=True)110    while True:111        model.fit(xseqs, yseqs, max_iter=max_iter, verbose=verbose, verbose_level=verbose_level, n_threads=n_threads)112        models.append(copy.deepcopy(model))113        if not model.greedy_shrink(xseqs, yseqs):114            break115    if sorted:116        models.sort(key=lambda m: - m._last_score)117    return models118class FABPHMM(PHMM):119    def __init__(self, n_match_states=1, n_xins_states=2, n_yins_states=2,120                 n_simbols=4, initprob=None, transprob=None, emitprob=None,121                 symmetric_emission=False, shrink_threshold=1e-2,122                 stop_threshold=1e-5, link_hstates=False, shrink=True,123                 propdim_count_nonzero=True, link_match=True):124        super(FABPHMM, self).__init__(n_match_states=n_match_states,125                                      n_xins_states=n_xins_states,126                                      n_yins_states=n_yins_states,127                                      n_simbols=n_simbols,128                                      initprob=initprob,129                                      transprob=transprob,130                                      emitprob=emitprob,131                                      stop_threshold=stop_threshold,132                                      link_hstates=link_hstates,133                                      link_match=link_match)134        # implement symmetric one for the first program135        self._symmetric_emission = symmetric_emission136        self._shrink_threshold = shrink_threshold137        self._shrink = shrink138        self._propdim_count_nonzero = propdim_count_nonzero139        self._qsum_emit = None140        self._qsum_trans = None141        self._done_shrink = None142        self._last_shrinked = False143        self.monitor = None144        if symmetric_emission:145            raise NotImplementedError("Symmetric emission is not implemented")146            # not implemented147            #self._dim_match_emit = n_simbols * (n_simbols + 1) // 2 - 1148            #self._dim_ins_emit = n_simbols // 2 # /?? how can I deal with it??149            # it is wired because emission parameter demension of insertion and deletion150            # should be symmetric but in this case we have to compute dimenson separately151        else:152            self._dim_match_emit = n_simbols ** 2 - 1153            self._dim_ins_emit = n_simbols - 1154    def _gen_dims(self):155        # assume symmetric output prob disribution156        n_hstates = self._n_hstates157        dim_emit = np.zeros(n_hstates)158        dim_trans = np.sum(self._transprob != 0, axis=1) - 1159        dim_init = np.sum(self._initprob != 0) - 1160        for k in range(n_hstates):161            hstate_prop = self._hstate_properties[k]162            dim_emit[k] = self._dim_match_emit if hstate_prop == 0 else self._dim_ins_emit163        return dim_init, dim_trans, dim_emit164    def _gen_logdelta(self, log_emitprob_frame, dims_trans, dims_emit):165        term_trans = - dims_trans / (2 * self._qsum_trans + EPS)166        term_emit = - dims_emit / (2 * self._qsum_emit + EPS)167        logdelta_1 = term_trans + term_emit168        logdelta_2 = term_emit169        xshape, yshape, n_hstates = log_emitprob_frame.shape170        pseudo_prob = np.full((xshape, yshape, n_hstates), logdelta_1)171        pseudo_prob[-1, -1, :] = logdelta_2172        return pseudo_prob173    def _get_state_to_delete(self):174        if self._n_match_states == 1 and self._n_xins_states == 1 and self._n_yins_states == 1:175            return -1, -1176        dim_init, dims_trans, dims_emit = self._gen_dims()177        qscore = self._qsum_emit / (dims_trans + dims_emit)178        qscore /= np.sum(qscore)179        sep_mx = self._n_match_states180        sep_xy = self._n_match_states + self._n_xins_states181        qscore_match = qscore[:sep_mx]182        qscore_xins = qscore[sep_mx:sep_xy]183        qscore_yins = qscore[sep_xy:]184        def min_val_index(qsum, origin):185            argmin = np.argmin(qsum)186            min_val = qsum[argmin]187            min_index = argmin + origin188            return min_val, min_index189        cands = []190        if self._n_match_states > 1:191            cands.append(min_val_index(qscore_match, 0))192        if self._n_xins_states > 1:193            cands.append(min_val_index(qscore_xins, sep_mx))194        if self._n_yins_states > 1:195            cands.append(min_val_index(qscore_yins, sep_xy))196        val, state = min(cands, key=lambda k: k[0])197        return val, state198    def _shrinkage_operation(self, force=False):199        val, state = self._get_state_to_delete()200        if state == -1:201            return 0202        if force:203            self._delete_state(state)204            return 1205        n_deleted = 0206        while val < self._shrink_threshold:207            self._delete_state(state)208            n_deleted += 1209            val, state = self._get_state_to_delete()210            if state == -1:211                break212        return n_deleted213    def _init_sufficient_statistics(self):214        sstats = super(FABPHMM, self)._init_sufficient_statistics()215        sstats["qsum_emit"] = np.zeros(self._n_hstates)216        sstats["qsum_trans"] = np.zeros(self._n_hstates)217        return sstats218    def score(self, xseqs, yseqs, type="fic"):219        if type == "fic":220            raise NotImplemented("type fic is not implemented")221        elif type == "ll":222            return super(FABPHMM, self).score(xseqs, yseqs)223        else:224            raise ValueError("invalid type")225    def _accumulate_sufficient_statistics(self, sstats, free_energy, gamma, xi, xseq, yseq):226        # free energy is accumulated as sstats["score"]227        super(FABPHMM, self)._accumulate_sufficient_statistics(sstats, free_energy, gamma, xi, xseq, yseq)228        sum_gamma = np.sum(gamma, axis=(0, 1))229        sstats["qsum_emit"] += sum_gamma230        sstats["qsum_trans"] += sum_gamma - gamma[-1, -1, :]231    def _gen_random_qsum(self, xseqs, yseqs):232        n_seqs = len(xseqs)233        qsum_emit = np.zeros(self._n_hstates)234        for i in range(n_seqs):235            xlen, ylen = xseqs[i].shape[0], yseqs[i].shape[0]236            frame = np.random.rand(xlen + 1, ylen + 1, self._n_hstates)237            frame /= frame.sum(axis=2)[:, :, np.newaxis]238            qsum_emit += np.sum(frame, axis=(0,1))239            qsum_trans = qsum_emit - frame[-1, -1, :]240        return qsum_emit, qsum_trans241    def _update_params(self, sstats, fic):242        super(FABPHMM, self)._update_params(sstats)243        self._qsum_emit = sstats["qsum_emit"]244        self._qsum_trans = sstats["qsum_trans"]245        self._last_score = fic246        self._check_probs()247    def _calculate_fic(self, sstats, n_seq):248        dim_init, dims_trans, dims_emit = self._gen_dims()249        fic = sstats["score"] \250              - dim_init / 2 * np.log(n_seq) \251              - np.sum(dims_trans / 2 * (np.log(sstats["qsum_trans"]) - 1)) \252              - np.sum(dims_emit / 2 * (np.log(sstats["qsum_emit"]) - 1))253        return fic254    def greedy_shrink(self):255        return self._shrinkage_operation(force=True)256    def _delete_state(self, deleted_state):257        preserved_hstates = np.delete(np.arange(self._n_hstates), deleted_state)258        deleted_hstateprop = self._hstate_properties[deleted_state]259        if deleted_hstateprop == 0:260            self._n_match_states -= 1261        elif deleted_hstateprop == 1:262            self._n_xins_states -= 1263        elif deleted_hstateprop == 2:264            self._n_yins_states -= 1265        else:266            raise ValueError("invalid hstate prop")267        assert(self._n_match_states > 0)268        assert(self._n_xins_states > 0)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
