How to use describe_volumes method in localstack

Best Python code snippet using localstack_python

ec2_utils.py

Source:ec2_utils.py Github

copy

Full Screen

...105 return 1106def getEBS_volId(devName):107 if not devName.startswith('/dev'): return None108 iid = getEC2_InstanceId()109 res = ec2client.describe_volumes(110 Filters=[111 {'Name': 'attachment.instance-id', 'Values': [iid]},112 {'Name': 'attachment.device', 'Values': [devName]},113 ],114 )115 if len(res['Volumes'])>0:116 return res['Volumes'][0]['VolumeId']117 return None118def getInstanceTags():119 iid = getEC2_InstanceId()120 res = ec2client.describe_tags(121 Filters=[122 {'Name':'resource-id',123 'Values':[iid]124 }]125 )126 savedTags = []127 if len(res['Tags']) > 1:128 for i in res['Tags']:129 if i['Key'] in ['Name', 'aws:ec2spot:fleet-request-id']:130 continue131 if i['Key'].startswith('aws:'):132 continue133 savedTags.append({'Key':i['Key'],'Value':i['Value']})134 return savedTags135 else:136 return []137def attachEBS(devName, vol):138 iid = getEC2_InstanceId()139 res = ec2client.describe_volumes(140 VolumeIds=[ vol ]141 )142 # Check ready Volume143 while res['Volumes'][0]['State'] != 'available':144 time.sleep(1)145 res = ec2client.describe_volumes(146 VolumeIds=[ vol ]147 )148 try:149 res = ec2client.attach_volume(150 InstanceId = iid,151 VolumeId = vol,152 Device = devName,153 )154 except botocore.exceptions.ClientError as e:155 logging.exception("Exception")156 print("[attach] botocore.exceptions.ClientError")157 print(e.__doc__)158 print(e.message)159 return160 except:161 logging.exception("Caught exception")162 print(e.__doc__)163 print(e.message)164 return165 time.sleep(1)166 res = ec2client.describe_volumes(167 VolumeIds=[ vol ]168 )169 # wait until attached170 d_ct = 0171 while len(res['Volumes'][0]['Attachments']) < 1 and d_ct < 18:172 time.sleep(10)173 try:174 res = ec2client.describe_volumes(175 VolumeIds=[ vol ]176 )177 except botocore.exceptions.ClientError as e:178 logging.exception("Exception")179 print("[attach] desc, botocore.exceptions.ClientError")180 print(e.__doc__)181 print(e.message)182 return183 except:184 logging.exception("Caught exception")185 print(e.__doc__)186 print(e.message)187 return188 d_ct += 1189 print("waiting EBS attachment %s: %d" % (vol,d_ct))190 m = 0191 m_ct = 0192 while m == 0 and m_ct<10:193 m = modifyAttr(iid, devName, vol)194 m_ct += 1195 if m_ct > 1:196 time.sleep(1)197 print("re-try modify attr")198 if res['Volumes'][0]['Attachments'][0]['State'] == 'attached':199 return 1200 return 0201def modifyAttr(iid, devName, vol):202 try:203 ec2client.modify_instance_attribute(204 InstanceId = iid,205 BlockDeviceMappings=[206 {207 'DeviceName': devName,208 'Ebs': {209 'VolumeId': vol,210 'DeleteOnTermination': True,211 }212 },213 ]214 )215 except botocore.exceptions.ClientError as e:216 logging.exception("Exception")217 print("botocore.exceptions.ClientError")218 print(e.__doc__)219 print(e.message)220 return 0221 except:222 logging.exception("Exception")223 print(e.__doc__)224 print(e.message)225 return 0226 return 1227def detachEBS(devName, vol):228 """229 return 1 for success 0 for failure230 """231 print ("Detaching {} {}".format(devName,vol))232 iid = getEC2_InstanceId()233 try:234 ec2client.detach_volume(235 Device = devName,236 Force = True,237 InstanceId=iid,238 VolumeId=vol239 )240 except botocore.exceptions.ClientError as e:241 # already detached?242 logging.exception("Exception")243 print("botocore.exceptions.ClientError")244 print(e.__doc__)245 print(e.message)246 return 0247 except:248 logging.exception("Exception")249 print(e.__doc__)250 print(e.message)251 return 0252 res = ec2client.describe_volumes(253 VolumeIds=[ vol ]254 )255 # Check ready Volume256 ct = 0257 while res['Volumes'][0]['State'] != 'available':258 time.sleep(2)259 print("check detach vol {} in state: {}".format(vol, res['Volumes'][0]['State']))260 res = ec2client.describe_volumes(261 VolumeIds=[ vol ]262 )263 ct += 1264 if ct > 10: break265 return 1266def deleteEBS(vol):267 print("Deleting " + vol)268 try:269 res = ec2client.describe_volumes(270 VolumeIds=[ vol ]271 )272 except botocore.exceptions.ClientError as e:273 logging.warn("Exception")274 print("botocore.exceptions.ClientError")275 print(e.__doc__)276 print(e.message)277 return278 except:279 logging.exception("Exception")280 print(e.__doc__)281 print(e.message)282 return283 # Check ready Volume284 ct = 0285 while res['Volumes'][0]['State'] != 'available':286 time.sleep(2)287 print("Waiting to delete when vol is ready")288 print("check delete vol {} in state: {}".format(vol, res['Volumes'][0]['State']))289 res = ec2client.describe_volumes(290 VolumeIds=[ vol ]291 )292 ct += 1293 if ct > 10: break294 try:295 ec2client.delete_volume(296 VolumeId = vol297 )298 except Exception as e:299 logging.warn("[deleteEBS] Caught exception")300 print(e.__doc__)301 print(e.message)...

Full Screen

Full Screen

test_ebs.py

Source:test_ebs.py Github

copy

Full Screen

1from botocore.exceptions import ClientError2from mock import MagicMock, patch3from spacel.model import SpaceVolume4from spacel.volumes.ebs import EbsAttachment5from test.aws import MockedClientTest, INSTANCE_ID, AVAILABILITY_ZONE6from test.volumes import LABEL, DEVICE7VOLUME_ID = 'vol-111111'8SNAPSHOT_ID = 'snap-123456'9class TestEbsAttachment(MockedClientTest):10 def setUp(self):11 super(TestEbsAttachment, self).setUp()12 self.volume = SpaceVolume(LABEL, {13 'count': 314 })15 self.volume_item = {16 'label': {'S': LABEL},17 'index': {'N': 0}18 }19 self.ebs_attachment = EbsAttachment(self.clients, self.meta)20 def test_attach_existing(self):21 self.volume_item['volume_id'] = {'S': VOLUME_ID}22 self.ec2.describe_volumes.return_value = {'Volumes': [{23 'Attachments': [{24 'InstanceId': INSTANCE_ID,25 'Device': DEVICE26 }]27 }]}28 volume_item = self.ebs_attachment.attach_volume(self.volume,29 self.volume_item)30 self.assertEqual({'S': DEVICE}, volume_item['device'])31 self.ec2.create_snapshot.assert_not_called()32 self.ec2.create_volume.assert_not_called()33 self.ec2.attach_volume.assert_not_called()34 self.ec2.get_waiter.assert_not_called()35 def test_attach_existing_same_az(self):36 self.volume_item['volume_id'] = {'S': VOLUME_ID}37 self.ec2.describe_volumes.return_value = {'Volumes': [{38 'AvailabilityZone': AVAILABILITY_ZONE,39 'Attachments': [{40 'InstanceId': 'i-654321',41 'Device': DEVICE42 }]43 }]}44 self.ebs_attachment._next_volume = MagicMock(return_value=DEVICE)45 self.ebs_attachment._attach_volume = MagicMock()46 self.ebs_attachment.attach_volume(self.volume, self.volume_item)47 self.ec2.create_snapshot.assert_not_called()48 self.ec2.create_volume.assert_not_called()49 self.ec2.attach_volume.assert_not_called()50 self.ec2.get_waiter.assert_called_with('volume_available')51 def test_attach_existing_different_az(self):52 self.volume_item['volume_id'] = {'S': VOLUME_ID}53 self.ec2.describe_volumes.return_value = {'Volumes': [{54 'AvailabilityZone': 'us-west-2a',55 'Attachments': [{56 'InstanceId': 'i-654321',57 'Device': '/dev/xvdb'58 }]59 }]}60 self.ec2.create_snapshot.return_value = {'SnapshotId': SNAPSHOT_ID}61 self.ebs_attachment._next_volume = MagicMock(return_value=DEVICE)62 self.ebs_attachment._attach_volume = MagicMock()63 self.ebs_attachment.attach_volume(self.volume, self.volume_item)64 self.ec2.create_snapshot.assert_called_with(VolumeId=VOLUME_ID)65 self.ec2.create_volume.assert_called_with(66 AvailabilityZone=AVAILABILITY_ZONE,67 Size=4,68 VolumeType='gp2',69 SnapshotId=SNAPSHOT_ID70 )71 self.ec2.attach_volume.assert_not_called()72 self.ec2.delete_volume.assert_called_with(VolumeId=VOLUME_ID)73 def test_attach_create_volume(self):74 self.volume.iops = 40075 self.volume.encrypted = True76 self.volume_item['snapshot_id'] = {'S': SNAPSHOT_ID}77 self.ebs_attachment._next_volume = MagicMock(return_value=DEVICE)78 self.ebs_attachment._attach_volume = MagicMock()79 self.ebs_attachment.attach_volume(self.volume, self.volume_item)80 self.ec2.describe_volumes.assert_not_called()81 self.ec2.create_snapshot.assert_not_called()82 self.ec2.create_volume.assert_called_with(83 Size=4, VolumeType='gp2', AvailabilityZone=AVAILABILITY_ZONE,84 Iops=400, Encrypted=True, SnapshotId=SNAPSHOT_ID)85 def test_describe_volume_error(self):86 client_error = ClientError({'Error': {'Code': 0}}, 'DescribeVolumes')87 self.ec2.describe_volumes.side_effect = client_error88 self.assertRaises(ClientError, self.ebs_attachment._describe_volume,89 VOLUME_ID)90 def test_describe_volume_not_found(self):91 client_error = ClientError({'Error': {92 'Code': 'InvalidVolume.NotFound'}}, 'DescribeVolumes')93 self.ec2.describe_volumes.side_effect = client_error94 volume = self.ebs_attachment._describe_volume(VOLUME_ID)95 self.assertIsNone(volume)96 @patch('spacel.volumes.ebs.check_output')97 def test_next_volume(self, mock_lsblk):98 mock_lsblk.return_value = '''NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT99/dev/xvda 8:0 0 8.0G 0 disk100/dev/xvdb 8:1 0 16.0G 0 disk101'''102 next_device = self.ebs_attachment._next_volume()103 self.assertEqual('/dev/xvdc', next_device)104 @patch('spacel.volumes.ebs.check_output')105 def test_next_volume_none_available(self, mock_lsblk):106 devices = ['/dev/xvd%s' % chr(ord('a') + dev) for dev in range(26)]107 mock_lsblk.return_value = '\n'.join(devices)108 next_device = self.ebs_attachment._next_volume()109 self.assertIsNone(next_device)110 @patch('spacel.volumes.ebs.check_output')111 def test_attach_volume(self, _):112 volume_item = {'volume_id': {'S': VOLUME_ID}}...

Full Screen

Full Screen

attaching_ebs_volume.py

Source:attaching_ebs_volume.py Github

copy

Full Screen

...11 except Exception as e:12 print(e)131415def describe_volumes():16 try:17 print ("Describing EC2 instance")18 resource_ec2 = boto3.client("ec2")19 print(resource_ec2.describe_volumes()["Volumes"][1]["VolumeId"])20 return str(resource_ec2.describe_volumes()["Volumes"][1]["VolumeId"])21 except Exception as e:22 print(e)232425x=describe_ec2_instance()26print(x)27y = describe_volumes()28print(y)293031def attach_ebs_volume_to_instance():32 try:33 print ("attaching ebs volume")34 ec2_client=boto3.client("ec2")35 ec2_client.attach_volume(36 Device = "/dev/sdf",37 InstanceId = x,38 VolumeId = y39 )40 except Exception as e:41 print(e) ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful