How to use the copy_snapshot method in localstack

Best Python code snippet using localstack_python

test_action.py

Source:test_action.py Github

copy

Full Screen

...89 return resource_stack90 except Exception as ex:91 cls.logger.test("Error creating stack {}, {}", resource_stack_name, ex)92 return None93 def check_copy_snapshot(self, snapshot_id, source_snapshot, destination):94 ec2_destination = Ec2(region=destination)95 self.logger.test("Checking copied snapshot")96 snapshot_copy = ec2_destination.get_snapshot(snapshot_id)97 self.assertIsNotNone(snapshot_copy, "Snapshot copy does exist")98 snapshot_tags = snapshot_copy.get("Tags", {})99 self.assertTrue(TagFilterExpression("copied-tag=copied-value").is_match(snapshot_tags), "Source snapshot tag copied")100 self.assertFalse(TagFilterExpression("not-copied-tag=*").is_match(snapshot_tags), "Source snapshot tag not copied")101 self.logger.test("[X] Expected source snapshot tag copied")102 snapshot_placeholders = {103 copy_snapshot.TAG_PLACEHOLDER_SOURCE_REGION: source_snapshot["Region"],104 copy_snapshot.TAG_PLACEHOLDER_SOURCE_SNAPSHOT_ID: source_snapshot["SnapshotId"],105 copy_snapshot.TAG_PLACEHOLDER_SOURCE_VOLUME: source_snapshot["VolumeId"],106 copy_snapshot.TAG_PLACEHOLDER_OWNER_ACCOUNT: source_snapshot["AwsAccount"]107 }108 self.assertTrue(testing.tags.verify_placeholder_tags(snapshot_tags, snapshot_placeholders),109 "All placeholder tags set on snapshot {}".format(snapshot_id))110 self.logger.test("[X] Placeholder tags created")111 self.assertTrue(TagFilterExpression(112 "{}={}".format(actions.marker_snapshot_tag_source_source_volume_id(), source_snapshot["VolumeId"])).is_match(113 snapshot_tags), "Source volume tag set")114 self.logger.test("[X] Source volume tag created")115 self.assertTrue(TagFilterExpression(116 "{}={}".format(copy_snapshot.Ec2CopySnapshotAction.marker_tag_source_snapshot_id(),117 source_snapshot["SnapshotId"])).is_match(118 snapshot_tags), "Source snapshot tag set")119 self.logger.test("[X] Source snapshot tag created")120 self.assertEqual(ec2_destination.get_snapshot_create_volume_permission_users(snapshot_id), ["123456789012"],121 "Create volume 
permissions set")122 self.logger.test("[X] Volume create permission set")123 self.assertEqual(snapshot_copy["Description"], source_snapshot["Description"], "Description is copied")124 self.logger.test("[X] Description is copied")125 def check_source_snapshot(self, snapshot_source__id, snapshot_copy_id, task_name, destination):126 self.logger.test("Checking source snapshot")127 snapshot_tags = self.ec2.get_snapshot_tags(snapshot_source__id)128 snapshot_placeholders = {129 copy_snapshot.TAG_PLACEHOLDER_COPIED_SNAPSHOT_ID: snapshot_copy_id,130 copy_snapshot.TAG_PLACEHOLDER_COPIED_REGION: destination131 }132 self.assertTrue(testing.tags.verify_placeholder_tags(snapshot_tags, snapshot_placeholders),133 "All placeholder tags set on source snapshot {}".format(snapshot_tags))134 self.logger.test("[X] Placeholder tags created on source snapshot")135 copied_tag_value = snapshot_tags.get(copy_snapshot.Ec2CopySnapshotAction.marker_tag_copied_to(task_name))136 self.assertIsNotNone(copied_tag_value, "Marker for snapshot copied by task is set")137 try:138 copied_data = json.loads(copied_tag_value)139 except ValueError:140 copied_data = {}141 self.assertEqual(destination, copied_data.get("region"), "Destination region is set in copied marker tag")142 self.logger.test("[X] Snapshot copied marker tag set and has expected values")143 def cleanup_leftover_source_snapshots(self, test_method):144 self.ec2.delete_snapshots_by_tags(tag_filter_expression="{}={}".format(tasklist_tagname(TESTED_ACTION), test_method))145 def test_copy_unencrypted_snapshot_same_region(self):146 test_method = inspect.stack()[0][3]147 self.cleanup_leftover_source_snapshots(test_method)148 self.logger.test("Creating source snapshot")149 source_snapshot_unencrypted = self.ec2.create_snapshot(self.volume_unencrypted, tags={150 "copied-tag": "copied-value",151 "not-copied-tag": "not-copied-value",152 "Name": "Ec2CopySnapshot_{}".format(test_method),153 tasklist_tagname(TESTED_ACTION): test_method154 }, 
description="Snapshot for testing Ec2CopySnapshot : {}".format(test_method))155 self.snapshots.append(source_snapshot_unencrypted["SnapshotId"])156 testing.tags.set_ec2_tag_to_delete(self.ec2, [source_snapshot_unencrypted["SnapshotId"]])157 parameters = {158 copy_snapshot.PARAM_DESTINATION_REGION: region(),159 copy_snapshot.PARAM_ACCOUNTS_VOLUME_CREATE_PERMISSIONS: ["123456789012"],160 copy_snapshot.PARAM_COPIED_SNAPSHOT_TAGS: "copied-tag",161 copy_snapshot.PARAM_SNAPSHOT_DESCRIPTION: "{{{}}}".format(copy_snapshot.TAG_PLACEHOLDER_SOURCE_DESCRIPTION),162 copy_snapshot.PARAM_SNAPSHOT_TAGS: testing.tags.common_placeholder_tags(163 test_delete=False,164 placeholders=[165 copy_snapshot.TAG_PLACEHOLDER_SOURCE_REGION,166 copy_snapshot.TAG_PLACEHOLDER_SOURCE_SNAPSHOT_ID,167 copy_snapshot.TAG_PLACEHOLDER_SOURCE_VOLUME,168 copy_snapshot.TAG_PLACEHOLDER_OWNER_ACCOUNT]),169 copy_snapshot.PARAM_SOURCE_TAGS: testing.tags.common_placeholder_tags([170 copy_snapshot.TAG_PLACEHOLDER_COPIED_SNAPSHOT_ID,171 copy_snapshot.TAG_PLACEHOLDER_COPIED_REGION172 ]),173 copy_snapshot.PARAM_COPIED_SNAPSHOTS: copy_snapshot.COPIED_OWNED_BY_ACCOUNT,174 copy_snapshot.PARAM_DELETE_AFTER_COPY: False,175 copy_snapshot.PARAM_ENCRYPTED: False176 }177 self.logger.test("Running task")178 self.task_runner.run(parameters,179 task_name=test_method,180 complete_check_polling_interval=10)181 self.assertTrue(self.task_runner.success(), "Task executed successfully")182 snapshot_copy_id = self.task_runner.results[0].result["copy-snapshot-id"]183 self.snapshots.append(snapshot_copy_id)184 self.logger.test("[X] Task completed")185 self.check_copy_snapshot(snapshot_copy_id, source_snapshot_unencrypted, region())186 self.check_source_snapshot(source_snapshot_unencrypted["SnapshotId"], snapshot_copy_id, test_method, region())187 assert (self.task_runner.max_concurrency == 5)188 assert (self.task_runner.concurrency_key == "ec2:CopySnapshot:{}:{}".format(self.task_runner.tested_account,189 
self.task_runner.tested_region))190 def test_copy_snapshot_description(self):191 test_method = inspect.stack()[0][3]192 self.cleanup_leftover_source_snapshots(test_method)193 self.logger.test("Creating source snapshot")194 source_snapshot = self.ec2.create_snapshot(self.volume_unencrypted,195 tags={196 "Name": "Ec2CopySnapshot_{}".format(test_method),197 tasklist_tagname(TESTED_ACTION): test_method198 }, description="Snapshot for testing Ec2CopySnapshot : {}".format(test_method))199 self.snapshots.append(source_snapshot["SnapshotId"])200 parameters = {201 copy_snapshot.PARAM_DESTINATION_REGION: region(),202 copy_snapshot.PARAM_COPIED_SNAPSHOTS: copy_snapshot.COPIED_OWNED_BY_ACCOUNT,203 copy_snapshot.PARAM_DELETE_AFTER_COPY: False,204 copy_snapshot.PARAM_ENCRYPTED: False205 }206 self.logger.test("Running task")207 self.task_runner.run(parameters,208 task_name=test_method,209 complete_check_polling_interval=10)210 self.assertTrue(self.task_runner.success(), "Task executed successfully")211 snapshot_copy_id = self.task_runner.results[0].result["copy-snapshot-id"]212 self.snapshots.append(snapshot_copy_id)213 self.logger.test("[X] Task completed")214 copied_snapshot = self.ec2.get_snapshot(snapshot_copy_id)215 self.assertEqual(source_snapshot.get("Description"), copied_snapshot.get("Description"), "Description copied as default")216 self.logger.test("[X]Source description copied")217 def test_snapshot_only_copied_once(self):218 test_method = inspect.stack()[0][3]219 self.cleanup_leftover_source_snapshots(test_method)220 self.logger.test("Creating source snapshot")221 source_snapshot = self.ec2.create_snapshot(self.volume_unencrypted, tags={222 "Name": "Ec2CopySnapshot_{}".format(test_method),223 tasklist_tagname(TESTED_ACTION): test_method224 }, description="Snapshot for testing Ec2CopySnapshot : {}".format(test_method))225 assert (source_snapshot is not None)226 self.snapshots.append(source_snapshot["SnapshotId"])227 parameters = {228 
copy_snapshot.PARAM_DESTINATION_REGION: region(),229 }230 self.logger.test("Running task to copy snapshot")231 self.task_runner.run(parameters,232 task_name=test_method,233 complete_check_polling_interval=10)234 self.assertTrue(self.task_runner.success(), "Task executed successfully")235 snapshot_copy_id = self.task_runner.results[0].result["copy-snapshot-id"]236 self.snapshots.append(snapshot_copy_id)237 self.logger.test("Running task again")238 self.task_runner.run(parameters, task_name=test_method, complete_check_polling_interval=10)239 self.logger.test("[X] Task completed")240 self.assertEqual(0, len(self.task_runner.results), "Snapshot already copied")241 self.logger.test("[X] Snapshot was not copied for second time")242 def test_snapshot_not_longer_available(self):243 def delete_snapshot_after_select(tracker):244 for item in tracker.task_items:245 self.ec2.delete_snapshots(snapshot_ids=[item.get(handlers.TASK_TR_RESOURCES, {}).get("SnapshotId")])246 test_method = inspect.stack()[0][3]247 self.cleanup_leftover_source_snapshots(test_method)248 self.logger.test("Creating source snapshot")249 source_snapshot = self.ec2.create_snapshot(self.volume_unencrypted, tags={250 "Name": "Ec2CopySnapshot_{}".format(test_method),251 tasklist_tagname(TESTED_ACTION): test_method252 }, description="Snapshot for testing Ec2CopySnapshot : {}".format(test_method))253 assert (source_snapshot is not None)254 self.snapshots.append(source_snapshot["SnapshotId"])255 parameters = {256 copy_snapshot.PARAM_DESTINATION_REGION: region(),257 }258 self.logger.test("Running task to copy snapshot")259 self.task_runner.run(parameters,260 task_name=test_method,261 complete_check_polling_interval=10, run_after_select=delete_snapshot_after_select)262 self.logger.test("[X] Task completed")263 self.assertEqual(1, len(self.task_runner.results), "Snapshot not longer available")264 self.logger.test("[X] Snapshot was not longer available")265 def test_copy_unencrypted_snapshot_other_region(self):266 
test_method = inspect.stack()[0][3]267 self.cleanup_leftover_source_snapshots(test_method)268 destination_ec2 = Ec2(region=remote_region())269 remote_snapshot_copy_id = None270 try:271 self.logger.test("Creating source snapshot")272 source_snapshot_unencrypted = self.ec2.create_snapshot(self.volume_unencrypted, tags={273 "Name": "Ec2CopySnapshot_{}".format(test_method),274 "copied-tag": "copied-value",275 "not-copied-tag": "not-copied-value",276 tasklist_tagname(TESTED_ACTION): test_method277 }, description="Snapshot for testing Ec2CopySnapshot : {}".format(test_method))278 self.snapshots.append(source_snapshot_unencrypted["SnapshotId"])279 parameters = {280 copy_snapshot.PARAM_DESTINATION_REGION: remote_region(),281 copy_snapshot.PARAM_ACCOUNTS_VOLUME_CREATE_PERMISSIONS: ["123456789012"],282 copy_snapshot.PARAM_COPIED_SNAPSHOT_TAGS: "copied-tag",283 copy_snapshot.PARAM_SNAPSHOT_DESCRIPTION: "{{{}}}".format(copy_snapshot.TAG_PLACEHOLDER_SOURCE_DESCRIPTION),284 copy_snapshot.PARAM_SNAPSHOT_TAGS: testing.tags.common_placeholder_tags(285 test_delete=False,286 placeholders=[287 copy_snapshot.TAG_PLACEHOLDER_SOURCE_REGION,288 copy_snapshot.TAG_PLACEHOLDER_SOURCE_SNAPSHOT_ID,289 copy_snapshot.TAG_PLACEHOLDER_SOURCE_VOLUME,290 copy_snapshot.TAG_PLACEHOLDER_OWNER_ACCOUNT]),291 copy_snapshot.PARAM_SOURCE_TAGS: testing.tags.common_placeholder_tags([292 copy_snapshot.TAG_PLACEHOLDER_COPIED_SNAPSHOT_ID,293 copy_snapshot.TAG_PLACEHOLDER_COPIED_REGION294 ])295 }296 self.logger.test("Running task")297 self.task_runner.run(parameters,298 task_name=test_method,299 complete_check_polling_interval=10)300 self.assertTrue(self.task_runner.success(), "Task executed successfully")301 remote_snapshot_copy_id = self.task_runner.results[0].result["copy-snapshot-id"]302 self.logger.test("[X] Task completed")303 self.logger.test("Checking snapshot copy")304 snapshot_copy = destination_ec2.get_snapshot(remote_snapshot_copy_id)305 self.assertIsNotNone(snapshot_copy, "Snapshot created in 
destination region")306 self.check_copy_snapshot(remote_snapshot_copy_id, source_snapshot_unencrypted, remote_region())307 self.check_source_snapshot(source_snapshot_unencrypted["SnapshotId"],308 remote_snapshot_copy_id, test_method,309 remote_region())310 finally:311 if remote_snapshot_copy_id is not None:312 destination_ec2.delete_snapshots([remote_snapshot_copy_id])313 def test_delete_source_snapshot_after_copy(self):314 test_method = inspect.stack()[0][3]315 self.cleanup_leftover_source_snapshots(test_method)316 self.logger.test("Creating source snapshot")317 source_snapshot = self.ec2.create_snapshot(self.volume_unencrypted, tags={318 "Name": "Ec2CopySnapshot_{}".format(test_method),319 tasklist_tagname(TESTED_ACTION): test_method320 }, description="Snapshot for testing Ec2CopySnapshot : {}".format(test_method))...

Full Screen

Full Screen

test_bin_log.py

Source:test_bin_log.py Github

copy

Full Screen

1import logging2import pytest3import json4log = logging.getLogger(__name__)5SERVEROPT = {"cmd":"./lgraph_server -c lgraph_standalone.json --port 7072 --rpc_port 9092 --enable_backup_log true --host 0.0.0.0 --verbose 1 --directory ./testdb",6 "cleanup_dir":["./testdb"]}7SERVEROPT_1 = {"cmd":"./lgraph_server -c lgraph_standalone.json --port 7073 --rpc_port 9093 --enable_backup_log true --host 0.0.0.0 --verbose 1 --directory ./testdb1",8 "cleanup_dir":["./testdb1"]}9IMPORTOPT = {"cmd":"./lgraph_import --online true -c ./data/yago/yago.conf -r http://127.0.0.1:7072 -u admin -p 73@TuGraph",10 "cleanup_dir":["./.import_tmp"]}11CLIENTOPT = {"host":"127.0.0.1:9093", "user":"admin", "password":"73@TuGraph"}12COPYSNAPOPT = {"src" : "./testdb", "dst" : "./testdb1"}13BINLOGOPT = {"cmd" : "./lgraph_binlog -a restore --host 127.0.0.1 --port 9093 -u admin -p 73@TuGraph -f ./testdb/binlog/*",14 "cleanup_dir":[]}15BINLOGOPT_1 = {"cmd" : "./lgraph_binlog -a restore --db_dir ./testdb1 -u admin -p 73@TuGraph -f ./testdb/binlog/*",16 "cleanup_dir":[]}17class TestBinLog():18 @pytest.mark.parametrize("server", [SERVEROPT], indirect=True)19 @pytest.mark.parametrize("importor", [IMPORTOPT], indirect=True)20 @pytest.mark.parametrize("copy_snapshot", [COPYSNAPOPT], indirect=True)21 @pytest.mark.parametrize("server_1", [SERVEROPT_1], indirect=True)22 @pytest.mark.parametrize("client", [CLIENTOPT], indirect=True)23 def test_remote_backup_none_binlog(self, server, importor, copy_snapshot, server_1, client):24 ret = client.callCypher("MATCH (n) RETURN n LIMIT 100", "default")25 assert ret[0]26 res = json.loads(ret[1])27 assert res == None28 @pytest.mark.parametrize("server", [SERVEROPT], indirect=True)29 @pytest.mark.parametrize("importor", [IMPORTOPT], indirect=True)30 @pytest.mark.parametrize("copy_snapshot", [COPYSNAPOPT], indirect=True)31 @pytest.mark.parametrize("server_1", [SERVEROPT_1], indirect=True)32 @pytest.mark.parametrize("backup_binlog", [BINLOGOPT], indirect=True)33 
@pytest.mark.parametrize("client", [CLIENTOPT], indirect=True)34 def test_remote_backup_with_binlog(self, server, importor, copy_snapshot, server_1, backup_binlog, client):35 ret = client.callCypher("MATCH (n) RETURN n LIMIT 100", "default")36 assert ret[0]37 res = json.loads(ret[1])38 assert len(res) == 2139 @pytest.mark.parametrize("server", [SERVEROPT], indirect=True)40 @pytest.mark.parametrize("importor", [IMPORTOPT], indirect=True)41 @pytest.mark.parametrize("copy_snapshot", [COPYSNAPOPT], indirect=True)42 @pytest.mark.parametrize("server_1", [SERVEROPT_1], indirect=True)43 @pytest.mark.parametrize("client", [CLIENTOPT], indirect=True)44 def test_local_backup_none_binlog(self, server, importor, copy_snapshot, server_1, client):45 ret = client.callCypher("MATCH (n) RETURN n LIMIT 100", "default")46 assert ret[0]47 res = json.loads(ret[1])48 assert res == None49 @pytest.mark.parametrize("server", [SERVEROPT], indirect=True)50 @pytest.mark.parametrize("importor", [IMPORTOPT], indirect=True)51 @pytest.mark.parametrize("copy_snapshot", [COPYSNAPOPT], indirect=True)52 @pytest.mark.parametrize("backup_binlog", [BINLOGOPT_1], indirect=True)53 @pytest.mark.parametrize("server_1", [SERVEROPT_1], indirect=True)54 @pytest.mark.parametrize("client", [CLIENTOPT], indirect=True)55 def test_local_backup_with_binlog(self, server, importor, copy_snapshot, backup_binlog, server_1, client):56 ret = client.callCypher("MATCH (n) RETURN n LIMIT 100", "default")57 assert ret[0]58 res = json.loads(ret[1])...

Full Screen

Full Screen

existing_vol_encryption.py

Source:existing_vol_encryption.py Github

copy

Full Screen

# Script that creates a snapshot from an existing unencrypted volume, creates an
# encrypted copy of that snapshot, and creates an encrypted volume from the copy.
import boto3

ec2 = boto3.resource('ec2')

# Snapshot creation from the existing unencrypted volume.
created_snapshot = ec2.create_snapshot(
    Description='uday-test-snapshot',
    VolumeId='vol-066d9db72f547bbdf',
    TagSpecifications=[
        {
            'ResourceType': 'snapshot',
            'Tags': [
                {
                    'Key': 'Name',
                    'Value': 'test-snapshot'
                },
            ]
        },
    ]
)
print(created_snapshot)
# The snapshot must be complete before it can be used as a copy source.
created_snapshot.wait_until_completed()

# Create the encrypted copy of the snapshot created above. The copy is issued
# on the created snapshot resource itself (not on a placeholder Snapshot('id')),
# so the resource supplies its own id as the source snapshot id.
copy_snapshot = created_snapshot.copy(
    Description='uday-test-copy',
    Encrypted=True,
    KmsKeyId='arn:aws:kms:us-east-1:706839808421:key/ee5956f2-8e2c-4a9b-9876-2eccd219553f',
    SourceRegion='us-east-1',
)
print(copy_snapshot)
# copy() returns a plain response dict; wrap the new id in a resource to wait on it.
copied_snapshot = ec2.Snapshot(copy_snapshot['SnapshotId'])
copied_snapshot.wait_until_completed()

# Volume creation from the encrypted copy created above.
response = ec2.create_volume(
    AvailabilityZone='us-east-1a',
    Size=10,
    VolumeType='gp2',
    SnapshotId=copy_snapshot['SnapshotId'],
    TagSpecifications=[
        {
            'ResourceType': 'volume',
            'Tags': [
                {
                    'Key': 'Name',
                    'Value': 'test-volume'
                },
            ]
        },
    ]
)
# ... (the listing is truncated at this point)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful