Best Python code snippet using avocado_python
test_agent_driver_base.py
Source:test_agent_driver_base.py  
...101        self.mock_get_driver = mock.patch.object(self.plugin_instance,102                                                 '_get_driver')103        self.mock_get_driver.return_value = (104            agent_driver_base.AgentDriverBase(self.plugin_instance))105    def _update_status(self, model, status, id):106        ctx = context.get_admin_context()107        self.plugin_instance.db.update_status(108            ctx,109            model,110            id,111            provisioning_status=status112        )113    def test_create_loadbalancer(self):114        with self.loadbalancer(no_delete=True) as loadbalancer:115            calls = self.mock_api.create_loadbalancer.call_args_list116            self.assertEqual(1, len(calls))117            _, called_lb, _, device_driver = calls[0][0]118            self.assertEqual(loadbalancer['loadbalancer']['id'], called_lb.id)119            self.assertEqual('dummy', device_driver)120            self.assertEqual(constants.PENDING_CREATE,121                             called_lb.provisioning_status)122    def test_update_loadbalancer(self):123        with self.loadbalancer(no_delete=True) as loadbalancer:124            lb_id = loadbalancer['loadbalancer']['id']125            old_lb_name = loadbalancer['loadbalancer']['name']126            ctx = context.get_admin_context()127            self.plugin_instance.db.update_loadbalancer_provisioning_status(128                ctx,129                loadbalancer['loadbalancer']['id'])130            new_lb_name = 'new_lb_name'131            loadbalancer['loadbalancer']['name'] = new_lb_name132            self._update_loadbalancer_api(133                lb_id, {'loadbalancer': {'name': new_lb_name}})134            calls = self.mock_api.update_loadbalancer.call_args_list135            self.assertEqual(1, len(calls))136            _, called_old_lb, called_new_lb, called_host = calls[0][0]137            self.assertEqual(lb_id, called_old_lb.id)138            self.assertEqual(lb_id, 
called_new_lb.id)139            self.assertEqual(old_lb_name, called_old_lb.name)140            self.assertEqual(new_lb_name, called_new_lb.name)141            self.assertEqual('host', called_host)142            self.assertEqual(constants.PENDING_UPDATE,143                             called_new_lb.provisioning_status)144    def test_delete_loadbalancer(self):145        with self.loadbalancer(no_delete=True) as loadbalancer:146            lb_id = loadbalancer['loadbalancer']['id']147            ctx = context.get_admin_context()148            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)149            self.plugin_instance.delete_loadbalancer(ctx, lb_id)150            calls = self.mock_api.delete_loadbalancer.call_args_list151            self.assertEqual(1, len(calls))152            _, called_lb, called_host = calls[0][0]153            self.assertEqual(lb_id, called_lb.id)154            self.assertEqual('host', called_host)155            self.assertEqual(constants.PENDING_DELETE,156                             called_lb.provisioning_status)157            self.assertRaises(loadbalancerv2.EntityNotFound,158                              self.plugin_instance.db.get_loadbalancer,159                              ctx, lb_id)160    def test_create_listener(self):161        with self.loadbalancer(no_delete=True) as loadbalancer:162            lb_id = loadbalancer['loadbalancer']['id']163            self._update_status(models.LoadBalancer, constants.ACTIVE,164                                loadbalancer['loadbalancer']['id'])165            with self.listener(loadbalancer_id=lb_id,166                               no_delete=True) as listener:167                listener_id = listener['listener']['id']168                calls = self.mock_api.create_listener.call_args_list169                _, called_listener, called_host = calls[0][0]170                self.assertEqual(listener_id, called_listener.id)171                self.assertEqual('host', called_host)172  
              self.assertEqual(constants.PENDING_CREATE,173                                 called_listener.provisioning_status)174                ctx = context.get_admin_context()175                lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)176                self.assertEqual(constants.PENDING_UPDATE,177                                 lb.provisioning_status)178    def test_update_listener(self):179        with self.loadbalancer(no_delete=True) as loadbalancer:180            lb_id = loadbalancer['loadbalancer']['id']181            self._update_status(models.LoadBalancer, constants.ACTIVE,182                                loadbalancer['loadbalancer']['id'])183            with self.listener(loadbalancer_id=lb_id,184                               no_delete=True) as listener:185                listener_id = listener['listener']['id']186                old_name = listener['listener']['name']187                ctx = context.get_admin_context()188                self._update_status(models.LoadBalancer, constants.ACTIVE,189                                    lb_id)190                self.plugin_instance.db.get_listener(ctx, listener_id)191                new_name = 'new_listener_name'192                listener['listener']['name'] = new_name193                self.plugin_instance.update_listener(194                    ctx, listener['listener']['id'], listener)195                self.plugin_instance.db.get_listener(196                    ctx, listener['listener']['id'])197                calls = self.mock_api.update_listener.call_args_list198                (_, old_called_listener,199                 new_called_listener, called_host) = calls[0][0]200                self.assertEqual(listener_id, new_called_listener.id)201                self.assertEqual(listener_id, old_called_listener.id)202                self.assertEqual(old_name, old_called_listener.name)203                self.assertEqual(new_name, new_called_listener.name)204                
self.assertEqual(constants.PENDING_UPDATE,205                                 new_called_listener.provisioning_status)206                lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)207                self.assertEqual(constants.PENDING_UPDATE,208                                 lb.provisioning_status)209                self.assertEqual('host', called_host)210    def test_delete_listener(self):211        with self.loadbalancer(no_delete=True) as loadbalancer:212            lb_id = loadbalancer['loadbalancer']['id']213            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)214            with self.listener(loadbalancer_id=lb_id,215                               no_delete=True) as listener:216                listener_id = listener['listener']['id']217                self._update_status(models.LoadBalancer, constants.ACTIVE,218                                    lb_id)219                ctx = context.get_admin_context()220                self.plugin_instance.delete_listener(221                    ctx, listener['listener']['id'])222                calls = self.mock_api.delete_listener.call_args_list223                _, called_listener, called_host = calls[0][0]224                self.assertEqual(listener_id, called_listener.id)225                self.assertEqual('host', called_host)226                self.assertEqual(constants.PENDING_DELETE,227                                 called_listener.provisioning_status)228                ctx = context.get_admin_context()229                lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)230                self.assertEqual(constants.ACTIVE,231                                 lb.provisioning_status)232                self.assertRaises(233                    loadbalancerv2.EntityNotFound,234                    self.plugin_instance.db.get_listener, ctx, listener_id)235    def test_create_pool(self):236        with self.loadbalancer(no_delete=True) as loadbalancer:237            lb_id = 
loadbalancer['loadbalancer']['id']238            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)239            with self.listener(loadbalancer_id=lb_id,240                               no_delete=True) as listener:241                listener_id = listener['listener']['id']242                self._update_status(models.LoadBalancer, constants.ACTIVE,243                                    lb_id)244                with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,245                               no_delete=True) as pool:246                    pool_id = pool['pool']['id']247                    calls = self.mock_api.create_pool.call_args_list248                    _, called_pool, called_host = calls[0][0]249                    self.assertEqual(pool_id, called_pool.id)250                    self.assertEqual('host', called_host)251                    self.assertEqual(constants.PENDING_CREATE,252                                     called_pool.provisioning_status)253                    ctx = context.get_admin_context()254                    lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)255                    self.assertEqual(constants.PENDING_UPDATE,256                                     lb.provisioning_status)257    def test_update_pool(self):258        ctx = context.get_admin_context()259        with self.loadbalancer(no_delete=True) as loadbalancer:260            lb_id = loadbalancer['loadbalancer']['id']261            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)262            with self.listener(loadbalancer_id=lb_id,263                               no_delete=True) as listener:264                listener_id = listener['listener']['id']265                self._update_status(models.LoadBalancer, constants.ACTIVE,266                                    lb_id)267                with self.pool(loadbalancer_id=lb_id, listener_id=listener_id,268                               no_delete=True) as pool:269               
     pool_id = pool['pool']['id']270                    old_name = pool['pool']['name']271                    self._update_status(models.LoadBalancer, constants.ACTIVE,272                                        lb_id)273                    new_name = 'new_name'274                    pool['pool']['name'] = new_name275                    self.plugin_instance.update_pool(ctx, pool_id, pool)276                    calls = self.mock_api.update_pool.call_args_list277                    (_, old_called_pool,278                     new_called_pool, called_host) = calls[0][0]279                    self.assertEqual(pool_id, new_called_pool.id)280                    self.assertEqual(pool_id, old_called_pool.id)281                    self.assertEqual(old_name, old_called_pool.name)282                    self.assertEqual(new_name, new_called_pool.name)283                    self.assertEqual(constants.PENDING_UPDATE,284                                     new_called_pool.provisioning_status)285                    lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)286                    self.assertEqual(constants.PENDING_UPDATE,287                                     lb.provisioning_status)288                    self.assertEqual('host', called_host)289    def test_delete_pool(self):290        with self.loadbalancer(no_delete=True) as loadbalancer:291            lb_id = loadbalancer['loadbalancer']['id']292            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)293            with self.listener(loadbalancer_id=lb_id,294                               no_delete=True) as listener:295                listener_id = listener['listener']['id']296                self._update_status(models.LoadBalancer, constants.ACTIVE,297                                    lb_id)298                with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,299                               no_delete=True) as pool:300                    pool_id = pool['pool']['id']301             
       self._update_status(models.LoadBalancer, constants.ACTIVE,302                                        lb_id)303                    ctx = context.get_admin_context()304                    self.plugin_instance.delete_pool(ctx, pool_id)305                    calls = self.mock_api.delete_pool.call_args_list306                    _, called_pool, called_host = calls[0][0]307                    self.assertEqual(pool_id, called_pool.id)308                    self.assertEqual('host', called_host)309                    self.assertEqual(constants.PENDING_DELETE,310                                     called_pool.provisioning_status)311                    lb = self.plugin_instance.db.get_loadbalancer(ctx, lb_id)312                    self.assertEqual(constants.ACTIVE,313                                     lb.provisioning_status)314                    self.assertRaises(315                        loadbalancerv2.EntityNotFound,316                        self.plugin_instance.db.get_pool, ctx, pool_id)317    def test_create_member(self):318        with self.loadbalancer(no_delete=True) as loadbalancer:319            lb_id = loadbalancer['loadbalancer']['id']320            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)321            with self.listener(loadbalancer_id=lb_id,322                               no_delete=True) as listener:323                listener_id = listener['listener']['id']324                self._update_status(models.LoadBalancer, constants.ACTIVE,325                                    lb_id)326                with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,327                               no_delete=True) as pool:328                    pool_id = pool['pool']['id']329                    self._update_status(models.LoadBalancer, constants.ACTIVE,330                                        lb_id)331                    with self.subnet(cidr='11.0.0.0/24') as subnet:332                        with self.member(pool_id=pool_id, 
subnet=subnet,333                                         no_delete=True) as member:334                            member_id = member['member']['id']335                            calls = self.mock_api.create_member.call_args_list336                            _, called_member, called_host = calls[0][0]337                            self.assertEqual(member_id, called_member.id)338                            self.assertEqual('host', called_host)339                            self.assertEqual(constants.PENDING_CREATE,340                                             called_member.provisioning_status)341                            ctx = context.get_admin_context()342                            lb = self.plugin_instance.db.get_loadbalancer(343                                ctx, lb_id)344                            self.assertEqual(constants.PENDING_UPDATE,345                                             lb.provisioning_status)346    def test_update_member(self):347        with self.loadbalancer(no_delete=True) as loadbalancer:348            lb_id = loadbalancer['loadbalancer']['id']349            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)350            with self.listener(loadbalancer_id=lb_id,351                               no_delete=True) as listener:352                listener_id = listener['listener']['id']353                self._update_status(models.LoadBalancer, constants.ACTIVE,354                                    lb_id)355                with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,356                               no_delete=True) as pool:357                    pool_id = pool['pool']['id']358                    self._update_status(models.LoadBalancer, constants.ACTIVE,359                                        lb_id)360                    with self.subnet(cidr='11.0.0.0/24') as subnet:361                        with self.member(pool_id=pool_id, subnet=subnet,362                                         no_delete=True) as 
member:363                            member_id = member['member']['id']364                            self._update_status(models.LoadBalancer,365                                                constants.ACTIVE, lb_id)366                            old_weight = member['member']['weight']367                            new_weight = 2368                            member['member']['weight'] = new_weight369                            ctx = context.get_admin_context()370                            self.plugin_instance.update_pool_member(371                                ctx, member_id, pool_id, member)372                            calls = self.mock_api.update_member.call_args_list373                            (_, old_called_member,374                             new_called_member, called_host) = calls[0][0]375                            self.assertEqual(member_id, new_called_member.id)376                            self.assertEqual(member_id, old_called_member.id)377                            self.assertEqual(old_weight,378                                             old_called_member.weight)379                            self.assertEqual(new_weight,380                                             new_called_member.weight)381                            self.assertEqual(382                                constants.PENDING_UPDATE,383                                new_called_member.provisioning_status)384                            lb = self.plugin_instance.db.get_loadbalancer(385                                ctx, lb_id)386                            self.assertEqual(constants.PENDING_UPDATE,387                                             lb.provisioning_status)388                            self.assertEqual('host', called_host)389    def test_delete_member(self):390        with self.loadbalancer(no_delete=True) as loadbalancer:391            lb_id = loadbalancer['loadbalancer']['id']392            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)393  
          with self.listener(loadbalancer_id=lb_id,394                               no_delete=True) as listener:395                listener_id = listener['listener']['id']396                self._update_status(models.LoadBalancer, constants.ACTIVE,397                                    lb_id)398                with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,399                               no_delete=True) as pool:400                    pool_id = pool['pool']['id']401                    self._update_status(models.LoadBalancer, constants.ACTIVE,402                                        lb_id)403                    with self.subnet(cidr='11.0.0.0/24') as subnet:404                        with self.member(pool_id=pool_id, subnet=subnet,405                                         no_delete=True) as member:406                            member_id = member['member']['id']407                            self._update_status(models.LoadBalancer,408                                                constants.ACTIVE, lb_id)409                            ctx = context.get_admin_context()410                            self.plugin_instance.delete_pool_member(411                                ctx, member_id, pool_id)412                            calls = self.mock_api.delete_member.call_args_list413                            _, called_member, called_host = calls[0][0]414                            self.assertEqual(member_id, called_member.id)415                            self.assertEqual('host', called_host)416                            self.assertEqual(constants.PENDING_DELETE,417                                             called_member.provisioning_status)418                            lb = self.plugin_instance.db.get_loadbalancer(419                                ctx, lb_id)420                            self.assertEqual(constants.ACTIVE,421                                             lb.provisioning_status)422                            self.assertRaises(423 
                               loadbalancerv2.EntityNotFound,424                                self.plugin_instance.db.get_pool_member,425                                ctx, member_id)426    def test_create_health_monitor(self):427        with self.loadbalancer(no_delete=True) as loadbalancer:428            lb_id = loadbalancer['loadbalancer']['id']429            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)430            with self.listener(loadbalancer_id=lb_id,431                               no_delete=True) as listener:432                listener_id = listener['listener']['id']433                self._update_status(models.LoadBalancer, constants.ACTIVE,434                                    lb_id)435                with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,436                               no_delete=True) as pool:437                    pool_id = pool['pool']['id']438                    self._update_status(models.LoadBalancer, constants.ACTIVE,439                                        lb_id)440                    with self.healthmonitor(pool_id=pool_id,441                                            no_delete=True) as monitor:442                        hm_id = monitor['healthmonitor']['id']443                        calls = (444                            self.mock_api.create_healthmonitor.call_args_list)445                        _, called_hm, called_host = calls[0][0]446                        self.assertEqual(hm_id, called_hm.id)447                        self.assertEqual('host', called_host)448                        self.assertEqual(constants.PENDING_CREATE,449                                         called_hm.provisioning_status)450                        ctx = context.get_admin_context()451                        lb = self.plugin_instance.db.get_loadbalancer(452                            ctx, lb_id)453                        self.assertEqual(constants.PENDING_UPDATE,454                                         
lb.provisioning_status)455    def test_update_health_monitor(self):456        with self.loadbalancer(no_delete=True) as loadbalancer:457            lb_id = loadbalancer['loadbalancer']['id']458            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)459            with self.listener(loadbalancer_id=lb_id,460                               no_delete=True) as listener:461                listener_id = listener['listener']['id']462                self._update_status(models.LoadBalancer, constants.ACTIVE,463                                    lb_id)464                with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,465                               no_delete=True) as pool:466                    pool_id = pool['pool']['id']467                    self._update_status(models.LoadBalancer, constants.ACTIVE,468                                        lb_id)469                    with self.healthmonitor(pool_id=pool_id,470                                            no_delete=True) as monitor:471                        hm_id = monitor['healthmonitor']['id']472                        self._update_status(models.LoadBalancer,473                                            constants.ACTIVE, lb_id)474                        old_to = monitor['healthmonitor']['timeout']475                        new_to = 2476                        monitor['healthmonitor']['timeout'] = new_to477                        ctx = context.get_admin_context()478                        self.plugin_instance.update_healthmonitor(ctx, hm_id,479                                                                  monitor)480                        calls = (481                            self.mock_api.update_healthmonitor.call_args_list)482                        (_, old_called_hm,483                         new_called_hm, called_host) = calls[0][0]484                        self.assertEqual(hm_id, new_called_hm.id)485                        self.assertEqual(hm_id, old_called_hm.id)486     
                   self.assertEqual(old_to,487                                         old_called_hm.timeout)488                        self.assertEqual(new_to,489                                         new_called_hm.timeout)490                        self.assertEqual(491                            constants.PENDING_UPDATE,492                            new_called_hm.provisioning_status)493                        lb = self.plugin_instance.db.get_loadbalancer(494                            ctx, lb_id)495                        self.assertEqual(constants.PENDING_UPDATE,496                                         lb.provisioning_status)497                        self.assertEqual('host', called_host)498    def test_delete_health_monitor(self):499        with self.loadbalancer(no_delete=True) as loadbalancer:500            lb_id = loadbalancer['loadbalancer']['id']501            self._update_status(models.LoadBalancer, constants.ACTIVE, lb_id)502            with self.listener(loadbalancer_id=lb_id,503                               no_delete=True) as listener:504                listener_id = listener['listener']['id']505                self._update_status(models.LoadBalancer, constants.ACTIVE,506                                    lb_id)507                with self.pool(listener_id=listener_id, loadbalancer_id=lb_id,508                               no_delete=True) as pool:509                    pool_id = pool['pool']['id']510                    self._update_status(models.LoadBalancer, constants.ACTIVE,511                                        lb_id)512                    with self.healthmonitor(pool_id=pool_id,513                                            no_delete=True) as monitor:514                        hm_id = monitor['healthmonitor']['id']515                        self._update_status(models.LoadBalancer,516                                            constants.ACTIVE, lb_id)517                        ctx = context.get_admin_context()518                        
self.plugin_instance.delete_healthmonitor(ctx, hm_id)519                        calls = (520                            self.mock_api.delete_healthmonitor.call_args_list)521                        _, called_hm, called_host = calls[0][0]522                        self.assertEqual(hm_id, called_hm.id)523                        self.assertEqual('host', called_host)524                        self.assertEqual(constants.PENDING_DELETE,525                                         called_hm.provisioning_status)526                        lb = self.plugin_instance.db.get_loadbalancer(527                            ctx, lb_id)528                        self.assertEqual(constants.ACTIVE,529                                         lb.provisioning_status)...main.py
Source:main.py  
...65        self.file_manager_open()66    def _select_zip(self):67        try:68            if not os.path.isfile(self.zip_path) or not os.path.splitext(self.zip_path)[1]  in self.zipsFormats:69                self._update_status( "Wrong File is Selected ")70                return71            Logger.debug(self.zip_path)72            self.zip_name = os.path.basename(self.zip_path)[:-4]73            self._update_status(str(self.zip_name) + " is selected")74            self._update_progress_bar(20)75        except Exception as e:76            Logger.exception(e)77            self._update_status("Something went wrong ... Try again")78    def file_manager_open(self):79        if platform == "android":80            self.file_manager.show("/storage/emulated/0/")81        else:82            self.file_manager.show("/")  # output manager to the screen83        self.manager_open = True84    def select_path(self, path):85        '''It will be called when you click on the file name86        or the catalog selection button.87        :type path: str;88        :param path: path to the selected directory or file;89        '''90        self.zip_path = path91        print("printing selected zip path ... 
"+str(path))92        self.exit_manager()93    def exit_manager(self, *args):94        '''Called when the user reaches the root of the directory tree.'''95        self.manager_open = False96        self.file_manager.close()97        if self.zip_path == None:98            self._update_status("No file is selected")99            return100        select_zip_thread = Thread(target=self._select_zip)101        select_zip_thread.start()102    def events(self, instance, keyboard, keycode, text, modifiers):103        '''Called when buttons are pressed on the mobile device.'''104        if keyboard in (1001, 27):105            if self.manager_open:106                self.file_manager.back()107        return True108    def convert(self,*args):109        if platform == 'android':110            if  check_permission(Permission.WRITE_EXTERNAL_STORAGE) == False: # check permission takes str not a list111                request_permission(Permission.WRITE_EXTERNAL_STORAGE)112                request_permission(Permission.READ_EXTERNAL_STORAGE)# request_permisssion takes str and request_permissions takes list113                return114            # Make a Folder for Zip115            # Make ChandanVideo Folder in Internal Storage116            if self.zip_path == None or os.path.splitext(self.zip_path)[1] != '.zip':117                self.root.ids.status_label.text = "Select a zip first "118                return119            self._update_status("Please Wait ... 
Checking Zip")120            self.clips_path = os.path.join(self.app_path, self.zip_name)121            self.video_folder = os.path.join(self.primary_ext_storage, "Zipvy")122            self.video_location = os.path.join(self.video_folder, self.zip_name+".mp4")123            Logger.debug("Clips Path : "+str(self.clips_path)  +124                        "\n video_folder : "+str(self.video_folder)+125                        "\n video_path:" + str(self.video_location) )126            convert_thread = Thread(target=self._convert)127            convert_thread.start()128        else:129            self.root.ids.prog.value = 75130            self.root.ids.status_label.text = " I am running in "+str(platform)131            print(" I am running in "+str(platform))132    def _convert(self):133        try:134            if not os.path.exists(self.clips_path):135                os.makedirs(self.clips_path)136            if not os.path.exists(self.video_folder):137                os.makedirs(self.video_folder)138            # Extract Zip to the folder139            unzip(self.zip_path, self.clips_path)140            self._update_status("Unzipped")141            Logger.debug("Unzipped clips for file " + str(self.zip_path))142            Logger.debug("clips are available at  " + str(self.clips_path))143            # move clips to root144            move_to_root_folder(self.clips_path, self.clips_path)145            Logger.debug("Moving clips to root dir is done")146            # Delete Small clips ...147            deleted_clips = delete_small_clips(self.clips_path)148            Logger.debug("Following Clips are deleted due to low size (less than 100kb)")149            for deleted_clip in deleted_clips:150                Logger.debug(deleted_clip)151            self._update_progress_bar(50)152            self._update_status("Ready to make video")153            # merge clips and make video154            Logger.debug("Entered to merging try block")155            
merge(self.clips_path, self.video_location, self._update_progress)156            Logger.debug("End of merging try block")157            # Clearing Temp dir158            delete_all_clips(self.clips_path)159            self._update_status(os.path.basename(self.video_location)[:-4]+" is now ready to watch")160            self._toast("Always use Mx/Vlc Player to watch Videos")161        except Exception:162            Logger.exception('Something happened wrong at merge')163            self._update_status("Something happened wrong at merge")164    def open_video_folder(self,*args):165        if platform == 'android':166            pass167        pass168    def open_video(self,*args):169        if platform == 'android':170            pass171            #self._open(self.video_location)172    def action_button_callback(self,instance):173        if instance.icon == 'language-python':174            self.theme_cls.primary_palette = "Green"175        if instance.icon == 'language-php':176            self.theme_cls.primary_palette = "Orange"177        if instance.icon == 'language-cpp':178            self.theme_cls.primary_palette = "Blue"179    def _update_progress(self,file_no,file_name,files):180        percentage = int(file_no*100/files)181        self._update_progress_bar(50+int(percentage/2))182        progress_text  = "Merging Clips " + str(file_no) +"/"+str(files)+" "+ str(percentage)+" %"183        self._update_status(progress_text)184        Logger.debug(progress_text+" of "+ str(file_name))185    @mainthread186    def _update_status(self,text):187        self.root.ids.status_label.text = str(text)188    @mainthread189    def _update_progress_bar(self,value):190        self.root.ids.prog.value = int(value)191    def _open(self, video_location):192        try:193            from kivy.setupconfig import USE_SDL2194            if platform == 'android':195                from jnius import cast196                from jnius import autoclass197                if 
USE_SDL2:198                    PythonActivity = autoclass('org.kivy.android.PythonActivity')199                else:200                    PythonActivity = autoclass('org.renpy.android.PythonActivity')...instrumented_requests.py
Source:instrumented_requests.py  
...38    fields = {'name': name, 'client': 'requests'}39    http_metrics.request_bytes.add(request_bytes, fields=fields)40    http_metrics.response_bytes.add(response_bytes, fields=fields)41    http_metrics.durations.add(duration_msec, fields=fields)42    _update_status(name, response.status_code)43  return hook44def _update_status(name, status):45  fields = {'status': status, 'name': name, 'client': 'requests'}46  http_metrics.response_status.increment(fields=fields)47def _wrap(method, name, url, *args, **kwargs):48  hooks = {'response': instrumentation_hook(name)}49  if 'hooks' in kwargs:50    hooks.update(kwargs['hooks'])51  kwargs['hooks'] = hooks52  try:53    return getattr(requests, method)(url, *args, **kwargs)54  except requests.exceptions.ReadTimeout:55    _update_status(name, http_metrics.STATUS_TIMEOUT)56    raise57  except requests.exceptions.ConnectionError:58    _update_status(name, http_metrics.STATUS_ERROR)59    raise60  except requests.exceptions.RequestException:61    _update_status(name, http_metrics.STATUS_EXCEPTION)62    raise63request = functools.partial(_wrap, 'request')64get = functools.partial(_wrap, 'get')65head = functools.partial(_wrap, 'head')66post = functools.partial(_wrap, 'post')67patch = functools.partial(_wrap, 'patch')68put = functools.partial(_wrap, 'put')69delete = functools.partial(_wrap, 'delete')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
