Best Python code snippet using autotest_python
model.py
Source:model.py  
1import numpy as np2import tensorflow as tf3import tqdm as tqdm4from src.metrics.metrics import accurracy5class Model:6    def __init__(self, DataProvider, BodyBuilder,7                 HeadBuilder, PlaceholderBuilder, Monitor, scopes,8                 trainable_scopes, is_training,9                 learning_rate=1e-3, lr_scheduler=None):10        self.DataProvider = DataProvider11        self.BodyBuilder = BodyBuilder12        self.HeadBuilder = HeadBuilder13        self.Monitor = Monitor14        self.scopes = scopes15        self.trainable_scopes = trainable_scopes16        self.init_learning_rate = learning_rate17        self.learning_rate = tf.placeholder(dtype=tf.float32)18        self.placeholder_builder = PlaceholderBuilder19        self.initialized = False20        self.is_training = is_training21        self.lr_scheduler = lr_scheduler22    def set_up(self):23        self.__set_up_placeholders()24        self.__set_up_model()25        self.__set_up_training()26        self.initialized = True27    def __set_up_placeholders(self):28        self.x, self.y, self.priorities, self.weights = self.placeholder_builder.set_up_placeholders()29    def __set_up_model(self):30        self.processed = self.BodyBuilder.get_body(self.x, self.scopes["body"], self.priorities,31                                                   self.weights)32        self.model_loss, self.pred_logits = self.HeadBuilder.get_head(self.processed, self.y, self.scopes["head"])33        self.pred = tf.nn.softmax(self.pred_logits)34        self.reg_loss = tf.losses.get_regularization_loss()35        self.loss = self.model_loss + self.reg_loss36    def __set_up_training(self):37        self.vars = []38        for scope in self.trainable_scopes:39            trainable_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)40            self.vars += trainable_vars41        print(len(self.vars), "TOTAL NUMBER OF PARAMETERS: ",42              np.sum([np.prod(v.get_shape().as_list()) for v in self.vars]))43        self.optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate)44        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)45        with tf.control_dependencies(update_ops):46            self.train_op = self.optimizer.minimize(self.loss, var_list=self.vars)47    def add_to_monitor_loss(self, loss, train_acc, test_loss, test_acc):48        self.Monitor.monitor_all(["train_loss", "train_acc", "valid_loss", "valid_acc"],49                                 [loss, train_acc, test_loss, test_acc])50    def add_to_monitor(self, loss, pred, test_acc, test_loss, test_pred, train_acc):51        self.add_to_monitor_loss(loss,train_acc,test_loss,test_acc)52        self.Monitor.monitor_all(["train_pred", "test_pred"],53                                 [np.argmax(pred, axis=1), np.argmax(test_pred, axis=1)])54    def train(self, session, epochs, batch_size, args, saver, config_name, model_name):55        assert self.initialized, "model must be set up before training"56        self.Monitor.save_args(args, config_name)57        self.DataProvider.load_dataset(args.shuffle, args.random_state)58        for epoch in range(epochs):59            for iter in tqdm.tqdm(range(len(self.DataProvider) // batch_size)):60                train_x, train_y, weights, priorities = self.DataProvider.get_random_batch(batch_size, "train")61                lr = self.lr_scheduler(epoch) if self.lr_scheduler is not None else self.init_learning_rate62                if self.weights is not None and self.priorities is not None:63                    _, loss, pred = session.run([self.train_op, self.loss, self.pred],64                                                feed_dict={self.x: train_x, self.y: train_y, self.weights: weights,65                                                           self.priorities: priorities, self.is_training:True,66                                                           self.learning_rate:lr})67                else:68                    _, loss, pred = session.run([self.train_op, self.loss, self.pred],69                                                feed_dict={self.x: train_x, self.y: train_y, self.is_training:True,70                                                           self.learning_rate: lr71                                                           })72                test_x, test_y, test_weights, test_priorities = self.DataProvider.get_random_batch(batch_size, "valid")73                if self.weights is not None and self.priorities is not None:74                    test_loss, test_pred = session.run([self.loss, self.pred],75                                                       feed_dict={self.x: test_x, self.y: test_y,76                                                                  self.weights: test_weights,77                                                                  self.priorities: test_priorities,78                                                                  self.is_training: False})79                else:80                    test_loss, test_pred = session.run([self.loss, self.pred],81                                                       feed_dict={self.x: test_x,82                                                                  self.y: test_y,83                                                                  self.is_training: False})84                train_acc = accurracy(np.argmax(pred, axis=1), train_y)85                test_acc = accurracy(np.argmax(test_pred, axis=1), test_y)86                self.add_to_monitor(loss, pred, test_acc, test_loss, test_pred, train_acc)87            self.Monitor.save_session(session, saver, model_name)88            self.Monitor.save()89    def train_epoch(self, session, epochs, batch_size, args, saver, config_name, model_name):90        assert self.initialized, "model must be set up before training"91        self.Monitor.save_args(args, config_name)92        self.DataProvider.load_dataset(args.shuffle, args.random_state)93        for epoch in range(epochs):94            lr = self.lr_scheduler(epoch) if self.lr_scheduler is not None else self.init_learning_rate95            Loss, Acc = [], []96            for iter in tqdm.tqdm(range(len(self.DataProvider) // batch_size)):97                train_x, train_y, weights, priorities = self.DataProvider.get_next_batch(batch_size, iter, "train")98                if self.weights is not None and self.priorities is not None:99                    _, loss, pred = session.run([self.train_op, self.loss, self.pred],100                                                feed_dict={self.x: train_x, self.y: train_y, self.weights: weights,101                                                           self.priorities: priorities, self.is_training:True,102                                                           self.learning_rate:lr})103                else:104                    _, loss, pred = session.run([self.train_op, self.loss, self.pred],105                                                feed_dict={self.x: train_x, self.y: train_y, self.is_training:True,106                                                           self.learning_rate: lr})107                train_acc = accurracy(np.argmax(pred, axis=1), train_y)108                self.Monitor.monitor_all(["train_pred"], [np.argmax(pred, axis=1)])109                Loss.append(loss)110                Acc.append(train_acc)111            Loss_valid, Acc_valid = [], []112            for v_iter in range(self.DataProvider.valid_len() // batch_size):113                test_x, test_y, test_weights, test_priorities = self.DataProvider.get_next_batch(batch_size, v_iter, "valid")114                115                if self.weights is not None and self.priorities is not None:116                    test_loss, test_pred = session.run([self.loss, self.pred],117                                                       feed_dict={self.x: test_x, self.y: test_y,118                                                                  self.weights: test_weights,119                                                                  self.priorities: test_priorities,120                                                                  self.is_training:False})121                else:122                    test_loss, test_pred = session.run([self.loss, self.pred],123                                                       feed_dict={self.x: test_x, self.y: test_y,124                                                                  self.is_training:False})125                self.Monitor.monitor_all(["valid_pred"], [np.argmax(test_pred, axis=1)])126                test_acc = accurracy(np.argmax(test_pred, axis=1), test_y)127                Loss_valid.append(test_loss)128                Acc_valid.append(test_acc)129            self.add_to_monitor_loss(np.mean(Loss), np.mean(Acc), np.mean(Loss_valid), np.mean(Acc_valid))130            self.Monitor.save_session(session, saver, model_name)131            self.Monitor.save()132    def predict(self, session, X):133        pred = session.run(self.pred, feed_dict={self.x: X, self.is_training: False})134        return np.argmax(pred, axis=1)135    def predict_dataset(self, session, batch_size, dataset_type, args):136        self.DataProvider.load_dataset(args.shuffle, args.random_state)137        length = self.DataProvider.get_dataset_length(dataset_type)138        for iter in range(length // batch_size):139            batch_x, batch_y, ww, pp = self.DataProvider.get_next_batch( batch_size, iter, dataset_type)140            if self.weights is not None and self.priorities is not None:141               loss, pred = session.run([self.loss, self.pred],142                                     feed_dict={self.x: batch_x, self.y: batch_y, self.weights: ww,143                                                self.priorities: pp, self.is_training: False})144            else:145               loss, pred = session.run([self.loss, self.pred],146                                     feed_dict={self.x:batch_x, self.y:batch_y, self.is_training: False})147           148            acc = accurracy(np.argmax(pred, axis=1), batch_y)149            self.Monitor.add_variable("trained_model_" + dataset_type + "_loss", loss)150            self.Monitor.add_variable("trained_model_" + dataset_type + "_accuracy", acc)151        self.Monitor.save()152    def get_init_variables(self):153        return self.vars154class MultiImageModel(Model):155    def __init__(self, DataProvider, BodyBuilder, HeadBuilder, MultiImagePlaceholderBuilder, Monitor, scopes,156                 trainable_scopes, shapes, is_training, learning_rate=1e-3, lr_scheduler=None, train_mode=None):157        super().__init__(DataProvider, BodyBuilder, HeadBuilder, MultiImagePlaceholderBuilder, Monitor, scopes,158                         trainable_scopes,is_training, learning_rate, lr_scheduler)159        self.shapes = shapes160        self.train_mode=train_mode161    def set_up(self):162        self.__set_up_placeholders()163        self.__set_up_model()164        self.__set_up_training()165        self.initialized = True166    def __set_up_placeholders(self):167        self.x, self.y, self.priorities, self.weights = self.placeholder_builder.set_up_placeholders()168    def __set_up_model(self):169        model_loss, pred_logits, processed, pred, reg_loss, loss = self.__get_model_lists()170        for i, x in enumerate(self.x):171            processed[i] =self.BodyBuilder.get_body(x, self.scopes["body"], self.priorities[i],172                                                       self.weights[i])173            model_loss[i], pred_logits[i] = self.HeadBuilder.get_head(processed[i], self.y[i], self.scopes["head"])174            pred[i] = tf.nn.softmax(pred_logits[i])175            reg_loss[i] = tf.losses.get_regularization_loss()176            loss[i] = model_loss[i] + reg_loss[i]177        self.model_loss = model_loss178        self.pred_logits = pred_logits179        self.processed = processed180        self.pred = pred181        self.reg_loss = reg_loss182        self.losses = loss183        self.sum_loss = tf.reduce_sum(self.losses)184    def __get_model_lists(self):185        processed = [None] * len(self.shapes)186        model_loss = [None] * len(self.shapes)187        pred_logits = [None] * len(self.shapes)188        pred = [None] * len(self.shapes)189        reg_loss = [None] * len(self.shapes)190        loss = [None] * len(self.shapes)191        return model_loss, pred_logits, processed, pred, reg_loss, loss192    def __set_up_training(self):193        self.vars = []194        for scope in self.trainable_scopes:195            trainable_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)196            self.vars += trainable_vars197        print(len(self.vars), "TOTAL NUMBER OF PARAMETERS: ",198              np.sum([np.prod(v.get_shape().as_list()) for v in self.vars]))199        self.optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate)200        if self.train_mode is None or len(self.train_mode)==0:201            self.train_op = self.optimizer.minimize(self.sum_loss, var_list=self.vars)202            self.loss = self.sum_loss203        else:204            self.total_loss = self.losses[self.train_mode[0]]205            for i in self.train_mode[1:]:206                self.total_loss += self.losses[i]207            self.train_op = self.optimizer.minimize(self.total_loss, var_list=self.vars)208            self.loss = self.total_loss209    def add_to_monitor_loss(self, loss, train_acc, test_loss, test_acc):210        self.Monitor.monitor_all(["train_loss", "train_acc", "valid_loss", "valid_acc"],211                                 [loss, train_acc, test_loss, test_acc])212    def add_to_monitor_loss_i(self, loss, train_acc, test_loss, test_acc, i):213        self.Monitor.monitor_all(["train_loss_{}".format(i), "train_acc_{}".format(i),214                                  "valid_loss_{}".format(i), "valid_acc_{}".format(i)],215                                 [loss, train_acc, test_loss, test_acc])216    def add_to_monitor(self, loss, pred, test_acc, test_loss, test_pred, train_acc):217        self.add_to_monitor_loss(loss, train_acc, test_loss, test_acc)218        self.Monitor.monitor_all(["train_pred", "test_pred"],219                                 [np.argmax(pred, axis=1), np.argmax(test_pred, axis=1)])220    def add_to_monitor_i(self, loss, pred, test_acc, test_loss, test_pred, train_acc, i):221        self.add_to_monitor_loss_i(loss, train_acc, test_loss, test_acc, i)222        self.Monitor.monitor_all(["train_pred_{}".format(i), "test_pred_{}".format(i)],223                                 [np.argmax(pred, axis=1), np.argmax(test_pred, axis=1)])224    def prepare_feed_dict(self, train_x, train_y, weights=None, priorities=None, is_training=True,  epoch=None):225        feed_dict = {self.is_training: is_training}226        if train_y is not None:227            for i, (xx, yy) in enumerate(zip(self.x,self.y)):228                feed_dict[xx]=train_x[i]229                feed_dict[yy]=train_y[i]230                if weights is not None and priorities is not None:231                    if(self.weights[i] is not None and self.priorities[i] is not None):232                        feed_dict[self.weights[i]]=weights[i]233                        feed_dict[self.priorities[i]]=priorities[i]234        else:235            for i, (xx, yy) in enumerate(zip(self.x, self.y)):236                feed_dict[xx]=train_x[i]237        if is_training and epoch is not None:238            lr = self.lr_scheduler(epoch) if self.lr_scheduler is not None else self.init_learning_rate239            feed_dict[self.learning_rate] = lr240        return feed_dict241    def monitor_losses(self, session, train_x, train_y, test_x, test_y, weights,priorities, test_weights, test_priorities):242        for i in range(len(self.shapes)):243            loss_i = session.run(self.loss[i], feed_dict=self.prepare_feed_dict(train_x, train_y, weights, priorities, False))244    def train(self, session, epochs, batch_size, args, saver, config_name, model_name):245        assert self.initialized, "model must be set up before training"246        self.Monitor.save_args(args, config_name)247        self.DataProvider.load_dataset(args.shuffle, args.random_state)248        for epoch in range(epochs):249            for iter in tqdm.tqdm(range(len(self.DataProvider) // batch_size)):250                train_x, train_y, weights, priorities = self.DataProvider.get_random_batch(batch_size, "train")251                _, loss, losses, pred = session.run([self.train_op, self.loss,self.losses, self.pred],252                                                feed_dict=self.prepare_feed_dict(train_x,train_y,weights,priorities,True,epoch))253                test_x, test_y, test_weights, test_priorities = self.DataProvider.get_random_batch(batch_size, "valid")254                test_loss, test_losses, test_pred = session.run([self.loss, self.losses, self.pred],255                                                       feed_dict=self.prepare_feed_dict(test_x, test_y, test_weights,256                                                                                        test_priorities,False))257                train_acc = []258                test_acc= []259                for i in range(len(self.x)):260                    train_acc.append(accurracy(np.argmax(pred[i], axis=1), train_y[i]))261                    test_acc.append(accurracy(np.argmax(test_pred[i], axis=1), test_y[i]))262                    self.add_to_monitor_i(losses[i],pred[i],test_acc[i],test_losses[i],test_pred[i],train_acc[i],i)263                self.add_to_monitor(loss, np.concatenate(pred,axis=0), np.mean(test_acc), test_loss, np.concatenate(test_pred,axis=0), np.mean(train_acc))264            self.Monitor.save_session(session, saver, model_name)265            self.Monitor.save()266    def train_epoch(self, session, epochs, batch_size, args, saver, config_name, model_name):267        assert self.initialized, "model must be set up before training"268        self.Monitor.save_args(args, config_name)269        self.DataProvider.load_dataset(args.shuffle, args.random_state)270        for epoch in range(epochs):271            Loss, Acc = [], []272            for iter in tqdm.tqdm(range(len(self.DataProvider) // batch_size)):273                train_x, train_y, weights, priorities = self.DataProvider.get_next_batch(batch_size, iter, "train")274                _, loss, losses, pred = session.run([self.train_op, self.loss, self.losses, self.pred],275                                                    feed_dict=self.prepare_feed_dict(train_x, train_y, weights,276                                                                                     priorities, True, epoch))277                train_acc = []278                for i in range(len(self.x)):279                    train_acc.append(accurracy(np.argmax(pred, axis=1), train_y))280                    self.Monitor.monitor_all_(["train_pred_{}".format(i)], [np.argmax(pred[i], axis=1)])281                Loss.append(loss)282                Acc.append(np.mean(train_acc))283            Loss_valid, Acc_valid = [], []284            for v_iter in range(self.DataProvider.valid_len() // batch_size):285                test_x, test_y, test_weights, test_priorities = self.DataProvider.get_next_batch(batch_size, v_iter,"valid")286                test_loss, test_losses, test_pred = session.run([self.loss, self.losses, self.pred],287                                                                feed_dict=self.prepare_feed_dict(test_x, test_y,288                                                                                                 test_weights,289                                                                                                 test_priorities, False))290                test_acc = []291                for i in range(len(self.x)):292                    test_acc.append(accurracy(np.argmax(test_pred, axis=1), test_y))293                    self.Monitor.monitor_all_(["valid_pred_{}".format(i)], [np.argmax(test_pred[i], axis=1)])294                Loss_valid.append(test_loss)295                Acc_valid.append(np.mean(test_acc))296            self.add_to_monitor_loss(np.mean(Loss), np.mean(Acc), np.mean(Loss_valid), np.mean(Acc_valid))297            self.Monitor.save_session(session, saver, model_name)298            self.Monitor.save()299    def predict(self, session, X):300        res_pred = []301        if len(X) == len(self.x):302            for i in range(len(self.x)):303               pred = session.run(self.pred[i], feed_dict=self.prepare_feed_dict(X,None,is_training=False))304               res_pred.append(np.argmax(pred, axis=1))305        return np.concatenate(res_pred,axis=0)306    def predict_dataset(self, session, batch_size, dataset_type, args):307        if not self.DataProvider.loaded:308                self.DataProvider.load_dataset(args.shuffle, args.random_state)309        length = self.DataProvider.get_dataset_length(dataset_type)310        for iter in range(length // batch_size):311            batch_x, batch_y, ww, pp = self.DataProvider.get_next_batch(batch_size, iter, dataset_type)312            loss, losses, pred = session.run([self.loss, self.losses, self.pred],313                                                            feed_dict=self.prepare_feed_dict(batch_x, batch_y,314                                                                                             ww, pp, is_training=False))315            acc=[]316            for i in range(len(self.x)):317                acc.append(accurracy(np.argmax(pred[i], axis=1), batch_y[i]))318                self.Monitor.add_variable("trained_mode_" + dataset_type +"_accuracy_{}".format(i),acc[i])319            self.Monitor.add_variable("trained_model_" + dataset_type + "_loss", loss)320            self.Monitor.add_variable("trained_model_" + dataset_type + "_accuracy", np.mean(acc))321        self.Monitor.save()322    def get_init_variables(self):...tests.py
Source:tests.py  
1"""Module containing the unit tests for all questions."""2import unittest3import random4import string5from top_k import top_k_select, top_k_heap6from basic_priority_queue import BasicPriorityQueue7from change_priority_queue import ChangePriorityQueue8from dheap_priority_queue import DheapPriorityQueue9from fast_priority_queue import FastPriorityQueue10class TestTopK(unittest.TestCase):11    """Tests for Task 1 on finding the top k items."""12    def test_top_k_select(self):13        """Tests the top_k_select function on a list of 1000 integers."""14        test_data = list(range(1000))15        random.shuffle(test_data)16        top_40, _ = top_k_select(test_data, 40)17        self.assertEqual(top_40, list(range(999, 959, -1)))18    def test_top_k_select_comparisons(self):19        """Tests the number of comparisons the top_k_select function makes 20        on a list of 1000 integers.21        """22        test_data = list(range(1000))23        random.shuffle(test_data)24        _, comparisons = top_k_select(test_data, 40)25        self.assertLess(comparisons, 40000)26        self.assertGreater(comparisons, 30000)27    def test_top_k_heap(self):28        """Tests the top_k_heap function on a list of 1000 integers."""29        test_data = list(range(1000))30        random.shuffle(test_data)31        top_40, _ = top_k_heap(test_data, 40)32        self.assertEqual(top_40, list(range(999, 959, -1)))33    def test_top_k_heap_comparisons(self):34        """Tests the number of comparisons made by the top_k_heap function 35        on a list of 1000 integers.36        """37        test_data = list(range(1000))38        random.shuffle(test_data)39        _, comparisons = top_k_heap(test_data, 40)40        self.assertLess(comparisons, 10400)41        self.assertGreater(comparisons, 1040)42class TestFastPriorityQueue(unittest.TestCase):43    """Tests for Task 2 on fast heapify."""44    def test_heapify(self):45        """Tests that heapify works on a small test case in sorted order."""46        fpq = FastPriorityQueue(list(range(10)))47        self.assertEqual(len(fpq), 10)48        self.assertEqual(fpq.validate(), True)49    def test_heapify_random_data(self):50        """Tests that heapify works on a small test case in random order."""51        test_data = list(range(1000))52        random.shuffle(test_data)53        fpq = FastPriorityQueue(test_data)54        self.assertEqual(len(fpq), 1000)55        self.assertEqual(fpq.validate(), True)56    # Your tests for testing the number of comparisons should go here if57    # you choose to use the unit testing framework (not this is not marked58    # and it is completely up to you on how you test your code).59class TestChangePriorityQueue(unittest.TestCase):60    """Tests for Task 3 on removing from a priority queue."""61    def test_heapify(self):62        """Tests that heapify still works correctly. Note this effectively63        tests whether swap items is swapping the items correctly and that64        the __init__ function has not been modified.65        """66        test_data = [str(digit) for digit in range(1000)]67        test_priorities = list(range(1000))68        random.shuffle(test_priorities)69        cpq = ChangePriorityQueue(test_data, test_priorities)70        self.assertEqual(len(cpq), 1000)71        self.assertEqual(cpq.validate(), True)72    def test_insert_with_priority(self):73        """Tests the insert_with_priority method on a small example."""74        cpq = ChangePriorityQueue()75        cpq.insert_with_priority('a', 1)76        cpq.insert_with_priority('b', 3)77        cpq.insert_with_priority('c', 8)78        cpq.insert_with_priority('d', 0)79        cpq.insert_with_priority('e', 4)80        self.assertEqual(len(cpq), 5)81        self.assertEqual(cpq.validate(), True)82    def test_peek_max(self):83        """Tests the peek_max method on a small example."""84        cpq = ChangePriorityQueue()85        cpq.insert_with_priority('a', 1)86        cpq.insert_with_priority('b', 3)87        cpq.insert_with_priority('c', 8)88        cpq.insert_with_priority('d', 0)89        cpq.insert_with_priority('e', 4)90        self.assertEqual(cpq.peek_max(), 'c')91    def test_pop_max(self):92        """Tests the pop_max method on a small 7 item example."""93        test_priorities = [3, 67, 65, 8, 412, 1, 22]94        test_data = list(string.ascii_lowercase)[:len(test_priorities)]95        cpq = ChangePriorityQueue(test_data, test_priorities)96        self.assertEqual(len(cpq), 7)97        self.assertEqual(cpq._item_indices['g'], 6)98        self.assertEqual(cpq.pop_max(), 'e')99        self.assertEqual(len(cpq), 6)100        self.assertEqual(cpq._item_indices['g'], 1)101        self.assertEqual(cpq.validate(), True)102    def test_remove_item(self):103        """Tests the remove_item method on a small 7 item example."""104        test_priorities = [3, 67, 65, 8, 412, 1, 22]105        test_data = list(string.ascii_lowercase)[:len(test_priorities)]106        cpq = ChangePriorityQueue(test_data, test_priorities)107        self.assertEqual(len(cpq), 7)108        self.assertEqual(cpq.pop_max(), 'e')109        self.assertEqual(len(cpq), 6)110        self.assertEqual(cpq.remove_item('g'), 'g')111        self.assertEqual(cpq.remove_item('g'), None) # pq no longer contains 'g'112        self.assertEqual(cpq.remove_item(3), None)113        self.assertEqual(cpq.validate(), True)114        self.assertEqual(len(cpq), 5)115        self.assertEqual(cpq.pop_max(), 'b')116        self.assertEqual(cpq.pop_max(), 'c')117        self.assertEqual(cpq.remove_item('d'), 'd')118        self.assertEqual(cpq.remove_item('d'), None)119        self.assertEqual(len(cpq._item_indices), 2)120        self.assertEqual(cpq._item_indices['a'], 0)121        self.assertEqual(cpq._item_indices['f'], 1)122        self.assertEqual(cpq.validate(), True)123class TestDheapPriorityQueue(unittest.TestCase):124    """Tests for Task 4 on d-heaps."""125    def test_parent_index(self):126        """Tests whether the correct parent index is found with d=5."""127        dpq = DheapPriorityQueue(list(range(100)), 5)128        self.assertEqual(dpq._parent_index(6), 1)129        self.assertEqual(dpq._parent_index(10), 1)130        self.assertEqual(dpq._parent_index(30), 5)131        self.assertEqual(dpq._parent_index(0), -1)132        self.assertEqual(dpq._parent_index(100), -1) # 100 is not in the d-heap.133    def test_children_indices(self):134        """Tests whether the correct children indices are found with d=7."""135        dpq = DheapPriorityQueue(list(range(100)), 7)136        self.assertEqual(sorted(dpq._children_indices(0)), list(range(1, 8)))137        self.assertEqual(sorted(dpq._children_indices(10)), list(range(71, 78)))138    def test_heapify(self):139        """Tests whether heapify still works correctly in a large case with d=7."""140        dpq = DheapPriorityQueue(list(range(10000)), 7)141        self.assertEqual(len(dpq), 10000)142        self.assertEqual(dpq.validate(), True)143    def test_insert(self):144        """Tests whether heapify still works correctly in a large case with d=7."""145        dpq = DheapPriorityQueue(branch_factor=6)146        dpq.insert(5)147        dpq.insert(4)148        dpq.insert(6)149        dpq.insert(3)150        dpq.insert(2)151        dpq.insert(7)152        self.assertEqual(len(dpq), 6)153        self.assertEqual(dpq.validate(), True)154    def test_pop_max_small(self):155        """Tests whether pop_max still works correctly in a small case with d=3."""156        dpq = DheapPriorityQueue([3, 67, 65, 8, 412, 1, 22], 3)157        self.assertEqual(len(dpq), 7)158        self.assertEqual(dpq.pop_max(), 412)159        self.assertEqual(len(dpq), 6)160        self.assertEqual(dpq.validate(), True)161    def test_pop_max_big(self):162        """Tests whether pop_max still works correctly in a big case with d=13."""163        dpq = DheapPriorityQueue(list(range(10000)), 13)164        for item in range(9999, 8000, -1):165            self.assertEqual(dpq.pop_max(), item)166        self.assertEqual(len(dpq), 8001)167        self.assertEqual(dpq.validate(), True)168def all_tests_suite():169    """Returns a unit_test suite containing all desired tests."""170    suite = unittest.TestSuite()171    suite.addTest(unittest.makeSuite(TestTopK))172    # uncomment the next lines when ready to rumble with those tests173    suite.addTest(unittest.makeSuite(TestFastPriorityQueue))174    suite.addTest(unittest.makeSuite(TestChangePriorityQueue))175    suite.addTest(unittest.makeSuite(TestDheapPriorityQueue))176    return suite177def main():178    """Runs all tests returned by all_tests_suite()."""179    test_runner = unittest.TextTestRunner(verbosity=0)180    test_runner.run(all_tests_suite())181if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
