Best Python code snippet using tempest_python
ovirtbackup.py
Source:ovirtbackup.py  
1from __future__ import print_function2import fnmatch3import itertools4import os5import re6import shutil7import sys8import time9import codecs10from xml.dom import minidom11from ovirtsdk.api import API12from ovirtsdk.infrastructure.errors import RequestError13from ovirtsdk.xml import params14class OvirtBackup:15    """Class for export and import Virtual Machine in oVirt/RHEV environment"""16    def __init__(self, url, user, password):17        self.url = url18        self.user = user19        self.password = password20    def print_info(self):21        print(self.url)22        print(self.user)23        print(self.password)24    def connect(self):25        """Connect to oVirt/RHEV API"""26        try:27            self.api = API(url=self.url, username=self.user,28                           password=self.password, insecure='True')29            return self.api30        except RequestError as err:31            print("Error: {} Reason: {}".format(err.status, err.reason))32            exit(1)33    def vm_state(self, vm):34        return self.api.vms.get(vm).get_status().get_state()35    def create_snap(self, desc, vm):36        """Create a snapshot from a virtual machine with params:37            @param desc: Description of Snapshot38            @param vm: Virtual Machine Name39        """40        try:41            snapshot = self.snapshot = self.api.vms.get(vm).snapshots.list()42            for snap in snapshot:43                if snap.description == desc:44                    self.delete_snap(desc=desc, vm=vm)45            self.api.vms.get(vm).snapshots.add(46                params.Snapshot(47                    description=desc,48                    vm=self.api.vms.get(vm),49                    persist_memorystate=False50                )51            )52            self.snapshot = self.api.vms.get(vm).snapshots.list(description=desc)[0]53            self.__wait_snap(vm, self.snapshot.id)54        except RequestError as err:55            print("CREATE SNAP Error: {} Reason: {}".format(err.status, err.reason))56            raise Exception(12)57    def snapshot_status(self, vm, snap_id):58        snapshot = self.api.vms.get(vm).snapshots.get(id=snap_id)59        if snapshot:60            return True61        else:62            return False63    def __wait_snap(self, vm, id_snap):64        """ Time wait while create a snapshot of a Virtual Machine"""65        spinner = Spinner()66        print("waiting for snapshot to finish... ")67        while self.api.vms.get(vm).snapshots.get(id=id_snap).snapshot_status != "ok":68            spinner.update()69        spinner.clear()70    def __wait(self, vm, action):71        """Time wait while create and export of a Virtual Machine"""72        if action == 0:73            self.action = "creation"74        elif action == 1:75            self.action = "export"76        spinner = Spinner()77        print("waiting for vm {}... ".format(self.action))78        while self.get_vm_status(vm) != 'down':79            spinner.update()80        spinner.clear()81    def delete_snap(self, desc, vm):82        """Delete a snapshot from a virtual machine with params:83            @param desc: Description of Snapshot84            @param vm: Virtual Machine Name85        """86        try:87            snapshot = self.api.vms.get(vm).snapshots.list(description=desc)[0]88            snapshot.delete()89            spinner = Spinner()90            print("waiting for delete old snapshot... ")91            while self.snapshot_status(vm, snap_id=snapshot.id):92                spinner.update()93            spinner.clear()94        except RequestError as err:95            print("DELETE SNAP Error: {} Reason: {}".format(err.status, err.reason))96            raise Exception(13)97            #exit(13)98    def create_vm_to_export(self, vm, new_name, desc):99        try:100            self.snapshot = self.api.vms.get(vm).snapshots.list(description=desc)[0]101            self.snapshots = params.Snapshots(snapshot=[params.Snapshot(id=self.snapshot.id)])102            self.cluster = self.api.clusters.get(id=self.api.vms.get(vm).cluster.id)103            self.api.vms.add(104                params.VM(105                    name=new_name, snapshots=self.snapshots,106                    cluster=self.cluster,107                    template=self.api.templates.get(name="Blank"),108                    delete_protected=False109                )110            )111            self.__wait(new_name, 0)112        except RequestError as err:113            print("Error: {} Reason: {}".format(err.status, err.reason))114            raise Exception(14)115            #exit(14)116    def get_storage_domains(self, vm):117        self.datacenter = self.get_dc(vm)118        return self.datacenter.storagedomains.list()119    def get_dc(self, vm):120        """Return Datacenter object121            :param vm: Virtual Machine Name122        """123        self.dc = self.api.datacenters.get(id=self.get_cluster(vm).data_center.id)124        return self.dc125    def get_cluster(self, vm):126        """Return Cluster object127            :param vm: Virtual Machine Name128        """129        self.cluster = self.api.clusters.get(id=self.api.vms.get(vm).cluster.id)130        return self.cluster131    def if_exists_vm(self, vm):132        """Verify if virtual machine and new virtual machine already exists"""133        if self.api.vms.get(vm):134            return 1135        else:136            return 0137    def get_vm_status(self, vm):138        """Verify status of virtual machine"""139        self.state = self.api.vms.get(vm).status.state140        return self.state141    def delete_tmp_vm(self, name):142        try:143            self.api.vms.get(name=name).delete()144            return 1145        except Exception as e:146            print(e.message)147            raise Exception(18)148    def export_vm(self, new_name, export, collapse):149        try:150            if collapse == 'False':151                self.api.vms.get(name=new_name).export(152                    params.Action(153                        storage_domain=export,154                        force=True155                    )156                )157                self.__wait(new_name, 1)158            elif collapse == 'True':159                self.api.vms.get(name=new_name).export(160                    params.Action(161                        storage_domain=export,162                        force=True,163                        discard_snapshots=True164                    )165                )166                self.__wait(new_name, 1)167        except Exception as e:168            print(e.message)169            raise Exception(14)170            #exit(14)171    def clean_export_domain(self, name, export):172        snap_name = name + "-SNAP"173        export_vms = list()174        for vm in self.api.storagedomains.get(export).vms.list():175            if (vm.get_name() == name) or (vm.get_name() == snap_name):176                export_vms.append(vm.get_name())177        try:178            for list_vm in export_vms:179                print("Delete exported virtual machine {} from {} ".format(list_vm, export))180                self.api.storagedomains.get(export).vms.get(list_vm).delete()181            return 1182        except RequestError as err:183            print("Error: {} Reason: {}".format(err.status, err.reason))184            return 0185        # Funciones de manejo de Exports Domains186    def get_export_domain(self, vm):187        """Return Export Domain188            :param vm: Virtual Machine Name189        """190        self.cluster = self.get_cluster(vm)191        self.dc = self.get_dc(vm)192        self.export = None193        for self.sd in self.dc.storagedomains.list():194            if self.sd.get_type() == "export":195                self.export = self.sd196        return self.export197    def verify_valid_export(self, dc_id, export, current):198        if current == export:199            if self.api.datacenters.get(id=dc_id).storagedomains.get(200                    export).get_status().get_state() == "active":201                return 1202            else:203                return 2204        elif current != export:205            return 0206    def detach_export(self, dc_id, export):207        if self.api.datacenters.get(id=dc_id).storagedomains.get(export).delete():208            print("Detach OK")209    def do_export_maintenance(self, dc_id, export):210        self.api.datacenters.get(id=dc_id).storagedomains.get(export).deactivate()211        spinner = Spinner()212        print("waiting for maintenance Storage {}... ".format(export))213        while self.api.datacenters.get(id=dc_id).storagedomains.get(214                export).get_status().get_state() != "maintenance":215            spinner.update()216        spinner.clear()217    def attach_export(self, dc_id, export):218        try:219            if self.api.datacenters.get(id=dc_id).storagedomains.add(self.api.storagedomains.get(export)):220                print("Export Domain was attached successfully")221        except RequestError as err:222            print("Error: {} Reason: {}".format(err.status, err.reason))223            raise Exception(17)224            #exit(17)225    def do_export_up(self, dc_id, export):226        if self.api.datacenters.get(id=dc_id).storagedomains.get(export).activate():227            print('Export Domain was activate successfully')228    def prepare_export(self, dc_id, name_export):229        print("Different Export Attached")230        self.do_export_maintenance(dc_id, name_export)231        self.detach_export(dc_id, name_export)232    def active_export(self, vm, export_name):233        self.export_attached = self.get_export_domain(vm)234        dc = self.get_dc(vm)235        if self.export_attached is not None:236            status_export = self.verify_valid_export(dc.id, export_name, self.export_attached.name)237            if status_export == 1:238                print("Export {} is OK".format(export_name))239            elif status_export == 0:240                self.prepare_export(dc.id, self.export_attached.name)241                self.attach_export(dc.id, export_name)242            elif status_export == 2:243                self.do_export_up(dc.id, export_name)244        elif self.export_attached is None:245            self.attach_export(dc.id, export_name)246# Seccion de funciones para movimiento de archivos247    def create_dirs(self, vm_name, export_path, images, vms):248        try:249            images = images.strip("/")250            vms = vms.strip("/")251            self.new_images_path = os.path.join(export_path, vm_name, images)252            self.new_vms_path = os.path.join(export_path, vm_name, vms)253            os.makedirs(self.new_vms_path)254            os.makedirs(self.new_images_path)255        except OSError as e:256            print(e)257            raise Exception(15)258            #exit(15)259    def mv_data(self, new_name, export, source, destination, stid):260        self.dest = export + new_name + destination261        os.chdir(export + stid + destination) # cambiando a /exportdomain/UUID/{master/vms, images}262        shutil.move(source, self.dest)263    def do_mv(self, vm, export_path, images, vms):264        obj_vm = self.api.vms.get(vm)265        # disks = self.api.vms.get(new_vm).disks.list()266        storage_id = self.get_export_domain(vm)267        disks = obj_vm.disks.list()268        objects = {"Disks": list(), "Vms": list()}269        objects["Vms"].append(obj_vm.id)270        for disk in disks:271            objects["Disks"].append(disk.id)272        old_name = ''273        pattern = '-SNAP'274        if re.search(pattern, vm):275            old_name = vm.split("-SNAP")[0]276        else:277            old_name = vm278        for disk in objects["Disks"]:279            self.mv_data(old_name, export_path, disk, images, storage_id.id)280        for vm_iter in objects["Vms"]:281            self.mv_data(old_name, export_path, vm_iter, vms, storage_id.id)282        # Seccion funciones modificacion del xml283    def get_running_ovf(self, vm, desc, path):284        """Get ovf info from snapshot of original VM"""285        try:286            self.snapshot = self.api.vms.get(vm).snapshots.list(287                all_content=True, description=desc)[0]288            print("Get running ovf definition")289            self.ovf = self.snapshot.get_initialization().get_configuration().get_data()290            complete_path = path + vm291            ovf_path = os.path.join(complete_path, "running-" + self.api.vms.get(vm).id + '.ovf')292            print("Write running ovf file")293            ovfFile = codecs.open(ovf_path, encoding='utf-8', mode='w')294            ovfFile.write(self.ovf)295            ovfFile.close()296            print("Write running ovf file: [ OK ]")297            return ovf_path298        except RequestError as err:299            print("Error: {} Reason: {}".format(err.status, err.reason))300            raise Exception(16)301    def get_vm_export_xml(self, xml_export):302        xml_tag = xml_export.getElementsByTagName("rasd:StorageId")303        storage_ids = list()304        for storage_id in xml_tag:305            storage_ids.append(storage_id.firstChild.nodeValue)306        return storage_ids307    def add_storage_id_xml(self, xml_original, xml_export):308        print("Reading xml's definitions")309        with codecs.open(xml_original, "r", "utf-8") as inp:310            xml_doc = minidom.parseString(inp.read().encode("utf-8"))311        with codecs.open(xml_export, "r", "utf-8") as inp2:312            xml_export_obj = minidom.parseString(inp2.read().encode("utf-8"))313        print("Reading xml's definitions: [ OK ]")314        count = 0315        for item in xml_doc.getElementsByTagName("Device"):316            if item.firstChild.nodeValue == "disk":317                st_id = self.get_vm_export_xml(xml_export_obj)318                StorageId = xml_doc.createElement("rasd:StorageId")319                content = xml_doc.createTextNode(st_id[count])320                StorageId.appendChild(content)321                parent = item.parentNode322                parent.appendChild(StorageId)323                count += 1324        return xml_doc325    def save_new_ovf(self, path, name, xml):326        try:327            directory = os.path.splitext(name)[0]328            os.mkdir(path + directory)329            save_name = path + directory + "/" + name330            print("Saving changes..")331            with codecs.open(save_name, 'a', 'utf-8') as new_ovf_file:332                new_ovf_file.write(xml.toxml())333            print("Saving changes: [ OK ]")334        except OSError as e:335            print(e)336    def delete_tmp_ovf(self, path):337        try:338            os.remove(path)339        except OSError as e:340            print(e)341    def export_xml_path(self, path, vm, find_path=None):342        if find_path is not None:343            complete_path = path + vm + find_path344        else:345            complete_path = path + vm346        for root, dirs, filename in os.walk(complete_path):347            for name in filename:348                if fnmatch.fnmatch(name, "*.ovf"):349                    return os.path.join(root, name)350    def change_owner(self, path):351        uid = 36352        gid = 36353        for root, dirs, files in os.walk(path):354            for one_dir in dirs:355                os.chown(os.path.join(root, one_dir), uid, gid)356            for one_file in files:357                os.chown(os.path.join(root, one_file), uid, gid)358    def change_dirname(self, path, vm, timestamp):359        try:360            new_dir = os.path.join(path, vm + "-" + timestamp)361            old_dir = os.path.join(path, vm)362            print("change from {} to {}".format(old_dir, new_dir))363            shutil.move(old_dir, new_dir)364        except OSError as e:365            print(e.errno)366            return e.errno367    def log_event(self, vm, msg, severity):368        # Funcion para manejo de LOG369        try:370            vm_obj = self.api.vms.get(vm)371            self.api.events.add(372                params.Event(373                    vm=vm_obj,374                    origin='vm-backup',375                    description=msg,376                    severity=severity,377                    custom_id=int(time.time())378                )379            )380        except:381            pass382    def clean_dir(self, path, vm):383        pattern = vm + '-\d*|' + vm384        try:385            for f in os.listdir(path):386                if re.search(pattern, f):387                    folder = os.path.join(path, f)388                    if os.path.isdir(folder):389                        shutil.rmtree(folder)390                        self.log_event(vm=vm, msg="delete old backup " + folder, severity='info')391            return 1392        except OSError as err:393            self.log_event(vm=vm, msg=err, severity='error')394            return 0395    def verify_path(self, path):396        try:397            if os.path.isdir(path):398                return 1399            else:400                return 0401        except OSError as err:402            return 0403    def verify_environment(self, path, vm, export):404        if self.verify_path(path=path):405            print("Verify exists path {}: [ OK ]".format(path))406            #path_backup = os.path.join(path, vm)407            storage_list = list()408            for storage in self.api.storagedomains.list():409                storage_list.append(storage.name)410            if export in storage_list:411                print("Verify exists export domain {}: [ OK ]".format(export))412                if self.clean_dir(path=path, vm=vm):413                    print("Delete old backup directory for {} if exist's: [ OK ]".format(vm))414                    return 1415                else:416                    print("Delete old backup for {}: [ FAIL ]".format(vm))417                    return 0418            else:419                print("Verify exists export domain {}: [ FAIL ]".format(export))420                return 0421        else:422            print("Verify exists path {}: [ FAIL ]".format(path))423            return 0424# EXPORT'S LOGIC425    def have_export(self, name):426        vm_cluster = self.api.clusters.get(id=self.api.vms.get(name=name).cluster.id)427        vm_datacenter = self.api.datacenters.get(id=vm_cluster.data_center.id)428        for storage in vm_datacenter.storagedomains.list():429            if storage.get_type() == 'export':430                return storage, vm_datacenter431        return None, vm_datacenter432    def status_export(self, export_obj):433        return export_obj.get_status().get_state()434    def find_export(self, export_name):435        attached_dc = None436        for data_center in self.api.datacenters.list():437            for storage in data_center.storagedomains.list():438                if storage.get_name() == export_name:439                    attached_dc = data_center440                    return attached_dc, storage441        if attached_dc is None:442            for storage in self.api.storagedomains.list():443                if storage.get_type() == 'export':444                    if storage.get_name() == export_name:445                        return attached_dc, storage446                    447    def manage_export(self, name, export):448        vm_export, vm_datacenter = self.have_export(name)449        if vm_export is not None:450            print('VM {} have Export "{}"'.format(name, vm_export.get_name()))451            if self.status_export(export_obj=vm_export) == 'active':452                print('State of domain "{}" active'.format(vm_export.get_name()))453                if vm_export.get_name() == export:454                    print('Everything with Exports [ OK ]')455                if vm_export.get_name() != export:456                    print('Domain "{}" is not BK Domain'.format(vm_export.get_name()))457                    print('Export "{}" do maintenance'.format(vm_export.get_name()))458                    self.do_export_maintenance(dc_id=vm_datacenter.id, export=vm_export.get_name())459                    print('Trying to Detach Export "{}" from {}'.format(vm_export.get_name(), vm_datacenter.get_name()))460                    self.detach_export(dc_id=vm_datacenter.id, export=vm_export.get_name())461                    print('Trying to Locate BK Domain "{}"'.format(export))462                    # Buscando export domain Backup463                    data_center, export_backup = self.find_export(export_name=export)464                    if data_center is None:465                        print('Export "{}" is not attached to any datacenter'.format(export))466                        print('Trying to Attach Domain "{}" to {}'.format(export, vm_datacenter.get_name()))467                        self.attach_export(dc_id=vm_datacenter.id, export=export)468                    elif data_center is not None:469                        if self.status_export(export_obj=export_backup) == 'active':470                            print('Export "{}" is attached to datacenter {}'.format(export_backup.get_name(),471                                                                                    data_center.get_name()))472                            print('Export "{}" do maintenance'.format(export_backup.get_name()))473                            self.do_export_maintenance(dc_id=data_center.id, export=export_backup.get_name())474                            print('Trying to Detach Export "{}" from {}'.format(export_backup.get_name(),475                                                                                data_center.get_name()))476                            self.detach_export(dc_id=data_center.id, export=export_backup.get_name())477                            print(478                                'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(),479                                                                            vm_datacenter.get_name()))480                            self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())481                        if self.status_export(export_obj=export_backup) == 'maintenance':482                            print('State of domain "{}" maintenance'.format(export_backup.get_name()))483                            print('Trying to Detach Export "{}" from {}'.format(export_backup.get_name(),484                                                                                data_center.get_name()))485                            self.detach_export(dc_id=data_center.id, export=export_backup.get_name())486                            print(487                                'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(),488                                                                            vm_datacenter.get_name()))489                            self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())490            elif self.status_export(export_obj=vm_export) == 'maintenance':491                print('State of domain "{}" maintenance'.format(vm_export.get_name()))492                if vm_export.get_name() == export:493                    print('Trying to activate Export "{}"'.format(export))494                    self.do_export_up(dc_id=vm_datacenter.id, export=vm_export.get_name())495                if vm_export.get_name() != export:496                    print('Domain "{}" is not BK Domain'.format(vm_export.get_name()))497                    print('Trying to Detach Export "{}" from {}'.format(vm_export.get_name(), vm_datacenter.get_name()))498                    self.detach_export(dc_id=vm_datacenter.id, export=vm_export.get_name())499                    print('Trying to Locate BK Domain "{}"'.format(export))500                    # Buscando export domain Backup501                    data_center, export_backup = self.find_export(export_name=export)502                    if data_center is None:503                        print('Export "{}" is not attached to any datacenter'.format(export))504                        print('Trying to Attach Domain "{}" to {}'.format(export, vm_datacenter.get_name()))505                        self.attach_export(dc_id=vm_datacenter.id, export=export)506                    elif data_center is not None:507                        if self.status_export(export_obj=export_backup) == 'active':508                            print('Export "{}" is attached to datacenter {}'.format(export_backup.get_name(),509                                                                                    data_center.get_name()))510                            print('Export "{}" do maintenance'.format(export_backup.get_name()))511                            self.do_export_maintenance(dc_id=data_center.id, export=export_backup.get_name())512                            print('Trying to Detach Export "{}" from {}'.format(export_backup.get_name(),513                                                                                data_center.get_name()))514                            self.detach_export(dc_id=data_center.id, export=export_backup.get_name())515                            print(516                                'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(),517                                                                            vm_datacenter.get_name()))518                            self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())519                        if self.status_export(export_obj=export_backup) == 'maintenance':520                            print('State of domain "{}" maintenance'.format(export_backup.get_name()))521                            print('Trying to Detach Export "{}" from {}'.format(export_backup.get_name(),522                                                                                data_center.get_name()))523                            self.detach_export(dc_id=data_center.id, export=export_backup.get_name())524                            print(525                                'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(),526                                                                            vm_datacenter.get_name()))527                            self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())528        elif vm_export is None:529            # Buscando export domain Backup530            data_center, export_backup = self.find_export(export_name=export)531            if data_center is None:532                print('VM {} in datacenter "{}" without Export'.format(name, vm_datacenter.get_name()))533                print('Export "{}" is not attached to any datacenter'.format(export))534                print('Trying to Attach Domain "{}" to {}'.format(export, vm_datacenter.get_name()))535                self.attach_export(dc_id=vm_datacenter.id, export=export)536            elif data_center is not None:537                if self.status_export(export_obj=export_backup) == 'active':538                    print('VM {} in datacenter "{}" without Export'.format(name, vm_datacenter.get_name()))539                    print('Export "{}" is attached to datacenter {}'.format(export_backup.get_name(),540                                                                            data_center.get_name()))541                    print('Export "{}" do maintenance'.format(export_backup.get_name()))542                    self.do_export_maintenance(dc_id=data_center.id, export=export_backup.get_name())543                    print(544                        'Trying to Detach Export "{}" from {}'.format(export_backup.get_name(), data_center.get_name()))545                    self.detach_export(dc_id=data_center.id, export=export_backup.get_name())546                    print(547                        'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(), vm_datacenter.get_name()))548                    self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())549                if self.status_export(export_obj=export_backup) == 'maintenance':550                    print('VM {} in datacenter "{}" without Export'.format(name, vm_datacenter.get_name()))551                    print(552                        'Trying to Detach Export "{}" from {}'.format(export_backup.get_name(), data_center.get_name()))553                    self.detach_export(dc_id=data_center.id, export=export_backup.get_name())554                    print(555                        'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(), vm_datacenter.get_name()))556                    self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())557# disks Logic558    def get_item_tag(self, filename):559        """560        Recibe el nombre de archivo con ruta absoluta561        @param filename: nombre de archivo con ruta absoluta 562        @return: parents array with parent nodes563        """564        parents = list()565        with codecs.open(filename, "r", "utf-8") as inp:566            xml_doc = minidom.parseString(inp.read().encode("utf-8"))567            for device in xml_doc.getElementsByTagName("Device"):568                if device.firstChild.nodeValue == "disk":569                    parent = device.parentNode570                    parents.append(parent)571        return parents572    def verify_alias_disk(self, running_ovf, export_ovf):573        """574        Verify if absence of Alias in ovf575        @param running_ovf: Absolute path of ovf file running virtual machine 576        @param export_ovf: Absolute path of ovf file export virtual machine577        @return: False if absence Alias or True if Alias exists578        """579        for filename in running_ovf, export_ovf:580            parents = self.get_item_tag(filename=filename)581            for parent in parents:582                if not parent.getElementsByTagName("Alias")[0].hasChildNodes():583                    print(filename)584                    return False585        return True586    def order_disks(self, running_ovf, export_ovf):587        """588        Parse xml files and find tag <Alias> and sort in two dictionaries589        @param running_ovf: Absolute path for running.ovf590        @param export_ovf: Absolute path for export.ovf591        @return: Two dictionaries with disks sorted592        """593        running_data = dict()594        export_data = dict()595        for filename in running_ovf, export_ovf:596            parents = self.get_item_tag(filename=filename)597            for parent in parents:598                data = list()599                alias = parent.getElementsByTagName("Alias")[0].firstChild.data600                directory, image = parent.getElementsByTagName("rasd:HostResource")[0].firstChild.data.split("/")601                data.extend((directory, image))602                #            print("Alias: {}".format(alias))603                #            print("Directory: {} Image: {}".format(directory, image))604                if filename == running_ovf:605                    running_data[alias] = data[:]606                    del data[:]607                if filename == export_ovf:608                    export_data[alias] = data[:]609                    del data[:]610        return running_data, export_data611        #            print(parent.getElementsByTagName("rasd:Caption")[0].toxml())612        #            print(parent.getElementsByTagName("rasd:InstanceId")[0].toprettyxml())613        #            print(parent.getElementsByTagName("rasd:HostResource")[0].firstChild.data)614    def move_images(self, running_ovf, export_ovf, path_images):615        """616        Move disks from export directories to New VM Directory617        @param running_ovf: Absolute path for running.ovf618        @param export_ovf: Absolute path for export.ovf619        @param path_images: Absolute path for images New VM Directory620        @return: None621        """622#        if self.verify_alias_disk(running_ovf=running_ovf, export_ovf=export_ovf):623        run_data, export_data = self.order_disks(running_ovf=running_ovf, export_ovf=export_ovf)624        for key in export_data:625            if key in run_data:626                # Export Section627                path_export = os.path.join(os.path.dirname(export_ovf), path_images)628                abs_export_path = path_export + "/" + export_data[key][0]629                abs_export_image_path = os.path.join(abs_export_path, export_data[key][1])630                abs_export_meta_path = abs_export_image_path + ".meta"631                # Run Section632                path_run = os.path.join(os.path.dirname(running_ovf), path_images)633                abs_run_path = path_run + "/" + run_data[key][0]634                abs_run_image_path = os.path.join(abs_run_path, run_data[key][1])635                abs_run_meta_path = abs_run_image_path + ".meta"636                # Operations637                # print("{} {} --> {} {}".format(key, abs_export_image_path, key, abs_run_image_path))638                # os.mkdir(abs_export_path)639                os.mkdir(abs_run_path)640                shutil.move(abs_export_image_path, abs_run_image_path)641                shutil.move(abs_export_meta_path, abs_run_meta_path)642                shutil.rmtree(abs_export_path)643        shutil.rmtree(os.path.dirname(export_ovf))644#        else:645#            print("stop")646#            exit(30)647class Spinner:648    """649    Class for implement Spinner in other process650    """651    def __init__(self):652        self.spinner = itertools.cycle(['-', '/', '|', '\\'])653    def update(self):654        """655        Update the icon for spinner656        @return: None657        """658        sys.stdout.write(self.spinner.next())659        sys.stdout.flush()660        sys.stdout.write('\b')661        time.sleep(0.3)662    def clear(self):663        """664        Flush stdout for spinner665        @return: None666        """667        sys.stdout.write("\033[?25h")668        sys.stdout.flush()669if __name__ == '__main__':670    print("This file is intended to be used as a library of functions and it's not expected to be executed directly")...test_volumes_backup.py
Source:test_volumes_backup.py  
...77        self.assertEqual(backup_name, backup['name'])78        self.backups_adm_client.wait_for_backup_status(backup['id'],79                                                       'available')80        # Export Backup81        export_backup = (self.backups_adm_client.export_backup(backup['id'])82                         ['backup-record'])83        self.assertIn('backup_service', export_backup)84        self.assertIn('backup_url', export_backup)85        self.assertTrue(export_backup['backup_service'].startswith(86                        'cinder.backup.drivers'))87        self.assertIsNotNone(export_backup['backup_url'])88        # Import Backup89        import_backup = self.backups_adm_client.import_backup(90            backup_service=export_backup['backup_service'],91            backup_url=export_backup['backup_url'])['backup']92        self.addCleanup(self._delete_backup, import_backup['id'])93        self.assertIn("id", import_backup)94        self.backups_adm_client.wait_for_backup_status(import_backup['id'],95                                                       'available')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
