Best Python code snippet using tempest_python
test_volumes.py
Source:test_volumes.py  
...111        self.client.delete_volume(VolumeId=volume_id)112        self.cancelResourceCleanUp(res_clean)113        self.get_volume_waiter().wait_delete(volume_id)114    @testtools.skipUnless(CONF.aws.ebs_image_id, "ebs image id is not defined")115    def test_attach_detach_volume(self):116        clean_dict = {}117        instance_id = self.run_instance(ImageId=CONF.aws.ebs_image_id,118                                        clean_dict=clean_dict)119        clean_i = clean_dict['instance']120        kwargs = {121            'Size': 1,122            'AvailabilityZone': CONF.aws.aws_zone123        }124        data = self.client.create_volume(*[], **kwargs)125        volume_id = data['VolumeId']126        clean_v = self.addResourceCleanUp(self.client.delete_volume,127                                          VolumeId=volume_id)128        self.get_volume_waiter().wait_available(volume_id)129        kwargs = {130            'Device': '/dev/sdh',131            'InstanceId': instance_id,132            'VolumeId': volume_id,133        }134        self.client.attach_volume(*[], **kwargs)135        clean_vi = self.addResourceCleanUp(self.client.detach_volume,136                                           VolumeId=volume_id)137        self.get_volume_attachment_waiter().wait_available(138            volume_id, final_set=('attached'))139        # reorder cleanups to avoid error on volume delete140        self.cancelResourceCleanUp(clean_i)141        clean_i = self.addResourceCleanUp(self.client.terminate_instances,142                                          InstanceIds=[instance_id])143        data = self.client.describe_volumes(VolumeIds=[volume_id])144        self.assertEqual(1, len(data['Volumes']))145        volume = data['Volumes'][0]146        self.assertEqual('in-use', volume['State'])147        self.assertEqual(1, len(volume['Attachments']))148        attachment = volume['Attachments'][0]149        self.assertFalse(attachment['DeleteOnTermination'])150        self.assertIsNotNone(attachment['Device'])151        self.assertEqual(instance_id, attachment['InstanceId'])152        self.assertEqual(volume_id, attachment['VolumeId'])153        data = self.client.describe_instances(InstanceIds=[instance_id])154        self.assertEqual(1, len(data.get('Reservations', [])))155        self.assertEqual(1, len(data['Reservations'][0].get('Instances', [])))156        bdms = data['Reservations'][0]['Instances'][0]['BlockDeviceMappings']157        self.assertNotEmpty(bdms)158        self.assertIn('DeviceName', bdms[0])159        self.assertIn('Ebs', bdms[0])160        # stop instance to prevent 'busy' state of detached volume161        data = self.client.stop_instances(InstanceIds=[instance_id])162        self.get_instance_waiter().wait_available(instance_id,163                                                  final_set=('stopped'))164        self.client.detach_volume(VolumeId=volume_id)165        self.get_volume_attachment_waiter().wait_delete(volume_id)166        self.cancelResourceCleanUp(clean_vi)167        data = self.client.describe_volumes(VolumeIds=[volume_id])168        self.assertEqual(1, len(data['Volumes']))169        volume = data['Volumes'][0]170        self.assertEqual('available', volume['State'])171        self.assertEqual(0, len(volume['Attachments']))172        self.client.delete_volume(VolumeId=volume_id)173        self.cancelResourceCleanUp(clean_v)174        self.get_volume_waiter().wait_delete(volume_id)175        self.client.terminate_instances(InstanceIds=[instance_id])176        self.cancelResourceCleanUp(clean_i)177        self.get_instance_waiter().wait_delete(instance_id)178    @testtools.skipUnless(CONF.aws.ebs_image_id, "EBS image id is not defined")179    def test_attaching_stage(self):180        clean_dict = {}181        instance_id = self.run_instance(ImageId=CONF.aws.ebs_image_id,182                                        clean_dict=clean_dict)183        clean_i = clean_dict['instance']184        data = self.client.create_volume(185            AvailabilityZone=CONF.aws.aws_zone, Size=1)186        volume_id = data['VolumeId']187        clean_v = self.addResourceCleanUp(self.client.delete_volume,188                                          VolumeId=volume_id)189        self.get_volume_waiter().wait_available(volume_id)190        device_name = '/dev/xvdh'191        kwargs = {192            'Device': device_name,193            'InstanceId': instance_id,194            'VolumeId': volume_id,195        }196        data = self.client.attach_volume(*[], **kwargs)197        clean_vi = self.addResourceCleanUp(self.client.detach_volume,198                                           VolumeId=volume_id)199        self.assertEqual('attaching', data['State'])200        if CONF.aws.run_incompatible_tests:201            bdt = self.get_instance_bdm(instance_id, device_name)202            self.assertIsNotNone(bdt)203            self.assertEqual('attaching', bdt['Ebs']['Status'])204        self.get_volume_attachment_waiter().wait_available(205            volume_id, final_set=('attached'))206        # reorder cleanups to avoid error on volume delete207        self.cancelResourceCleanUp(clean_i)208        clean_i = self.addResourceCleanUp(self.client.terminate_instances,209                                          InstanceIds=[instance_id])210        # stop instance to prevent 'busy' state of detached volume211        data = self.client.stop_instances(InstanceIds=[instance_id])212        self.get_instance_waiter().wait_available(instance_id,213                                                  final_set=('stopped'))214        self.client.detach_volume(VolumeId=volume_id)215        self.get_volume_attachment_waiter().wait_delete(volume_id)216        self.cancelResourceCleanUp(clean_vi)217        self.client.delete_volume(VolumeId=volume_id)218        self.cancelResourceCleanUp(clean_v)219        self.get_volume_waiter().wait_delete(volume_id)220        self.client.terminate_instances(InstanceIds=[instance_id])221        self.cancelResourceCleanUp(clean_i)222        self.get_instance_waiter().wait_delete(instance_id)223    @testtools.skipUnless(CONF.aws.run_incompatible_tests,224                          "Volume statuses are not implemented")225    @testtools.skipUnless(CONF.aws.ebs_image_id, "EBS image id is not defined")226    def test_delete_detach_attached_volume(self):227        instance_id = self.run_instance(ImageId=CONF.aws.ebs_image_id)228        kwargs = {229            'Size': 1,230            'AvailabilityZone': CONF.aws.aws_zone231        }232        data = self.client.create_volume(*[], **kwargs)233        volume_id = data['VolumeId']234        clean_v = self.addResourceCleanUp(self.client.delete_volume,235                                          VolumeId=volume_id)236        self.get_volume_waiter().wait_available(volume_id)237        kwargs = {238            'Device': '/dev/sdh',239            'InstanceId': instance_id,240            'VolumeId': volume_id,241        }242        self.client.attach_volume(*[], **kwargs)243        clean_vi = self.addResourceCleanUp(self.client.detach_volume,244                                           VolumeId=volume_id)245        self.get_volume_attachment_waiter().wait_available(246            volume_id, final_set=('attached'))247        self.assertRaises('VolumeInUse',248                          self.client.attach_volume,249                          **kwargs)250        kwargs['Device'] = '/dev/sdi'251        self.assertRaises('VolumeInUse',252                          self.client.attach_volume,253                          **kwargs)254        self.assertRaises('VolumeInUse',255                          self.client.delete_volume,256                          VolumeId=volume_id)257        # stop instance to prevent 'busy' state of detached volume258        data = self.client.stop_instances(InstanceIds=[instance_id])259        self.get_instance_waiter().wait_available(instance_id,260                                                  final_set=('stopped'))261        self.client.detach_volume(VolumeId=volume_id)262        self.cancelResourceCleanUp(clean_vi)263        self.get_volume_attachment_waiter().wait_delete(volume_id)264        self.assertRaises('IncorrectState',265                          self.client.detach_volume,266                          VolumeId=volume_id)267        self.client.delete_volume(VolumeId=volume_id)268        self.cancelResourceCleanUp(clean_v)269        self.get_volume_waiter().wait_delete(volume_id)270        self.assertRaises('InvalidVolume.NotFound',271                          self.client.detach_volume,272                          VolumeId=volume_id)273        self.client.terminate_instances(InstanceIds=[instance_id])274        self.get_instance_waiter().wait_delete(instance_id)275    @testtools.skipUnless(CONF.aws.image_id, "image id is not defined")...compute.py
Source:compute.py  
...76            msg = _('Driver attach volume failed: %s') % e77            LOG.error(msg)78            raise webob.exc.HTTPBadRequest(explanation=msg)79        return webob.Response(status_int=202)80    def detach_volume(self, req, id, body):81        context = None82        try:83            volume = self.compute_driver.get(context, id)84        except exception.VolumeNotFound as error:85            raise webob.exc.HTTPNotFound(explanation=error.msg)86        attachment_id = body.get('attachment_id', None)87        self.compute_driver.detach_volume(context, volume, attachment_id)88        return webob.Response(status_int=202)89def create_router(mapper):90    controller = ContainerController()91    mapper.connect('/server/list',92                   controller=controller,93                   action='list',94                   conditions=dict(method=['GET']))95    mapper.connect('/server/attach-volume',96                   controller=controller,97                   action='attach-volume',98                   conditions=dict(method=['POST']))99    mapper.connect('/server/detach_volume',100                   controller=controller,101                   action='detach_volume',...detach_volume.py
Source:detach_volume.py  
...28    def run(self, result):29        """execute the test"""30        if not self.setup_done:31            self.setup()32        status = op_utils.detach_volume(self.server_id, self.volume_id)33        if status:34            result.update({"detach_volume": 1})35            LOG.info("Detach volume from server successful!")36        else:37            result.update({"detach_volume": 0})...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
