Best Python code snippet using avocado_python
QuickBackupM.py
Source:QuickBackupM.py  
...101                        os.path.realpath(os.path.join(dst, world)), ignore=filter_ignore)102def remove_worlds(folder):103    for world in config['world_names']:104        shutil.rmtree(os.path.realpath(os.path.join(folder, world)))105def get_slot_count():106    return len(config['slots'])107def get_slot_folder(slot):108    return os.path.join(config['backup_path'], f"slot{slot}")109def get_slot_info(slot):110    """111    :param int slot: the index of the slot112    :return: the slot info113    :rtype: dict or None114    """115    try:116        with open(os.path.join(get_slot_folder(slot), 'info.json')) as f:117            info = json.load(f)118        for key in info.keys():119            value = info[key]120    except:121        info = None122    return info123def format_time():124    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())125def format_protection_time(time_length):126    """127    :rtype: str128    """129    if time_length < 60:130        return '{}ç§'.format(time_length)131    elif time_length < 60 * 60:132        return '{}åé'.format(round(time_length / 60, 2))133    elif time_length < 24 * 60 * 60:134        return '{}å°æ¶'.format(round(time_length / 60 / 60, 2))135    else:136        return '{}天'.format(round(time_length / 60 / 60 / 24, 2))137def format_slot_info(info_dict=None, slot_number=None):138    if type(info_dict) is dict:139        info = info_dict140    elif type(slot_number) is not None:141        info = get_slot_info(slot_number)142    else:143        return None144    if info is None:145        return None146    msg = 'æ¥æ: {}; 注é: {}'.format(info['time'], info.get('comment', '§7空§r'))147    return msg148def touch_backup_folder():149    def mkdir(path):150        if not os.path.exists(path):151            os.mkdir(path)152    mkdir(config['backup_path'])153    for i in range(get_slot_count()):154        mkdir(get_slot_folder(i + 1))155def slot_number_formatter(slot):156    flag_fail = False157    if type(slot) is not int:158        try:159            slot = int(slot)160        except ValueError:161            flag_fail = True162    if flag_fail or not 1 <= slot <= get_slot_count():163        return None164    return slot165def slot_check(source, slot):166    slot = slot_number_formatter(slot)167    if slot is None:168        print_message(source, 'æ§½ä½è¾å
¥é误ï¼åºè¾å
¥ä¸ä¸ªä½äº[{}, {}]çæ°å'.format(169            1, get_slot_count()))170        return None171    slot_info = get_slot_info(slot)172    if slot_info is None:173        print_message(source, 'æ§½ä½è¾å
¥éè¯¯ï¼æ§½ä½Â§6{}§r为空'.format(slot))174        return None175    return slot, slot_info176def delete_backup(source, slot):177    global creating_backup_lock, restoring_backup_lock178    if creating_backup_lock.locked() or restoring_backup_lock.locked():179        return180    if slot_check(source, slot) is None:181        return182    try:183        shutil.rmtree(get_slot_folder(slot))184    except Exception as e:185        print_message(source, '§4å é¤æ§½ä½Â§6{}§r失败§rï¼é误代ç ï¼{}'.format(186            slot, e), tell=False)187    else:188        print_message(source, '§aå é¤æ§½ä½Â§6{}§rå®æÂ§r'.format(slot), tell=False)189def clean_up_slot_1():190    """191    try to cleanup slot 1 for backup192    :rtype: bool193    """194    slots = []195    empty_slot_idx = None196    target_slot_idx = None197    max_available_idx = None198    for i in range(get_slot_count()):199        slot_idx = i + 1200        slot = get_slot_info(slot_idx)201        slots.append(slot)202        if slot is None:203            if empty_slot_idx is None:204                empty_slot_idx = slot_idx205        else:206            time_stamp = slot.get('time_stamp', None)207            if time_stamp is not None:208                slot_config_data = config['slots'][slot_idx - 1]  # type: dict209                if time.time() - time_stamp > slot_config_data['delete_protection']:210                    max_available_idx = slot_idx211            else:212                # old format, treat it as available213                max_available_idx = slot_idx214    if empty_slot_idx is not None:215        target_slot_idx = empty_slot_idx216    else:217        target_slot_idx = max_available_idx218    if target_slot_idx is not None:219        folder = get_slot_folder(target_slot_idx)220        if os.path.isdir(folder):221            shutil.rmtree(folder)222        for i in reversed(range(1, target_slot_idx)):  # n-1, n-2, ..., 1223            os.rename(get_slot_folder(i), get_slot_folder(i + 1))224        return True225    else:226        return False227@new_thread('QBM')228def create_backup(source: CommandSource, comment):229    global restoring_backup_lock, creating_backup_lock230    if restoring_backup_lock.locked():231        print_message(source, 'æ£å¨Â§cåæ¡£Â§rä¸ï¼è¯·ä¸è¦å°è¯å¤ä»½', tell=False)232        return233    acquired = creating_backup_lock.acquire(blocking=False)234    if not acquired:235        print_message(source, 'æ£å¨Â§aå¤ä»½Â§rä¸ï¼è¯·ä¸è¦éå¤è¾å
¥', tell=False)236        return237    try:238        print_message(source, '§aå¤ä»½Â§rä¸...请ç¨ç', tell=False)239        start_time = time.time()240        touch_backup_folder()241        # start backup242        global game_saved, plugin_unloaded243        game_saved = False244        if config['turn_off_auto_save']:245            source.get_server().execute('save-off')246        source.get_server().execute('save-all flush')247        while True:248            time.sleep(0.01)249            if game_saved:250                break251            if plugin_unloaded:252                print_message(source, 'æä»¶éè½½ï¼Â§aå¤ä»½Â§r䏿ï¼', tell=False)253                return254        if not clean_up_slot_1():255            print_message(source, 'æªæ¾å°å¯ç¨æ§½ä½ï¼Â§aå¤ä»½Â§r䏿ï¼', tell=False)256            return257        slot_path = get_slot_folder(1)258        # copy worlds to backup slot259        copy_worlds(config['server_path'], slot_path)260        # create info.json261        slot_info = {262            'time': format_time(),263            'time_stamp': time.time()264        }265        if comment is not None:266            slot_info['comment'] = comment267        with open(os.path.join(slot_path, 'info.json'), 'w') as f:268            json.dump(slot_info, f, indent=4)269        # done270        end_time = time.time()271        print_message(source, '§aå¤ä»½Â§r宿ï¼èæ¶Â§6{}§rç§'.format(272            round(end_time - start_time, 1)), tell=False)273        print_message(source, format_slot_info(274            info_dict=slot_info), tell=False)275        source.get_server().dispatch_event(LiteralEvent(276            'qbm.backup_done'), (source, ))  # just for showcase277    except Exception as e:278        print_message(source, '§aå¤ä»½Â§r失败ï¼é误代ç {}'.format(e), tell=False)279    finally:280        creating_backup_lock.release()281        if config['turn_off_auto_save']:282            source.get_server().execute('save-on')283@new_thread('QBM')284def restore_backup(source: CommandSource, slot):285    ret = slot_check(source, slot)286    if ret is None:287        return288    else:289        slot, slot_info = ret290    global slot_selected, abort_restore291    slot_selected = slot292    abort_restore = False293    print_message(source, 'åå¤å°åæ¡£æ¢å¤è³æ§½ä½Â§6{}§rï¼ {}'.format(294        slot, format_slot_info(info_dict=slot_info)), tell=False)295    print_message(296        source,297        command_run('使ç¨Â§7{0} confirm§r 确认§cåæ¡£Â§r'.format(298            Prefix), 'ç¹å»ç¡®è®¤', '{0} confirm'.format(Prefix))299        + ', '300        + command_run('§7{0} abort§r åæ¶'.format(Prefix), 'ç¹å»åæ¶', '{0} abort'.format(Prefix)), tell=False301    )302@new_thread('QBM')303def confirm_restore(source: CommandSource):304    global restoring_backup_lock, creating_backup_lock305    if creating_backup_lock.locked():306        print_message(source, 'æ£å¨Â§aå¤ä»½Â§rä¸ï¼è¯·ä¸è¦å°è¯åæ¡£', tell=False)307        return308    acquired = restoring_backup_lock.acquire(blocking=False)309    if not acquired:310        print_message(source, 'æ£å¨åå¤Â§cåæ¡£Â§rä¸ï¼è¯·ä¸è¦éå¤è¾å
¥', tell=False)311        return312    try:313        global slot_selected314        if slot_selected is None:315            print_message(source, '没æä»ä¹éè¦ç¡®è®¤ç', tell=False)316            return317        slot = slot_selected318        slot_selected = None319        print_message(source, '10ç§åå
³éæå¡å¨Â§cåæ¡£Â§r', tell=False)320        for countdown in range(1, 10):321            print_message(source, command_run(322                'è¿æ{}ç§ï¼å°Â§cåæ¡£Â§r为槽ä½Â§6{}§rï¼{}'.format(323                    10 - countdown, slot, format_slot_info(slot_number=slot)),324                'ç¹å»ç»æ¢åæ¡£ï¼',325                '{} abort'.format(Prefix)326            ), tell=False)327            for i in range(10):328                time.sleep(0.1)329                global abort_restore330                if abort_restore:331                    print_message(source, '§cåæ¡£Â§rè¢«ä¸æï¼', tell=False)332                    return333        source.get_server().stop()334        source.get_server().logger.info('[QBM] Wait for server to stop')335        source.get_server().wait_for_start()336        source.get_server().logger.info(337            '[QBM] Backup current world to avoid idiot')338        overwrite_backup_path = os.path.join(339            config['backup_path'], config['overwrite_backup_folder'])340        if os.path.exists(overwrite_backup_path):341            shutil.rmtree(overwrite_backup_path)342        copy_worlds(config['server_path'], overwrite_backup_path)343        with open(os.path.join(overwrite_backup_path, 'info.txt'), 'w') as f:344            f.write('Overwrite time: {}\n'.format(format_time()))345            f.write('Confirmed by: {}'.format(source))346        slot_folder = get_slot_folder(slot)347        source.get_server().logger.info('[QBM] Deleting world')348        remove_worlds(config['server_path'])349        source.get_server().logger.info('[QBM] Restore backup ' + slot_folder)350        copy_worlds(slot_folder, config['server_path'])351        source.get_server().start()352    finally:353        restoring_backup_lock.release()354def trigger_abort(source):355    global abort_restore, slot_selected356    abort_restore = True357    slot_selected = None358    print_message(source, 'ç»æ¢æä½ï¼', tell=False)359@new_thread('QBM')360def list_backup(source: CommandSource, size_display=config['size_display']):361    def get_dir_size(dir):362        size = 0363        for root, dirs, files in os.walk(dir):364            size += sum([os.path.getsize(os.path.join(root, name))365                         for name in files])366        return size367    def format_dir_size(size):368        if size < 2 ** 30:369            return f'{round(size / 2 ** 20, 2)} MB'370        else:371            return f'{round(size / 2 ** 30, 2)} GB'372    print_message(source, '§dãæ§½ä½ä¿¡æ¯ã§r', prefix='')373    backup_size = 0374    for i in range(get_slot_count()):375        slot_idx = i + 1376        slot_info = format_slot_info(slot_number=slot_idx)377        if size_display:378            dir_size = get_dir_size(get_slot_folder(slot_idx))379        else:380            dir_size = 0381        backup_size += dir_size382        # noinspection PyTypeChecker383        text = RTextList(384            RText('[æ§½ä½Â§6{}§r] '.format(slot_idx)).h(385                'åæ¡£ä¿æ¤æ¶é¿: ' + format_protection_time(config['slots'][slot_idx - 1]['delete_protection']))386        )387        if slot_info is not None:388            text += RTextList(389                RText('[â·] ', color=RColor.green).h(f'ç¹å»åæ¡£è³æ§½ä½Â§6{slot_idx}§r').c(390                    RAction.run_command, f'{Prefix} back {slot_idx}'),391                RText('[Ã] ', color=RColor.red).h(f'ç¹å»å é¤æ§½ä½Â§6{slot_idx}§r').c(392                    RAction.suggest_command, f'{Prefix} del {slot_idx}')393            )394            if size_display:395                text += '§2{}§r '.format(format_dir_size(dir_size))396        text += slot_info397        print_message(source, text, prefix='')398    if size_display:399        print_message(source, 'å¤ä»½æ»å ç¨ç©ºé´: §a{}§r'.format(400            format_dir_size(backup_size)), prefix='')401@new_thread('QBM')402def print_help_message(source: CommandSource):403    if source.is_player:404        source.reply('')405    for line in HelpMessage.splitlines():406        prefix = re.search(r'(?<=§7){}[\w ]*(?=§)'.format(Prefix), line)407        if prefix is not None:408            print_message(source, RText(line).set_click_event(409                RAction.suggest_command, prefix.group()), prefix='')410        else:411            print_message(source, line, prefix='')412    list_backup(source, size_display=False).join()413    print_message(414        source,415        '§dãå¿«æ·æä½ã§r' + '\n' +416        RText('>>> §aç¹æå建ä¸ä¸ªå¤ä»½Â§r <<<')417        .h('è®°å¾ä¿®æ¹æ³¨é')418        .c(RAction.suggest_command, f'{Prefix} make ææ¯ä¸ä¸ªæ³¨é') + '\n' +419        RText('>>> §cç¹æåæ¡£è³æè¿çå¤ä»½Â§r <<<')420        .h('ä¹å°±æ¯åæ¡£è³ç¬¬ä¸ä¸ªæ§½ä½')421        .c(RAction.suggest_command, f'{Prefix} back'),422        prefix=''423    )424def on_info(server, info: Info):425    if not info.is_user:426        if info.content == 'Saved the game':427            global game_saved428            game_saved = True429def print_unknown_argument_message(source: CommandSource, error: UnknownArgument):430    print_message(source, command_run(431        'åæ°é误ï¼è¯·è¾å
¥Â§7{}§r以è·åæä»¶ä¿¡æ¯'.format(Prefix),432        'ç¹å»æ¥ç帮å©',433        Prefix434    ))435def register_command(server: ServerInterface):436    def get_literal_node(literal):437        lvl = config['minimum_permission_level'].get(literal, 0)438        return Literal(literal).requires(lambda src: src.has_permission(lvl), failure_message_getter=lambda: 'æéä¸è¶³')439    def get_slot_node():440        return Integer('slot').requires(lambda src, ctx: 1 <= ctx['slot'] <= get_slot_count(), failure_message_getter=lambda: 'éè¯¯çæ§½ä½åºå·')441    server.register_command(442        Literal(Prefix).443        runs(print_help_message).444        on_error(UnknownArgument, print_unknown_argument_message, handled=True).445        then(446            get_literal_node('make').447            runs(lambda src: create_backup(src, None)).448            then(GreedyText('comment').runs(lambda src,449                                            ctx: create_backup(src, ctx['comment'])))450        ).451        then(452            get_literal_node('back').453            runs(lambda src: restore_backup(src, 1)).454            then(get_slot_node().runs(lambda src,455                                      ctx: restore_backup(src, ctx['slot'])))456        ).457        then(458            get_literal_node('del').459            then(get_slot_node().runs(lambda src,460                                      ctx: delete_backup(src, ctx['slot'])))461        ).462        then(get_literal_node('confirm').runs(confirm_restore)).463        then(get_literal_node('abort').runs(trigger_abort)).464        then(get_literal_node('list').runs(lambda src: list_backup(src))).465        then(get_literal_node('reload').runs(466            lambda src: load_config(src.get_server(), src)))467    )468def load_config(server, source: CommandSource or None = None):469    global config470    try:471        config = {}472        with open(CONFIG_FILE) as file:473            js = json.load(file)474        for key in default_config.keys():475            config[key] = js[key]476        server.logger.info('Config file loaded')477        if source is not None:478            print_message(source, 'é
ç½®æä»¶å è½½æå', tell=True)479        # delete_protection check480        last = 0481        for i in range(get_slot_count()):482            # noinspection PyTypeChecker483            this = config['slots'][i]['delete_protection']484            if this < 0:485                server.logger.warning(486                    'Slot {} has a negative delete protection time'.format(i + 1))487            elif not last <= this:488                server.logger.warning(489                    'Slot {} has a delete protection time smaller than the former one'.format(i + 1))490            last = this491    except:492        server.logger.info('Fail to read config file, using default value')493        if source is not None:494            print_message(source, 'é
ç½®æä»¶å è½½å¤±è´¥ï¼ä½¿ç¨é»è®¤é
ç½®', tell=True)495        config = default_config496        with open(CONFIG_FILE, 'w') as file:497            json.dump(config, file, indent=4)498def on_load(server: ServerInterface, old):499    global creating_backup_lock, restoring_backup_lock500    if hasattr(old, 'creating_backup_lock') and type(old.creating_backup_lock) == type(creating_backup_lock):501        creating_backup_lock = old.creating_backup_lock502    if hasattr(old, 'restoring_backup_lock') and type(old.restoring_backup_lock) == type(restoring_backup_lock):503        restoring_backup_lock = old.restoring_backup_lock504    load_config(server)505    server.register_help_message(Prefix, command_run(506        '§aå¤ä»½Â§r/§cåæ¡£Â§rï¼Â§6{}§ræ§½ä½'.format(get_slot_count()), 'ç¹å»æ¥ç帮å©ä¿¡æ¯', Prefix))507    register_command(server)508def on_unload(server):509    global abort_restore, plugin_unloaded510    abort_restore = True...test_pass_template.py
Source:test_pass_template.py  
...123    assert pass_slots_template.find_slot('ColorInputOutput') == 1124            125def test_PassTemplate_InsertSlot_AtBegining_Success(pass_slots_template, new_slot):126    template = pass_slots_template127    slot_count = template.get_slot_count()128    depth_stencil_slot = template.find_slot('DepthStencil')129    template.insert_slot(0, new_slot)130    assert template.find_slot(new_slot['Name']) == 0131    assert template.find_slot('DepthStencil') == depth_stencil_slot+1 # DepthStencil moved back by 1132    assert template.get_slot_count() == slot_count+1133        134    # verify the change is saved135    template.save()136    saved_tamplate = PassTemplate(template.file_path)137    assert saved_tamplate.find_slot(new_slot['Name']) == 0138    assert saved_tamplate.get_slot_count() == slot_count+1139    140def test_PassTemplate_InsertSlot_AtEnd_Success(pass_slots_template, new_slot):141    template = pass_slots_template142    slot_count = template.get_slot_count()143    depth_stencil_slot = template.find_slot('DepthStencil')144    template.insert_slot(slot_count, new_slot)145    assert template.find_slot(new_slot['Name']) == slot_count146    assert template.find_slot('DepthStencil') == depth_stencil_slot147    assert template.get_slot_count() == slot_count+1148        149    # verify the change is saved150    template.save()151    saved_tamplate = PassTemplate(template.file_path)152    assert saved_tamplate.find_slot(new_slot['Name']) == slot_count153    assert saved_tamplate.get_slot_count() == slot_count+1154    155def test_PassTemplate_AddSlot_GoodSlotData_Success(pass_slots_template, new_slot):156    template = pass_slots_template157    slot_count = template.get_slot_count()158    depth_stencil_slot = template.find_slot('DepthStencil')159    template.add_slot(new_slot)160    assert template.find_slot(new_slot['Name']) == slot_count161    assert template.find_slot('DepthStencil') == depth_stencil_slot162    assert template.get_slot_count() == slot_count+1163        164    # verify the change is saved165    template.save()166    saved_tamplate = PassTemplate(template.file_path)167    assert saved_tamplate.find_slot(new_slot['Name']) == slot_count168    assert saved_tamplate.get_slot_count() == slot_count+1169    170def test_PassTemplate_InsertSlot_OutOfRange_AppendSuccess(pass_slots_template, new_slot):171    template = pass_slots_template172    slot_count = template.get_slot_count()173    template.insert_slot(slot_count+3, new_slot)174    assert template.find_slot(new_slot['Name']) == slot_count175    assert template.get_slot_count() == slot_count+1176        177def test_PassTemplate_AddDuplicateSlot_ExceptionThrown(pass_slots_template, new_slot):178    template = pass_slots_template179    slot_count = template.get_slot_count()180    template.add_slot(new_slot)181    182    with pytest.raises(ValueError):183        template.insert_slot(0, new_slot)184    with pytest.raises(ValueError):185        template.add_slot(new_slot)186        187def test_PassTemplate_InsertOrAddSlot_WithBadSlotData_ExceptionThrown(pass_slots_template):188    template = pass_slots_template189    slot_count = template.get_slot_count()190    bad_slot = json.loads('{\"slot\": \"xxx\"}')191    192    with pytest.raises(KeyError):193        template.insert_slot(0, bad_slot)194    with pytest.raises(KeyError):195        template.add_slot(bad_slot)196        197def test_PassReqeuest_Initialize_WithExistPassReqeuestFromPassTemplate_Success(pass_requests_template):198    template = pass_requests_template199    request = PassRequest(template.get_pass_request('OpaquePass'))200    connection_count = request.get_connection_count()201    assert connection_count == 2202        203def test_PassTemplate_GetPassRequest_NotExist_ReturnNull(pass_requests_template):204    assert not pass_requests_template.get_pass_request('NotExistPass')205            206def test_PassReqeuest_AddConnection_WithExistingConnections_Success(pass_requests_template, new_connection):207    template = pass_requests_template208    request = PassRequest(template.get_pass_request('OpaquePass'))209    connection_count = request.get_connection_count()210    request.add_connection(new_connection)211    connection_count += 1212    assert request.get_connection_count() == connection_count213    214    # verify changes are saved215    template.save()216    saved_tamplate = PassTemplate(template.file_path)217    saved_request = PassRequest(saved_tamplate.get_pass_request('OpaquePass'))218    assert saved_request.get_connection_count() == connection_count219def test_PassReqeuest_AddConnection_WithNoExistingConnections_Success(pass_requests_template, new_connection):220    template = pass_requests_template221    request = PassRequest(template.get_pass_request('ImGuiPass'))222    assert request.get_connection_count() == 0223    request.add_connection(new_connection)224    assert request.get_connection_count() == 1225    226    # verify changes are saved227    template.save()228    saved_tamplate = PassTemplate(template.file_path)229    saved_request = PassRequest(saved_tamplate.get_pass_request('ImGuiPass'))230    assert saved_request.get_connection_count() == 1231    232def test_PassReqeuest_AddConnection_WithDuplicatedName_ExceptionThrown(pass_requests_template, new_connection):233    template = pass_requests_template234    request = PassRequest(template.get_pass_request('OpaquePass'))235    request.add_connection(new_connection)236    with pytest.raises(ValueError):237        request.add_connection(new_connection)238def test_PassReqeuest_AddConnect_BadConnectionData_ExceptionThrown(pass_requests_template, new_connection):239    template = pass_requests_template240    request = PassRequest(template.get_pass_request('OpaquePass'))241    bad_connection = json.loads('{\"xxx\": \"xxx\"}')242    with pytest.raises(KeyError):243        request.add_connection(bad_connection)244                        245def test_PassTemplate_InsertSlot_ToEmptyList_Success(pass_requests_template, new_slot):246    template = pass_requests_template247    # test insert slot function to pass template which doesn't have any slots248    slot_count = template.get_slot_count()249    assert slot_count == 0250    assert template.find_slot(new_slot['Name'])==-1251    pass_requests_template.insert_slot(0, new_slot)252    assert template.find_slot(new_slot['Name']) == 0253    assert template.get_slot_count() == 1254    # verify changes are saved255    template.save()256    saved_tamplate = PassTemplate(template.file_path)257    assert saved_tamplate.get_slot_count() == 1258        259def test_PassTempalte_InsertPassRequest_ToEmptyList_Success(pass_slots_template, new_pass_request):260    template = pass_slots_template261    # test insert pass function to pass template which doesn't have any pass requests262    pass_count = template.get_pass_count()263    assert pass_count == 0264    template.insert_pass_request(0, new_pass_request)265    assert template.find_pass(new_pass_request['Name']) == 0266    assert template.get_pass_count() == 1267    # verify changes are saved268    template.save()269    saved_tamplate = PassTemplate(template.file_path)270    assert saved_tamplate.get_pass_count() == 1271        ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
