How to use get_slot_count method in avocado

Best Python code snippet using avocado_python

QuickBackupM.py

Source:QuickBackupM.py Github

copy

Full Screen

...101 os.path.realpath(os.path.join(dst, world)), ignore=filter_ignore)102def remove_worlds(folder):103 for world in config['world_names']:104 shutil.rmtree(os.path.realpath(os.path.join(folder, world)))105def get_slot_count():106 return len(config['slots'])107def get_slot_folder(slot):108 return os.path.join(config['backup_path'], f"slot{slot}")109def get_slot_info(slot):110 """111 :param int slot: the index of the slot112 :return: the slot info113 :rtype: dict or None114 """115 try:116 with open(os.path.join(get_slot_folder(slot), 'info.json')) as f:117 info = json.load(f)118 for key in info.keys():119 value = info[key]120 except:121 info = None122 return info123def format_time():124 return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())125def format_protection_time(time_length):126 """127 :rtype: str128 """129 if time_length < 60:130 return '{}秒'.format(time_length)131 elif time_length < 60 * 60:132 return '{}分钟'.format(round(time_length / 60, 2))133 elif time_length < 24 * 60 * 60:134 return '{}小时'.format(round(time_length / 60 / 60, 2))135 else:136 return '{}天'.format(round(time_length / 60 / 60 / 24, 2))137def format_slot_info(info_dict=None, slot_number=None):138 if type(info_dict) is dict:139 info = info_dict140 elif type(slot_number) is not None:141 info = get_slot_info(slot_number)142 else:143 return None144 if info is None:145 return None146 msg = '日期: {}; 注释: {}'.format(info['time'], info.get('comment', '§7空§r'))147 return msg148def touch_backup_folder():149 def mkdir(path):150 if not os.path.exists(path):151 os.mkdir(path)152 mkdir(config['backup_path'])153 for i in range(get_slot_count()):154 mkdir(get_slot_folder(i + 1))155def slot_number_formatter(slot):156 flag_fail = False157 if type(slot) is not int:158 try:159 slot = int(slot)160 except ValueError:161 flag_fail = True162 if flag_fail or not 1 <= slot <= get_slot_count():163 return None164 return slot165def slot_check(source, slot):166 slot = slot_number_formatter(slot)167 if slot is None:168 print_message(source, '槽位输入错误,应输入一个位于[{}, {}]的数字'.format(169 1, get_slot_count()))170 return None171 slot_info = get_slot_info(slot)172 if slot_info is None:173 print_message(source, '槽位输入错误,槽位§6{}§r为空'.format(slot))174 return None175 return slot, slot_info176def delete_backup(source, slot):177 global creating_backup_lock, restoring_backup_lock178 if creating_backup_lock.locked() or restoring_backup_lock.locked():179 return180 if slot_check(source, slot) is None:181 return182 try:183 shutil.rmtree(get_slot_folder(slot))184 except Exception as e:185 print_message(source, '§4删除槽位§6{}§r失败§r,错误代码:{}'.format(186 slot, e), tell=False)187 else:188 print_message(source, '§a删除槽位§6{}§r完成§r'.format(slot), tell=False)189def clean_up_slot_1():190 """191 try to cleanup slot 1 for backup192 :rtype: bool193 """194 slots = []195 empty_slot_idx = None196 target_slot_idx = None197 max_available_idx = None198 for i in range(get_slot_count()):199 slot_idx = i + 1200 slot = get_slot_info(slot_idx)201 slots.append(slot)202 if slot is None:203 if empty_slot_idx is None:204 empty_slot_idx = slot_idx205 else:206 time_stamp = slot.get('time_stamp', None)207 if time_stamp is not None:208 slot_config_data = config['slots'][slot_idx - 1] # type: dict209 if time.time() - time_stamp > slot_config_data['delete_protection']:210 max_available_idx = slot_idx211 else:212 # old format, treat it as available213 max_available_idx = slot_idx214 if empty_slot_idx is not None:215 target_slot_idx = empty_slot_idx216 else:217 target_slot_idx = max_available_idx218 if target_slot_idx is not None:219 folder = get_slot_folder(target_slot_idx)220 if os.path.isdir(folder):221 shutil.rmtree(folder)222 for i in reversed(range(1, target_slot_idx)): # n-1, n-2, ..., 1223 os.rename(get_slot_folder(i), get_slot_folder(i + 1))224 return True225 else:226 return False227@new_thread('QBM')228def create_backup(source: CommandSource, comment):229 global restoring_backup_lock, creating_backup_lock230 if restoring_backup_lock.locked():231 print_message(source, '正在§c回档§r中,请不要尝试备份', tell=False)232 return233 acquired = creating_backup_lock.acquire(blocking=False)234 if not acquired:235 print_message(source, '正在§a备份§r中,请不要重复输入', tell=False)236 return237 try:238 print_message(source, '§a备份§r中...请稍等', tell=False)239 start_time = time.time()240 touch_backup_folder()241 # start backup242 global game_saved, plugin_unloaded243 game_saved = False244 if config['turn_off_auto_save']:245 source.get_server().execute('save-off')246 source.get_server().execute('save-all flush')247 while True:248 time.sleep(0.01)249 if game_saved:250 break251 if plugin_unloaded:252 print_message(source, '插件重载,§a备份§r中断!', tell=False)253 return254 if not clean_up_slot_1():255 print_message(source, '未找到可用槽位,§a备份§r中断!', tell=False)256 return257 slot_path = get_slot_folder(1)258 # copy worlds to backup slot259 copy_worlds(config['server_path'], slot_path)260 # create info.json261 slot_info = {262 'time': format_time(),263 'time_stamp': time.time()264 }265 if comment is not None:266 slot_info['comment'] = comment267 with open(os.path.join(slot_path, 'info.json'), 'w') as f:268 json.dump(slot_info, f, indent=4)269 # done270 end_time = time.time()271 print_message(source, '§a备份§r完成,耗时§6{}§r秒'.format(272 round(end_time - start_time, 1)), tell=False)273 print_message(source, format_slot_info(274 info_dict=slot_info), tell=False)275 source.get_server().dispatch_event(LiteralEvent(276 'qbm.backup_done'), (source, )) # just for showcase277 except Exception as e:278 print_message(source, '§a备份§r失败,错误代码{}'.format(e), tell=False)279 finally:280 creating_backup_lock.release()281 if config['turn_off_auto_save']:282 source.get_server().execute('save-on')283@new_thread('QBM')284def restore_backup(source: CommandSource, slot):285 ret = slot_check(source, slot)286 if ret is None:287 return288 else:289 slot, slot_info = ret290 global slot_selected, abort_restore291 slot_selected = slot292 abort_restore = False293 print_message(source, '准备将存档恢复至槽位§6{}§r, {}'.format(294 slot, format_slot_info(info_dict=slot_info)), tell=False)295 print_message(296 source,297 command_run('使用§7{0} confirm§r 确认§c回档§r'.format(298 Prefix), '点击确认', '{0} confirm'.format(Prefix))299 + ', '300 + command_run('§7{0} abort§r 取消'.format(Prefix), '点击取消', '{0} abort'.format(Prefix)), tell=False301 )302@new_thread('QBM')303def confirm_restore(source: CommandSource):304 global restoring_backup_lock, creating_backup_lock305 if creating_backup_lock.locked():306 print_message(source, '正在§a备份§r中,请不要尝试回档', tell=False)307 return308 acquired = restoring_backup_lock.acquire(blocking=False)309 if not acquired:310 print_message(source, '正在准备§c回档§r中,请不要重复输入', tell=False)311 return312 try:313 global slot_selected314 if slot_selected is None:315 print_message(source, '没有什么需要确认的', tell=False)316 return317 slot = slot_selected318 slot_selected = None319 print_message(source, '10秒后关闭服务器§c回档§r', tell=False)320 for countdown in range(1, 10):321 print_message(source, command_run(322 '还有{}秒,将§c回档§r为槽位§6{}§r,{}'.format(323 10 - countdown, slot, format_slot_info(slot_number=slot)),324 '点击终止回档!',325 '{} abort'.format(Prefix)326 ), tell=False)327 for i in range(10):328 time.sleep(0.1)329 global abort_restore330 if abort_restore:331 print_message(source, '§c回档§r被中断!', tell=False)332 return333 source.get_server().stop()334 source.get_server().logger.info('[QBM] Wait for server to stop')335 source.get_server().wait_for_start()336 source.get_server().logger.info(337 '[QBM] Backup current world to avoid idiot')338 overwrite_backup_path = os.path.join(339 config['backup_path'], config['overwrite_backup_folder'])340 if os.path.exists(overwrite_backup_path):341 shutil.rmtree(overwrite_backup_path)342 copy_worlds(config['server_path'], overwrite_backup_path)343 with open(os.path.join(overwrite_backup_path, 'info.txt'), 'w') as f:344 f.write('Overwrite time: {}\n'.format(format_time()))345 f.write('Confirmed by: {}'.format(source))346 slot_folder = get_slot_folder(slot)347 source.get_server().logger.info('[QBM] Deleting world')348 remove_worlds(config['server_path'])349 source.get_server().logger.info('[QBM] Restore backup ' + slot_folder)350 copy_worlds(slot_folder, config['server_path'])351 source.get_server().start()352 finally:353 restoring_backup_lock.release()354def trigger_abort(source):355 global abort_restore, slot_selected356 abort_restore = True357 slot_selected = None358 print_message(source, '终止操作!', tell=False)359@new_thread('QBM')360def list_backup(source: CommandSource, size_display=config['size_display']):361 def get_dir_size(dir):362 size = 0363 for root, dirs, files in os.walk(dir):364 size += sum([os.path.getsize(os.path.join(root, name))365 for name in files])366 return size367 def format_dir_size(size):368 if size < 2 ** 30:369 return f'{round(size / 2 ** 20, 2)} MB'370 else:371 return f'{round(size / 2 ** 30, 2)} GB'372 print_message(source, '§d【槽位信息】§r', prefix='')373 backup_size = 0374 for i in range(get_slot_count()):375 slot_idx = i + 1376 slot_info = format_slot_info(slot_number=slot_idx)377 if size_display:378 dir_size = get_dir_size(get_slot_folder(slot_idx))379 else:380 dir_size = 0381 backup_size += dir_size382 # noinspection PyTypeChecker383 text = RTextList(384 RText('[槽位§6{}§r] '.format(slot_idx)).h(385 '存档保护时长: ' + format_protection_time(config['slots'][slot_idx - 1]['delete_protection']))386 )387 if slot_info is not None:388 text += RTextList(389 RText('[▷] ', color=RColor.green).h(f'点击回档至槽位§6{slot_idx}§r').c(390 RAction.run_command, f'{Prefix} back {slot_idx}'),391 RText('[×] ', color=RColor.red).h(f'点击删除槽位§6{slot_idx}§r').c(392 RAction.suggest_command, f'{Prefix} del {slot_idx}')393 )394 if size_display:395 text += '§2{}§r '.format(format_dir_size(dir_size))396 text += slot_info397 print_message(source, text, prefix='')398 if size_display:399 print_message(source, '备份总占用空间: §a{}§r'.format(400 format_dir_size(backup_size)), prefix='')401@new_thread('QBM')402def print_help_message(source: CommandSource):403 if source.is_player:404 source.reply('')405 for line in HelpMessage.splitlines():406 prefix = re.search(r'(?<=§7){}[\w ]*(?=§)'.format(Prefix), line)407 if prefix is not None:408 print_message(source, RText(line).set_click_event(409 RAction.suggest_command, prefix.group()), prefix='')410 else:411 print_message(source, line, prefix='')412 list_backup(source, size_display=False).join()413 print_message(414 source,415 '§d【快捷操作】§r' + '\n' +416 RText('>>> §a点我创建一个备份§r <<<')417 .h('记得修改注释')418 .c(RAction.suggest_command, f'{Prefix} make 我是一个注释') + '\n' +419 RText('>>> §c点我回档至最近的备份§r <<<')420 .h('也就是回档至第一个槽位')421 .c(RAction.suggest_command, f'{Prefix} back'),422 prefix=''423 )424def on_info(server, info: Info):425 if not info.is_user:426 if info.content == 'Saved the game':427 global game_saved428 game_saved = True429def print_unknown_argument_message(source: CommandSource, error: UnknownArgument):430 print_message(source, command_run(431 '参数错误!请输入§7{}§r以获取插件信息'.format(Prefix),432 '点击查看帮助',433 Prefix434 ))435def register_command(server: ServerInterface):436 def get_literal_node(literal):437 lvl = config['minimum_permission_level'].get(literal, 0)438 return Literal(literal).requires(lambda src: src.has_permission(lvl), failure_message_getter=lambda: '权限不足')439 def get_slot_node():440 return Integer('slot').requires(lambda src, ctx: 1 <= ctx['slot'] <= get_slot_count(), failure_message_getter=lambda: '错误的槽位序号')441 server.register_command(442 Literal(Prefix).443 runs(print_help_message).444 on_error(UnknownArgument, print_unknown_argument_message, handled=True).445 then(446 get_literal_node('make').447 runs(lambda src: create_backup(src, None)).448 then(GreedyText('comment').runs(lambda src,449 ctx: create_backup(src, ctx['comment'])))450 ).451 then(452 get_literal_node('back').453 runs(lambda src: restore_backup(src, 1)).454 then(get_slot_node().runs(lambda src,455 ctx: restore_backup(src, ctx['slot'])))456 ).457 then(458 get_literal_node('del').459 then(get_slot_node().runs(lambda src,460 ctx: delete_backup(src, ctx['slot'])))461 ).462 then(get_literal_node('confirm').runs(confirm_restore)).463 then(get_literal_node('abort').runs(trigger_abort)).464 then(get_literal_node('list').runs(lambda src: list_backup(src))).465 then(get_literal_node('reload').runs(466 lambda src: load_config(src.get_server(), src)))467 )468def load_config(server, source: CommandSource or None = None):469 global config470 try:471 config = {}472 with open(CONFIG_FILE) as file:473 js = json.load(file)474 for key in default_config.keys():475 config[key] = js[key]476 server.logger.info('Config file loaded')477 if source is not None:478 print_message(source, '配置文件加载成功', tell=True)479 # delete_protection check480 last = 0481 for i in range(get_slot_count()):482 # noinspection PyTypeChecker483 this = config['slots'][i]['delete_protection']484 if this < 0:485 server.logger.warning(486 'Slot {} has a negative delete protection time'.format(i + 1))487 elif not last <= this:488 server.logger.warning(489 'Slot {} has a delete protection time smaller than the former one'.format(i + 1))490 last = this491 except:492 server.logger.info('Fail to read config file, using default value')493 if source is not None:494 print_message(source, '配置文件加载失败,使用默认配置', tell=True)495 config = default_config496 with open(CONFIG_FILE, 'w') as file:497 json.dump(config, file, indent=4)498def on_load(server: ServerInterface, old):499 global creating_backup_lock, restoring_backup_lock500 if hasattr(old, 'creating_backup_lock') and type(old.creating_backup_lock) == type(creating_backup_lock):501 creating_backup_lock = old.creating_backup_lock502 if hasattr(old, 'restoring_backup_lock') and type(old.restoring_backup_lock) == type(restoring_backup_lock):503 restoring_backup_lock = old.restoring_backup_lock504 load_config(server)505 server.register_help_message(Prefix, command_run(506 '§a备份§r/§c回档§r,§6{}§r槽位'.format(get_slot_count()), '点击查看帮助信息', Prefix))507 register_command(server)508def on_unload(server):509 global abort_restore, plugin_unloaded510 abort_restore = True...

Full Screen

Full Screen

test_pass_template.py

Source:test_pass_template.py Github

copy

Full Screen

...123 assert pass_slots_template.find_slot('ColorInputOutput') == 1124 125def test_PassTemplate_InsertSlot_AtBegining_Success(pass_slots_template, new_slot):126 template = pass_slots_template127 slot_count = template.get_slot_count()128 depth_stencil_slot = template.find_slot('DepthStencil')129 template.insert_slot(0, new_slot)130 assert template.find_slot(new_slot['Name']) == 0131 assert template.find_slot('DepthStencil') == depth_stencil_slot+1 # DepthStencil moved back by 1132 assert template.get_slot_count() == slot_count+1133 134 # verify the change is saved135 template.save()136 saved_tamplate = PassTemplate(template.file_path)137 assert saved_tamplate.find_slot(new_slot['Name']) == 0138 assert saved_tamplate.get_slot_count() == slot_count+1139 140def test_PassTemplate_InsertSlot_AtEnd_Success(pass_slots_template, new_slot):141 template = pass_slots_template142 slot_count = template.get_slot_count()143 depth_stencil_slot = template.find_slot('DepthStencil')144 template.insert_slot(slot_count, new_slot)145 assert template.find_slot(new_slot['Name']) == slot_count146 assert template.find_slot('DepthStencil') == depth_stencil_slot147 assert template.get_slot_count() == slot_count+1148 149 # verify the change is saved150 template.save()151 saved_tamplate = PassTemplate(template.file_path)152 assert saved_tamplate.find_slot(new_slot['Name']) == slot_count153 assert saved_tamplate.get_slot_count() == slot_count+1154 155def test_PassTemplate_AddSlot_GoodSlotData_Success(pass_slots_template, new_slot):156 template = pass_slots_template157 slot_count = template.get_slot_count()158 depth_stencil_slot = template.find_slot('DepthStencil')159 template.add_slot(new_slot)160 assert template.find_slot(new_slot['Name']) == slot_count161 assert template.find_slot('DepthStencil') == depth_stencil_slot162 assert template.get_slot_count() == slot_count+1163 164 # verify the change is saved165 template.save()166 saved_tamplate = PassTemplate(template.file_path)167 assert saved_tamplate.find_slot(new_slot['Name']) == slot_count168 assert saved_tamplate.get_slot_count() == slot_count+1169 170def test_PassTemplate_InsertSlot_OutOfRange_AppendSuccess(pass_slots_template, new_slot):171 template = pass_slots_template172 slot_count = template.get_slot_count()173 template.insert_slot(slot_count+3, new_slot)174 assert template.find_slot(new_slot['Name']) == slot_count175 assert template.get_slot_count() == slot_count+1176 177def test_PassTemplate_AddDuplicateSlot_ExceptionThrown(pass_slots_template, new_slot):178 template = pass_slots_template179 slot_count = template.get_slot_count()180 template.add_slot(new_slot)181 182 with pytest.raises(ValueError):183 template.insert_slot(0, new_slot)184 with pytest.raises(ValueError):185 template.add_slot(new_slot)186 187def test_PassTemplate_InsertOrAddSlot_WithBadSlotData_ExceptionThrown(pass_slots_template):188 template = pass_slots_template189 slot_count = template.get_slot_count()190 bad_slot = json.loads('{\"slot\": \"xxx\"}')191 192 with pytest.raises(KeyError):193 template.insert_slot(0, bad_slot)194 with pytest.raises(KeyError):195 template.add_slot(bad_slot)196 197def test_PassReqeuest_Initialize_WithExistPassReqeuestFromPassTemplate_Success(pass_requests_template):198 template = pass_requests_template199 request = PassRequest(template.get_pass_request('OpaquePass'))200 connection_count = request.get_connection_count()201 assert connection_count == 2202 203def test_PassTemplate_GetPassRequest_NotExist_ReturnNull(pass_requests_template):204 assert not pass_requests_template.get_pass_request('NotExistPass')205 206def test_PassReqeuest_AddConnection_WithExistingConnections_Success(pass_requests_template, new_connection):207 template = pass_requests_template208 request = PassRequest(template.get_pass_request('OpaquePass'))209 connection_count = request.get_connection_count()210 request.add_connection(new_connection)211 connection_count += 1212 assert request.get_connection_count() == connection_count213 214 # verify changes are saved215 template.save()216 saved_tamplate = PassTemplate(template.file_path)217 saved_request = PassRequest(saved_tamplate.get_pass_request('OpaquePass'))218 assert saved_request.get_connection_count() == connection_count219def test_PassReqeuest_AddConnection_WithNoExistingConnections_Success(pass_requests_template, new_connection):220 template = pass_requests_template221 request = PassRequest(template.get_pass_request('ImGuiPass'))222 assert request.get_connection_count() == 0223 request.add_connection(new_connection)224 assert request.get_connection_count() == 1225 226 # verify changes are saved227 template.save()228 saved_tamplate = PassTemplate(template.file_path)229 saved_request = PassRequest(saved_tamplate.get_pass_request('ImGuiPass'))230 assert saved_request.get_connection_count() == 1231 232def test_PassReqeuest_AddConnection_WithDuplicatedName_ExceptionThrown(pass_requests_template, new_connection):233 template = pass_requests_template234 request = PassRequest(template.get_pass_request('OpaquePass'))235 request.add_connection(new_connection)236 with pytest.raises(ValueError):237 request.add_connection(new_connection)238def test_PassReqeuest_AddConnect_BadConnectionData_ExceptionThrown(pass_requests_template, new_connection):239 template = pass_requests_template240 request = PassRequest(template.get_pass_request('OpaquePass'))241 bad_connection = json.loads('{\"xxx\": \"xxx\"}')242 with pytest.raises(KeyError):243 request.add_connection(bad_connection)244 245def test_PassTemplate_InsertSlot_ToEmptyList_Success(pass_requests_template, new_slot):246 template = pass_requests_template247 # test insert slot function to pass template which doesn't have any slots248 slot_count = template.get_slot_count()249 assert slot_count == 0250 assert template.find_slot(new_slot['Name'])==-1251 pass_requests_template.insert_slot(0, new_slot)252 assert template.find_slot(new_slot['Name']) == 0253 assert template.get_slot_count() == 1254 # verify changes are saved255 template.save()256 saved_tamplate = PassTemplate(template.file_path)257 assert saved_tamplate.get_slot_count() == 1258 259def test_PassTempalte_InsertPassRequest_ToEmptyList_Success(pass_slots_template, new_pass_request):260 template = pass_slots_template261 # test insert pass function to pass template which doesn't have any pass requests262 pass_count = template.get_pass_count()263 assert pass_count == 0264 template.insert_pass_request(0, new_pass_request)265 assert template.find_pass(new_pass_request['Name']) == 0266 assert template.get_pass_count() == 1267 # verify changes are saved268 template.save()269 saved_tamplate = PassTemplate(template.file_path)270 assert saved_tamplate.get_pass_count() == 1271 ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful