How to use describe_backup method in localstack

Best Python code snippet using localstack_python

test_dynamodb.py

Source:test_dynamodb.py Github

copy

Full Screen

...143 resources = p.run()144 self.assertEqual(len(resources), 1)145 client = session_factory().client('dynamodb')146 arn = resources[0]['c7n:BackupArn']147 table = client.describe_backup(148 BackupArn=arn)149 self.assertEqual(table['BackupDescription']['BackupDetails']['BackupName'],150 'Backup-c7n-dynamodb-backup-%s' % (suffix))151 def test_dynamodb_create_prefixed_backup(self):152 dt = datetime.datetime.now().replace(153 year=2018, month=1, day=22, hour=13, minute=42)154 suffix = dt.strftime('%Y-%m-%d-%H-%M')155 session_factory = self.replay_flight_data(156 'test_dynamodb_create_prefixed_backup')157 p = self.load_policy({158 'name': 'c7n-dynamodb-create-prefixed-backup',159 'resource': 'dynamodb-table',160 'filters': [{'TableName': 'c7n-dynamodb-backup'}],161 'actions': [{162 'type': 'backup',163 'prefix': 'custom'}]164 },165 session_factory=session_factory)166 resources = p.run()167 self.assertEqual(len(resources), 1)168 client = session_factory().client('dynamodb')169 arn = resources[0]['c7n:BackupArn']170 table = client.describe_backup(171 BackupArn=arn)172 self.assertEqual(table['BackupDescription']['BackupDetails']['BackupName'],173 'custom-c7n-dynamodb-backup-%s' % (suffix))174 def test_dynamodb_delete_backup(self):175 factory = self.replay_flight_data('test_dynamodb_delete_backup')176 p = self.load_policy({177 'name': 'c7n-dynamodb-delete-backup',178 'resource': 'dynamodb-backup',179 'filters': [{'TableName': 'c7n-dynamodb-backup'}],180 'actions': ['delete']},181 session_factory=factory)182 resources = p.run()183 self.assertEqual(len(resources), 1)184 def test_dynamodb_enable_stream(self):...

Full Screen

Full Screen

redis_backup.py

Source:redis_backup.py Github

copy

Full Screen

...3233pct = ProcessControlThread(download_file_ex, config.backup_process_num, config.backup_thread_num, arg_type="dict", is_join=False, is_retry=True )343536def describe_backup(backup_date, instance, filename, max_try=5):37 38 i=039 while i<max_try:40 i += 141 try:42 all_instances = do.get_all_instances_info(project_ids=[]) 43 break44 except:45 #获取实例信息错误,休眠一段时间后再次尝试46 logger_err.warning(format_exc())47 48 if i >= max_try:49 logger_err.error("get instances info failed")50 exit(1)51 else:52 time.sleep(60)53 54 for region, region_instance_list in all_instances: 55 for l in region_instance_list: 56 if instance and instance not in [l.InstanceId, l.InstanceName]:57 continue58 59 instance_id = l.InstanceId60 instance_name = l.InstanceName61 backup_name = None62 63 if instance_id in config.backup_exclude_instance:64 continue65 66 try:67 backup_info = do.describe_backups(region, instance_id, backup_date)68 backup_id = backup_info.BackupId69 backup_time = backup_info.StartTime70 if backup_id: 71 _filename = filename if filename else "%s_%s.tar" % (instance_name, backup_time.replace("-","").replace(" ","").replace(":","") )72 download_url = None73 timeout = 60074 init_info = {"url":download_url, "path":config.redis_backup_dir, "filename":_filename, "download_timeout":timeout,\75 "region":region, "instance_id":instance_id, "backup_id":backup_id}76 retry_queue.put(init_info)77 78 else:79 raise Exception("null for backup_id") 80 81 except:82 logger_err.error(format_exc())83 logger_err.error("can not get backup name for %s %s %s" % (region, instance_id, backup_date))848586def set_download_queue(timeout = 120):87 """88 设置下载队列89 """90 while True:91 try:92 retry_info = retry_queue.get(block=True, timeout=timeout)93 region = retry_info["region"]94 instance_id = retry_info["instance_id"]95 backup_id = retry_info["backup_id"]96 97 # 防止堆积在下载队列导致url过期98 while download_url_queue.qsize():99 time.sleep(0.1)100 101 try:102 download_url = do.describe_backup_url(region, instance_id, backup_id)103 except:104 logger_err.error(format_exc())105 download_url = None106 107 if not download_url:108 retry_queue.put(retry_info)109 continue110 111 retry_info["url"] = download_url112 download_url_queue.put(retry_info)113 logger.debug("put download info %s" % str(retry_info))114 except Empty:115 if pct.tsize():116 # 后台并发还存在,因此继续117 timeout = 120118 continue119 else:120 # 队列为空,认为没有重试的任务,因此直接结束即可121 logger.debug("get retry queue timeout, seems good to end")122 break 123 124 except:125 logger_err.error(format_exc())126 break127 128 # 设置结束标识符129 for i in range(config.backup_process_num): 130 download_url_queue.put("EOF") 131 132133def main(backup_date, instance, filename):134 135 describe_backup(backup_date, instance, filename) 136 137 #设置下载队列,需要另外开一个进程执行138 t = Thread(target = set_download_queue, args = ())139 t.start() 140 141 pct.start()142 143 t.join()144 pct.join()145146147def arg_parse():148 """149 命令行参数解析 ...

Full Screen

Full Screen

app_management.py

Source:app_management.py Github

copy

Full Screen

1# Copyright 2013 Mirantis Inc.2# All Rights Reserved.3#4# Licensed under the Apache License, Version 2.0 (the "License"); you may5# not use this file except in compliance with the License. You may obtain6# a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the13# License for the specific language governing permissions and limitations14# under the License.15import routes16from magnetodb.api import with_global_env17from magnetodb.common import wsgi18from magnetodb.api.openstack.v1 import create_resource19from magnetodb.api.openstack.v1.management import create_backup20from magnetodb.api.openstack.v1.management import delete_backup21from magnetodb.api.openstack.v1.management import describe_backup22from magnetodb.api.openstack.v1.management import list_backups23from magnetodb.api.openstack.v1.management import create_restore_job24from magnetodb.api.openstack.v1.management import describe_restore_job25from magnetodb.api.openstack.v1.management import list_restore_jobs26class ManagementApplication(wsgi.Router):27 """API"""28 def __init__(self):29 mapper = routes.Mapper()30 super(ManagementApplication, self).__init__(mapper)31 mapper.connect(32 "create_backup",33 "/{project_id}/{table_name}/backups",34 conditions={'method': 'POST'},35 controller=create_resource(36 create_backup.CreateBackupController()),37 action="process_request"38 )39 mapper.connect(40 "list_backups",41 "/{project_id}/{table_name}/backups",42 conditions={'method': 'GET'},43 controller=create_resource(44 list_backups.ListBackupsController()),45 action="process_request"46 )47 mapper.connect(48 "describe_backup",49 "/{project_id}/{table_name}/backups/{backup_id}",50 conditions={'method': 'GET'},51 controller=create_resource(52 describe_backup.DescribeBackupController()),53 action="process_request"54 )55 mapper.connect(56 "delete_backup",57 "/{project_id}/{table_name}/backups/{backup_id}",58 conditions={'method': 'DELETE'},59 controller=create_resource(60 delete_backup.DeleteBackupController()),61 action="process_request"62 )63 mapper.connect(64 "create_restore_job",65 "/{project_id}/{table_name}/restores",66 conditions={'method': 'POST'},67 controller=create_resource(68 create_restore_job.CreateRestoreJobController()),69 action="process_request"70 )71 mapper.connect(72 "list_restore_jobs",73 "/{project_id}/{table_name}/restores",74 conditions={'method': 'GET'},75 controller=create_resource(76 list_restore_jobs.ListRestoreJobsController()),77 action="process_request"78 )79 mapper.connect(80 "describe_restore_job",81 "/{project_id}/{table_name}/restores/{restore_job_id}",82 conditions={'method': 'GET'},83 controller=create_resource(84 describe_restore_job.DescribeRestoreJobController()),85 action="process_request"86 )87@with_global_env(default_program='management-api')88def app_factory(global_conf, **local_conf):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful