Best Python code snippet using lisa_python
SaveFile.py
Source:SaveFile.py  
...198                    fixer = fixers[0]199                await self.apply_storage_fixer_async(fixer.NAME)200                await self.read_async("minecraft:general")201                self.version = fixer.FIXES_TO202            await self.dump_async(None, "minecraft:general")203            await asyncio.gather(204                self.read_async("minecraft:player_data"),205                self.read_async("minecraft:gamerule"),206                self.read_async("minecraft:registry_info_serializer"),207            )208        except mcpython.common.world.serializer.IDataSerializer.MissingSaveException:209            logger.println(210                "[WARN] save '{}' not found, falling back to selection menu".format(211                    self.directory212                )213            )214            await shared.world.cleanup()215            await shared.state_handler.change_state("minecraft:world_selection")216            return217        except (SystemExit, KeyboardInterrupt, OSError):218            raise219        except:  # lgtm [py/catch-base-exception]220            await shared.world.cleanup()221            await shared.state_handler.change_state("minecraft:start_menu")222            logger.print_exception(223                "exception during loading world. falling back to start menu..."224            )225            return226    async def save_world_async(self, *_, override=False):227        """228        Save all base-data into the system229        :param _: used when used by special event triggers230        :param override: flag for saving the chunks231        """232        if self.save_in_progress:233            raise IOError("can't save world. 
save in process")234        try:235            # Make sure that nothing else is going on...236            self.save_in_progress = True237            shared.world_generation_handler.enable_generation = False238            logger.println("saving world...")239            await self.dump_async(None, "minecraft:general")240            await asyncio.gather(241                self.dump_async(None, "minecraft:player_data"),242                self.dump_async(None, "minecraft:gamerule"),243                self.dump_async(None, "minecraft:registry_info_serializer"),244            )245            async def save_dimension(dimension):246                if not dimension.chunks:247                    return248                logger.println("saving dimension " + dimension.get_name())249                count = len(dimension.chunks)250                print(end="")251                for i, chunk in enumerate(dimension.chunks):252                    print(253                        "\r"254                        + str(round((i + 1) / count * 100))255                        + "% ("256                        + str(i + 1)257                        + "/"258                        + str(count)259                        + ")",260                        end="",261                    )262                    # todo: save all loaded dimension, not only the active one263                    if dimension.get_chunk(*chunk).loaded:264                        await self.dump_async(265                            None,266                            "minecraft:chunk",267                            dimension=dimension.id,268                            chunk=chunk,269                            override=override,270                        )271                print()272            await asyncio.gather(273                *(save_dimension(d) for d in shared.world.dimensions.values())274            )275            logger.println("save complete!")276            # And open the system again277            
shared.world_generation_handler.enable_generation = True278            self.save_in_progress = False279        except (SystemExit, KeyboardInterrupt, OSError):280            raise281        except:  # lgtm [py/catch-base-exception]282            if shared.IS_CLIENT:283                logger.print_exception(284                    "Exception during saving world. Falling back to start menu"285                )286                await shared.world.cleanup()287                await shared.state_handler.change_state("minecraft:start_menu")288            else:289                logger.print_exception("Exception during saving world")290                await shared.NETWORK_MANAGER.disconnect()291                sys.exit(-1)292    async def apply_storage_fixer_async(self, name: str, *args, **kwargs):293        """294        Will apply a fixer to fix the storage version295        :param name: the name of the fixer to use296        :param args: the args to send297        :param kwargs: the kwargs to use298        :raises DataFixerNotFoundException: if the name is invalid299        """300        if name not in self.storage_fixer_registry.entries:301            raise DataFixerNotFoundException(name)302        fixer: mcpython.common.world.datafixers.IDataFixer.IStorageVersionFixer = (303            self.storage_fixer_registry.entries[name]304        )305        try:306            await fixer.apply(self, *args, **kwargs)307            await asyncio.gather(308                *(309                    self.apply_group_fixer_async(*args, **kwargs)310                    for name, args, kwargs in fixer.GROUP_FIXER_NAMES311                )312            )313        except (SystemExit, KeyboardInterrupt, OSError):314            raise315        except:  # lgtm [py/catch-base-exception]316            logger.print_exception(317                "during data-fixing storage version '{}'".format(name)318            )319            await 
shared.state_handler.change_state("minecraft:start_menu")320    async def apply_group_fixer_async(self, name: str, *args, **kwargs):321        """322        Will apply a group fixer to the system323        :param name: the name of the group fixer to use324        :param args: the args to use325        :param kwargs: the kwargs to use326        :raises DataFixerNotFoundException: if the name is invalid327        """328        if name not in self.group_fixer_registry.entries:329            raise DataFixerNotFoundException(name)330        fixer: mcpython.common.world.datafixers.IDataFixer.IGroupFixer = (331            self.group_fixer_registry.entries[name]332        )333        try:334            await fixer.apply(self, *args, **kwargs)335            await asyncio.gather(336                *(337                    self.apply_group_fixer_async(*args, **kwargs)338                    for name, args, kwargs in fixer.GROUP_FIXER_NAMES339                )340            )341        except (SystemExit, KeyboardInterrupt, OSError):342            raise343        except:  # lgtm [py/catch-base-exception]344            logger.print_exception(345                "During data-fixing group fixer '{}' (FATAL)".format(name)346            )347            await shared.world.cleanup()348            await shared.state_handler.change_state("minecraft:start_menu")349    async def apply_part_fixer_async(self, name: str, *args, **kwargs):350        """351        Will apply a part fixer to the system352        :param name: the name to use353        :param args: the args to send354        :param kwargs: the kwargs355        :raises DataFixerNotFoundException: if the name is invalid356        """357        if name not in self.part_fixer_registry.entries:358            raise DataFixerNotFoundException(name)359        fixer: mcpython.common.world.datafixers.IDataFixer.IPartFixer = (360            self.part_fixer_registry.entries[name]361        )362        try:363            await 
fixer.apply(self, *args, **kwargs)364        except:  # lgtm [py/catch-base-exception]365            logger.print_exception("During data-fixing part '{}' (fatal)".format(name))366            await shared.world.cleanup()367            await shared.state_handler.change_state("minecraft:start_menu")368    async def apply_mod_fixer_async(369        self, modname: str, source_version: tuple, *args, **kwargs370    ):371        """372        Applies a mod fixer(list) to the system373        :param modname: the mod name374        :param source_version: where to start from375        :param args: args to call with376        :param kwargs: kwargs to call with377        :raises DataFixerNotFoundException: if the name is invalid378        """379        if modname not in self.mod_fixers or modname not in shared.mod_loader.mods:380            raise DataFixerNotFoundException(modname)381        instance = shared.mod_loader.mods[modname]382        fixers = self.mod_fixers[modname]383        while instance.version != source_version:384            possible_fixers = set()385            for fixer in fixers:386                if source_version is None or (387                    len(fixer.FIXES_FROM) == len(source_version)388                    and source_version <= fixer.FIXES_FROM389                ):390                    possible_fixers.add(fixer)391            if len(possible_fixers) == 0:392                return393            if source_version is not None or len(possible_fixers) == 1:394                fixer: mcpython.common.world.datafixers.IDataFixer.IModVersionFixer = (395                    fixers[0]396                )397            else:398                fixer: mcpython.common.world.datafixers.IDataFixer.IModVersionFixer = (399                    min(400                        possible_fixers,401                        key=lambda v: self._get_distance(v, source_version),402                    )403                )404            try:405                await fixer.apply(self, 
*args, **kwargs)406                await asyncio.gather(407                    [408                        self.apply_group_fixer_async(name, *args, **kwargs)409                        for (name, args, kwargs) in fixer.GROUP_FIXER_NAMES410                    ]411                )412                await asyncio.gather(413                    [414                        self.apply_part_fixer_async(name, *args, **kwargs)415                        for (name, args, kwargs) in fixer.PART_FIXER_NAMES416                    ]417                )418            except (SystemExit, KeyboardInterrupt, OSError):419                raise420            except:  # lgtm [py/catch-base-exception]421                logger.print_exception(422                    "during data-fixing mod {} from {} to {} (fatal)".format(423                        modname, fixer.FIXES_FROM, fixer.FIXES_TO424                    )425                )426                await shared.world.cleanup()427                await shared.state_handler.change_state("minecraft:start_menu")428                return429            source_version = fixer.FIXES_TO430    @classmethod431    def _get_distance(cls, v, t):432        s = 0433        for i in range(len(v)):434            f = len(v) - i - 1435            s += 100**f * abs(v[i] - t[i])436        return s437    @classmethod438    def get_serializer_for(cls, part):439        for (440            serializer441        ) in (442            mcpython.common.world.serializer.IDataSerializer.data_serializer_registry.entries.values()443        ):444            if serializer.PART == part:445                return serializer446        raise ValueError("can't find serializer named '{}'".format(part))447    async def read_async(self, part, **kwargs):448        """449        Reads a part of the save-file450        :param part: the part to load451        :param kwargs: kwargs given to the loader452        :return: whatever the loader returns453        """454        try:455            
return await self.get_serializer_for(part).load(self, **kwargs)456        except (SystemExit, KeyboardInterrupt, OSError):457            raise458        except mcpython.common.world.serializer.IDataSerializer.InvalidSaveException:459            logger.print_exception(460                "during reading part '{}' from save files under '{}' with arguments {}".format(461                    part, self.directory, kwargs462                )463            )464    async def dump_async(self, data, part, **kwargs):465        """466        Similar to read(...), but the other way round.467        :param data: the data to store, optional, may be None468        :param part: the part to save469        :param kwargs: the kwargs to give the saver470        """471        try:472            await self.get_serializer_for(part).save(data, self, **kwargs)473        except (SystemExit, KeyboardInterrupt, OSError):474            raise475        except:476            logger.print_exception("during dumping {} to '{}'".format(data, part))477    # Helper functions for fixers, loaders and savers478    # todo: add nbt serializer...userscripts.py
Source:userscripts.py  
...352    log.misc.debug("Userscript to run: {}".format(cmd_path))353    runner.finished.connect(commandrunner.deleteLater)354    runner.finished.connect(runner.deleteLater)355    runner.prepare_run(cmd_path, *args, env=env, verbose=verbose)356    tab.dump_async(runner.store_html)...collector.py
Source:collector.py  
...26DUMP_SIZE   = 3000 # 5ë¶27def print_and_say(s):28    print(s)29    say(s)30def dump_async(key_events, features):31    def dump():32        with open(f'{OUTPUT}/{str(int(time.time()))}', 'wb') as f:33            pickle.dump([key_events, features], f)34    threading.Thread(target=dump).start()35def collect() :36    running = True37    target  = TARGETS[0]38    key_events = []39    def record(event):40        nonlocal target41        nonlocal running42        if event.name == '4' and event.event_type == 'down':43            print_and_say("ì¬ë¥ ë
¹íë¡ ë³ê²½.")44            target = TARGETS[0]45        elif event.name == '5' and event.event_type == 'down' :46            print_and_say("ìì§ ë
¹íë¡ ë³ê²½.")47            target = TARGETS[1]48        elif event.name == '6' and event.event_type == 'down' :49            print_and_say("ë
¹í ì¢
ë£.")50            running = False51        else :52            key_events.append(event)53    keyboard.hook(record)54    features = []55    start = time.time()56    cnt = 057    while running :58        cnt += 159        recorded_at = time.time()60        image = ImageGrab.grab()61        62        down_img = image.resize((image.width//DOWNSCALE, image.height//DOWNSCALE))63        down_img = np.array(down_img)64        up = keyboard.is_pressed('up')65        down = keyboard.is_pressed('down')66        left = keyboard.is_pressed('left')67        right = keyboard.is_pressed('right')68        features.append(69            {70                'image': down_img,71                'recorded_at': recorded_at,72                'up': up,73                'down': down,74                'left': left,75                'right': right,76                'target': target77            }78        )79        # if len(features) >= DUMP_SIZE :80        #     dumping_features        = features[:DUMP_SIZE]81        #     features                = features[DUMP_SIZE:]82        #     dumping_key_events_len  = len(key_events)83        #     dumping_key_events      = key_events[:dumping_key_events_len]84        #     key_events              = key_events[dumping_key_events_len:]85        #     print_and_say("ë°ì´í°ë¥¼ ì ì¥í©ëë¤.")86        #     dump_async(dumping_key_events, dumping_features)87        to_wait = cnt*PERIOD - (time.time()-start)88        if to_wait > 0:89            time.sleep(to_wait)90        else :91            print_and_say("ë
¹íê° ë°ë¦¬ê³  ììµëë¤.")92    keyboard.unhook_all()93    return key_events, features94def main() :95    print_and_say("ë°ì´í° ìì§ íë¡ê·¸ë¨ì ê°ëí©ëë¤. 7 í¤ë¥¼ ëë¬ ìì§ì ììí´ì£¼ììì¤.")96    os.makedirs(OUTPUT, exist_ok=True)97    while True :98        keyboard.wait('7')99        print_and_say("ì¬ë¥ ë
¹í ìì.")100        key_events, features = collect()101        dump_async(key_events, features)102        print_and_say("ë°ì´í°ë¥¼ ì ì¥íê³  ìì§ì ì¢
ë£íìµëë¤.")103if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
