How to use export_backup method in tempest

Best Python code snippet using tempest_python

ovirtbackup.py

Source:ovirtbackup.py Github

copy

Full Screen

1from __future__ import print_function2import fnmatch3import itertools4import os5import re6import shutil7import sys8import time9import codecs10from xml.dom import minidom11from ovirtsdk.api import API12from ovirtsdk.infrastructure.errors import RequestError13from ovirtsdk.xml import params14class OvirtBackup:15 """Class for export and import Virtual Machine in oVirt/RHEV environment"""16 def __init__(self, url, user, password):17 self.url = url18 self.user = user19 self.password = password20 def print_info(self):21 print(self.url)22 print(self.user)23 print(self.password)24 def connect(self):25 """Connect to oVirt/RHEV API"""26 try:27 self.api = API(url=self.url, username=self.user,28 password=self.password, insecure='True')29 return self.api30 except RequestError as err:31 print("Error: {} Reason: {}".format(err.status, err.reason))32 exit(1)33 def vm_state(self, vm):34 return self.api.vms.get(vm).get_status().get_state()35 def create_snap(self, desc, vm):36 """Create a snapshot from a virtual machine with params:37 @param desc: Description of Snapshot38 @param vm: Virtual Machine Name39 """40 try:41 snapshot = self.snapshot = self.api.vms.get(vm).snapshots.list()42 for snap in snapshot:43 if snap.description == desc:44 self.delete_snap(desc=desc, vm=vm)45 self.api.vms.get(vm).snapshots.add(46 params.Snapshot(47 description=desc,48 vm=self.api.vms.get(vm),49 persist_memorystate=False50 )51 )52 self.snapshot = self.api.vms.get(vm).snapshots.list(description=desc)[0]53 self.__wait_snap(vm, self.snapshot.id)54 except RequestError as err:55 print("CREATE SNAP Error: {} Reason: {}".format(err.status, err.reason))56 raise Exception(12)57 def snapshot_status(self, vm, snap_id):58 snapshot = self.api.vms.get(vm).snapshots.get(id=snap_id)59 if snapshot:60 return True61 else:62 return False63 def __wait_snap(self, vm, id_snap):64 """ Time wait while create a snapshot of a Virtual Machine"""65 spinner = Spinner()66 print("waiting for snapshot to finish... 
")67 while self.api.vms.get(vm).snapshots.get(id=id_snap).snapshot_status != "ok":68 spinner.update()69 spinner.clear()70 def __wait(self, vm, action):71 """Time wait while create and export of a Virtual Machine"""72 if action == 0:73 self.action = "creation"74 elif action == 1:75 self.action = "export"76 spinner = Spinner()77 print("waiting for vm {}... ".format(self.action))78 while self.get_vm_status(vm) != 'down':79 spinner.update()80 spinner.clear()81 def delete_snap(self, desc, vm):82 """Delete a snapshot from a virtual machine with params:83 @param desc: Description of Snapshot84 @param vm: Virtual Machine Name85 """86 try:87 snapshot = self.api.vms.get(vm).snapshots.list(description=desc)[0]88 snapshot.delete()89 spinner = Spinner()90 print("waiting for delete old snapshot... ")91 while self.snapshot_status(vm, snap_id=snapshot.id):92 spinner.update()93 spinner.clear()94 except RequestError as err:95 print("DELETE SNAP Error: {} Reason: {}".format(err.status, err.reason))96 raise Exception(13)97 #exit(13)98 def create_vm_to_export(self, vm, new_name, desc):99 try:100 self.snapshot = self.api.vms.get(vm).snapshots.list(description=desc)[0]101 self.snapshots = params.Snapshots(snapshot=[params.Snapshot(id=self.snapshot.id)])102 self.cluster = self.api.clusters.get(id=self.api.vms.get(vm).cluster.id)103 self.api.vms.add(104 params.VM(105 name=new_name, snapshots=self.snapshots,106 cluster=self.cluster,107 template=self.api.templates.get(name="Blank"),108 delete_protected=False109 )110 )111 self.__wait(new_name, 0)112 except RequestError as err:113 print("Error: {} Reason: {}".format(err.status, err.reason))114 raise Exception(14)115 #exit(14)116 def get_storage_domains(self, vm):117 self.datacenter = self.get_dc(vm)118 return self.datacenter.storagedomains.list()119 def get_dc(self, vm):120 """Return Datacenter object121 :param vm: Virtual Machine Name122 """123 self.dc = self.api.datacenters.get(id=self.get_cluster(vm).data_center.id)124 return self.dc125 def 
get_cluster(self, vm):126 """Return Cluster object127 :param vm: Virtual Machine Name128 """129 self.cluster = self.api.clusters.get(id=self.api.vms.get(vm).cluster.id)130 return self.cluster131 def if_exists_vm(self, vm):132 """Verify if virtual machine and new virtual machine already exists"""133 if self.api.vms.get(vm):134 return 1135 else:136 return 0137 def get_vm_status(self, vm):138 """Verify status of virtual machine"""139 self.state = self.api.vms.get(vm).status.state140 return self.state141 def delete_tmp_vm(self, name):142 try:143 self.api.vms.get(name=name).delete()144 return 1145 except Exception as e:146 print(e.message)147 raise Exception(18)148 def export_vm(self, new_name, export, collapse):149 try:150 if collapse == 'False':151 self.api.vms.get(name=new_name).export(152 params.Action(153 storage_domain=export,154 force=True155 )156 )157 self.__wait(new_name, 1)158 elif collapse == 'True':159 self.api.vms.get(name=new_name).export(160 params.Action(161 storage_domain=export,162 force=True,163 discard_snapshots=True164 )165 )166 self.__wait(new_name, 1)167 except Exception as e:168 print(e.message)169 raise Exception(14)170 #exit(14)171 def clean_export_domain(self, name, export):172 snap_name = name + "-SNAP"173 export_vms = list()174 for vm in self.api.storagedomains.get(export).vms.list():175 if (vm.get_name() == name) or (vm.get_name() == snap_name):176 export_vms.append(vm.get_name())177 try:178 for list_vm in export_vms:179 print("Delete exported virtual machine {} from {} ".format(list_vm, export))180 self.api.storagedomains.get(export).vms.get(list_vm).delete()181 return 1182 except RequestError as err:183 print("Error: {} Reason: {}".format(err.status, err.reason))184 return 0185 # Funciones de manejo de Exports Domains186 def get_export_domain(self, vm):187 """Return Export Domain188 :param vm: Virtual Machine Name189 """190 self.cluster = self.get_cluster(vm)191 self.dc = self.get_dc(vm)192 self.export = None193 for self.sd in 
self.dc.storagedomains.list():194 if self.sd.get_type() == "export":195 self.export = self.sd196 return self.export197 def verify_valid_export(self, dc_id, export, current):198 if current == export:199 if self.api.datacenters.get(id=dc_id).storagedomains.get(200 export).get_status().get_state() == "active":201 return 1202 else:203 return 2204 elif current != export:205 return 0206 def detach_export(self, dc_id, export):207 if self.api.datacenters.get(id=dc_id).storagedomains.get(export).delete():208 print("Detach OK")209 def do_export_maintenance(self, dc_id, export):210 self.api.datacenters.get(id=dc_id).storagedomains.get(export).deactivate()211 spinner = Spinner()212 print("waiting for maintenance Storage {}... ".format(export))213 while self.api.datacenters.get(id=dc_id).storagedomains.get(214 export).get_status().get_state() != "maintenance":215 spinner.update()216 spinner.clear()217 def attach_export(self, dc_id, export):218 try:219 if self.api.datacenters.get(id=dc_id).storagedomains.add(self.api.storagedomains.get(export)):220 print("Export Domain was attached successfully")221 except RequestError as err:222 print("Error: {} Reason: {}".format(err.status, err.reason))223 raise Exception(17)224 #exit(17)225 def do_export_up(self, dc_id, export):226 if self.api.datacenters.get(id=dc_id).storagedomains.get(export).activate():227 print('Export Domain was activate successfully')228 def prepare_export(self, dc_id, name_export):229 print("Different Export Attached")230 self.do_export_maintenance(dc_id, name_export)231 self.detach_export(dc_id, name_export)232 def active_export(self, vm, export_name):233 self.export_attached = self.get_export_domain(vm)234 dc = self.get_dc(vm)235 if self.export_attached is not None:236 status_export = self.verify_valid_export(dc.id, export_name, self.export_attached.name)237 if status_export == 1:238 print("Export {} is OK".format(export_name))239 elif status_export == 0:240 self.prepare_export(dc.id, self.export_attached.name)241 
self.attach_export(dc.id, export_name)242 elif status_export == 2:243 self.do_export_up(dc.id, export_name)244 elif self.export_attached is None:245 self.attach_export(dc.id, export_name)246# Seccion de funciones para movimiento de archivos247 def create_dirs(self, vm_name, export_path, images, vms):248 try:249 images = images.strip("/")250 vms = vms.strip("/")251 self.new_images_path = os.path.join(export_path, vm_name, images)252 self.new_vms_path = os.path.join(export_path, vm_name, vms)253 os.makedirs(self.new_vms_path)254 os.makedirs(self.new_images_path)255 except OSError as e:256 print(e)257 raise Exception(15)258 #exit(15)259 def mv_data(self, new_name, export, source, destination, stid):260 self.dest = export + new_name + destination261 os.chdir(export + stid + destination) # cambiando a /exportdomain/UUID/{master/vms, images}262 shutil.move(source, self.dest)263 def do_mv(self, vm, export_path, images, vms):264 obj_vm = self.api.vms.get(vm)265 # disks = self.api.vms.get(new_vm).disks.list()266 storage_id = self.get_export_domain(vm)267 disks = obj_vm.disks.list()268 objects = {"Disks": list(), "Vms": list()}269 objects["Vms"].append(obj_vm.id)270 for disk in disks:271 objects["Disks"].append(disk.id)272 old_name = ''273 pattern = '-SNAP'274 if re.search(pattern, vm):275 old_name = vm.split("-SNAP")[0]276 else:277 old_name = vm278 for disk in objects["Disks"]:279 self.mv_data(old_name, export_path, disk, images, storage_id.id)280 for vm_iter in objects["Vms"]:281 self.mv_data(old_name, export_path, vm_iter, vms, storage_id.id)282 # Seccion funciones modificacion del xml283 def get_running_ovf(self, vm, desc, path):284 """Get ovf info from snapshot of original VM"""285 try:286 self.snapshot = self.api.vms.get(vm).snapshots.list(287 all_content=True, description=desc)[0]288 print("Get running ovf definition")289 self.ovf = self.snapshot.get_initialization().get_configuration().get_data()290 complete_path = path + vm291 ovf_path = os.path.join(complete_path, 
"running-" + self.api.vms.get(vm).id + '.ovf')292 print("Write running ovf file")293 ovfFile = codecs.open(ovf_path, encoding='utf-8', mode='w')294 ovfFile.write(self.ovf)295 ovfFile.close()296 print("Write running ovf file: [ OK ]")297 return ovf_path298 except RequestError as err:299 print("Error: {} Reason: {}".format(err.status, err.reason))300 raise Exception(16)301 def get_vm_export_xml(self, xml_export):302 xml_tag = xml_export.getElementsByTagName("rasd:StorageId")303 storage_ids = list()304 for storage_id in xml_tag:305 storage_ids.append(storage_id.firstChild.nodeValue)306 return storage_ids307 def add_storage_id_xml(self, xml_original, xml_export):308 print("Reading xml's definitions")309 with codecs.open(xml_original, "r", "utf-8") as inp:310 xml_doc = minidom.parseString(inp.read().encode("utf-8"))311 with codecs.open(xml_export, "r", "utf-8") as inp2:312 xml_export_obj = minidom.parseString(inp2.read().encode("utf-8"))313 print("Reading xml's definitions: [ OK ]")314 count = 0315 for item in xml_doc.getElementsByTagName("Device"):316 if item.firstChild.nodeValue == "disk":317 st_id = self.get_vm_export_xml(xml_export_obj)318 StorageId = xml_doc.createElement("rasd:StorageId")319 content = xml_doc.createTextNode(st_id[count])320 StorageId.appendChild(content)321 parent = item.parentNode322 parent.appendChild(StorageId)323 count += 1324 return xml_doc325 def save_new_ovf(self, path, name, xml):326 try:327 directory = os.path.splitext(name)[0]328 os.mkdir(path + directory)329 save_name = path + directory + "/" + name330 print("Saving changes..")331 with codecs.open(save_name, 'a', 'utf-8') as new_ovf_file:332 new_ovf_file.write(xml.toxml())333 print("Saving changes: [ OK ]")334 except OSError as e:335 print(e)336 def delete_tmp_ovf(self, path):337 try:338 os.remove(path)339 except OSError as e:340 print(e)341 def export_xml_path(self, path, vm, find_path=None):342 if find_path is not None:343 complete_path = path + vm + find_path344 else:345 
complete_path = path + vm346 for root, dirs, filename in os.walk(complete_path):347 for name in filename:348 if fnmatch.fnmatch(name, "*.ovf"):349 return os.path.join(root, name)350 def change_owner(self, path):351 uid = 36352 gid = 36353 for root, dirs, files in os.walk(path):354 for one_dir in dirs:355 os.chown(os.path.join(root, one_dir), uid, gid)356 for one_file in files:357 os.chown(os.path.join(root, one_file), uid, gid)358 def change_dirname(self, path, vm, timestamp):359 try:360 new_dir = os.path.join(path, vm + "-" + timestamp)361 old_dir = os.path.join(path, vm)362 print("change from {} to {}".format(old_dir, new_dir))363 shutil.move(old_dir, new_dir)364 except OSError as e:365 print(e.errno)366 return e.errno367 def log_event(self, vm, msg, severity):368 # Funcion para manejo de LOG369 try:370 vm_obj = self.api.vms.get(vm)371 self.api.events.add(372 params.Event(373 vm=vm_obj,374 origin='vm-backup',375 description=msg,376 severity=severity,377 custom_id=int(time.time())378 )379 )380 except:381 pass382 def clean_dir(self, path, vm):383 pattern = vm + '-\d*|' + vm384 try:385 for f in os.listdir(path):386 if re.search(pattern, f):387 folder = os.path.join(path, f)388 if os.path.isdir(folder):389 shutil.rmtree(folder)390 self.log_event(vm=vm, msg="delete old backup " + folder, severity='info')391 return 1392 except OSError as err:393 self.log_event(vm=vm, msg=err, severity='error')394 return 0395 def verify_path(self, path):396 try:397 if os.path.isdir(path):398 return 1399 else:400 return 0401 except OSError as err:402 return 0403 def verify_environment(self, path, vm, export):404 if self.verify_path(path=path):405 print("Verify exists path {}: [ OK ]".format(path))406 #path_backup = os.path.join(path, vm)407 storage_list = list()408 for storage in self.api.storagedomains.list():409 storage_list.append(storage.name)410 if export in storage_list:411 print("Verify exists export domain {}: [ OK ]".format(export))412 if self.clean_dir(path=path, vm=vm):413 
print("Delete old backup directory for {} if exist's: [ OK ]".format(vm))414 return 1415 else:416 print("Delete old backup for {}: [ FAIL ]".format(vm))417 return 0418 else:419 print("Verify exists export domain {}: [ FAIL ]".format(export))420 return 0421 else:422 print("Verify exists path {}: [ FAIL ]".format(path))423 return 0424# EXPORT'S LOGIC425 def have_export(self, name):426 vm_cluster = self.api.clusters.get(id=self.api.vms.get(name=name).cluster.id)427 vm_datacenter = self.api.datacenters.get(id=vm_cluster.data_center.id)428 for storage in vm_datacenter.storagedomains.list():429 if storage.get_type() == 'export':430 return storage, vm_datacenter431 return None, vm_datacenter432 def status_export(self, export_obj):433 return export_obj.get_status().get_state()434 def find_export(self, export_name):435 attached_dc = None436 for data_center in self.api.datacenters.list():437 for storage in data_center.storagedomains.list():438 if storage.get_name() == export_name:439 attached_dc = data_center440 return attached_dc, storage441 if attached_dc is None:442 for storage in self.api.storagedomains.list():443 if storage.get_type() == 'export':444 if storage.get_name() == export_name:445 return attached_dc, storage446 447 def manage_export(self, name, export):448 vm_export, vm_datacenter = self.have_export(name)449 if vm_export is not None:450 print('VM {} have Export "{}"'.format(name, vm_export.get_name()))451 if self.status_export(export_obj=vm_export) == 'active':452 print('State of domain "{}" active'.format(vm_export.get_name()))453 if vm_export.get_name() == export:454 print('Everything with Exports [ OK ]')455 if vm_export.get_name() != export:456 print('Domain "{}" is not BK Domain'.format(vm_export.get_name()))457 print('Export "{}" do maintenance'.format(vm_export.get_name()))458 self.do_export_maintenance(dc_id=vm_datacenter.id, export=vm_export.get_name())459 print('Trying to Detach Export "{}" from {}'.format(vm_export.get_name(), 
vm_datacenter.get_name()))460 self.detach_export(dc_id=vm_datacenter.id, export=vm_export.get_name())461 print('Trying to Locate BK Domain "{}"'.format(export))462 # Buscando export domain Backup463 data_center, export_backup = self.find_export(export_name=export)464 if data_center is None:465 print('Export "{}" is not attached to any datacenter'.format(export))466 print('Trying to Attach Domain "{}" to {}'.format(export, vm_datacenter.get_name()))467 self.attach_export(dc_id=vm_datacenter.id, export=export)468 elif data_center is not None:469 if self.status_export(export_obj=export_backup) == 'active':470 print('Export "{}" is attached to datacenter {}'.format(export_backup.get_name(),471 data_center.get_name()))472 print('Export "{}" do maintenance'.format(export_backup.get_name()))473 self.do_export_maintenance(dc_id=data_center.id, export=export_backup.get_name())474 print('Trying to Detach Export "{}" from {}'.format(export_backup.get_name(),475 data_center.get_name()))476 self.detach_export(dc_id=data_center.id, export=export_backup.get_name())477 print(478 'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(),479 vm_datacenter.get_name()))480 self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())481 if self.status_export(export_obj=export_backup) == 'maintenance':482 print('State of domain "{}" maintenance'.format(export_backup.get_name()))483 print('Trying to Detach Export "{}" from {}'.format(export_backup.get_name(),484 data_center.get_name()))485 self.detach_export(dc_id=data_center.id, export=export_backup.get_name())486 print(487 'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(),488 vm_datacenter.get_name()))489 self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())490 elif self.status_export(export_obj=vm_export) == 'maintenance':491 print('State of domain "{}" maintenance'.format(vm_export.get_name()))492 if vm_export.get_name() == export:493 print('Trying to activate Export 
"{}"'.format(export))494 self.do_export_up(dc_id=vm_datacenter.id, export=vm_export.get_name())495 if vm_export.get_name() != export:496 print('Domain "{}" is not BK Domain'.format(vm_export.get_name()))497 print('Trying to Detach Export "{}" from {}'.format(vm_export.get_name(), vm_datacenter.get_name()))498 self.detach_export(dc_id=vm_datacenter.id, export=vm_export.get_name())499 print('Trying to Locate BK Domain "{}"'.format(export))500 # Buscando export domain Backup501 data_center, export_backup = self.find_export(export_name=export)502 if data_center is None:503 print('Export "{}" is not attached to any datacenter'.format(export))504 print('Trying to Attach Domain "{}" to {}'.format(export, vm_datacenter.get_name()))505 self.attach_export(dc_id=vm_datacenter.id, export=export)506 elif data_center is not None:507 if self.status_export(export_obj=export_backup) == 'active':508 print('Export "{}" is attached to datacenter {}'.format(export_backup.get_name(),509 data_center.get_name()))510 print('Export "{}" do maintenance'.format(export_backup.get_name()))511 self.do_export_maintenance(dc_id=data_center.id, export=export_backup.get_name())512 print('Trying to Detach Export "{}" from {}'.format(export_backup.get_name(),513 data_center.get_name()))514 self.detach_export(dc_id=data_center.id, export=export_backup.get_name())515 print(516 'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(),517 vm_datacenter.get_name()))518 self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())519 if self.status_export(export_obj=export_backup) == 'maintenance':520 print('State of domain "{}" maintenance'.format(export_backup.get_name()))521 print('Trying to Detach Export "{}" from {}'.format(export_backup.get_name(),522 data_center.get_name()))523 self.detach_export(dc_id=data_center.id, export=export_backup.get_name())524 print(525 'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(),526 vm_datacenter.get_name()))527 
self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())528 elif vm_export is None:529 # Buscando export domain Backup530 data_center, export_backup = self.find_export(export_name=export)531 if data_center is None:532 print('VM {} in datacenter "{}" without Export'.format(name, vm_datacenter.get_name()))533 print('Export "{}" is not attached to any datacenter'.format(export))534 print('Trying to Attach Domain "{}" to {}'.format(export, vm_datacenter.get_name()))535 self.attach_export(dc_id=vm_datacenter.id, export=export)536 elif data_center is not None:537 if self.status_export(export_obj=export_backup) == 'active':538 print('VM {} in datacenter "{}" without Export'.format(name, vm_datacenter.get_name()))539 print('Export "{}" is attached to datacenter {}'.format(export_backup.get_name(),540 data_center.get_name()))541 print('Export "{}" do maintenance'.format(export_backup.get_name()))542 self.do_export_maintenance(dc_id=data_center.id, export=export_backup.get_name())543 print(544 'Trying to Detach Export "{}" from {}'.format(export_backup.get_name(), data_center.get_name()))545 self.detach_export(dc_id=data_center.id, export=export_backup.get_name())546 print(547 'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(), vm_datacenter.get_name()))548 self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())549 if self.status_export(export_obj=export_backup) == 'maintenance':550 print('VM {} in datacenter "{}" without Export'.format(name, vm_datacenter.get_name()))551 print(552 'Trying to Detach Export "{}" from {}'.format(export_backup.get_name(), data_center.get_name()))553 self.detach_export(dc_id=data_center.id, export=export_backup.get_name())554 print(555 'Trying to Attach Domain "{}" to {}'.format(export_backup.get_name(), vm_datacenter.get_name()))556 self.attach_export(dc_id=vm_datacenter.id, export=export_backup.get_name())557# disks Logic558 def get_item_tag(self, filename):559 """560 Recibe el nombre de 
archivo con ruta absoluta561 @param filename: nombre de archivo con ruta absoluta 562 @return: parents array with parent nodes563 """564 parents = list()565 with codecs.open(filename, "r", "utf-8") as inp:566 xml_doc = minidom.parseString(inp.read().encode("utf-8"))567 for device in xml_doc.getElementsByTagName("Device"):568 if device.firstChild.nodeValue == "disk":569 parent = device.parentNode570 parents.append(parent)571 return parents572 def verify_alias_disk(self, running_ovf, export_ovf):573 """574 Verify if absence of Alias in ovf575 @param running_ovf: Absolute path of ovf file running virtual machine 576 @param export_ovf: Absolute path of ovf file export virtual machine577 @return: False if absence Alias or True if Alias exists578 """579 for filename in running_ovf, export_ovf:580 parents = self.get_item_tag(filename=filename)581 for parent in parents:582 if not parent.getElementsByTagName("Alias")[0].hasChildNodes():583 print(filename)584 return False585 return True586 def order_disks(self, running_ovf, export_ovf):587 """588 Parse xml files and find tag <Alias> and sort in two dictionaries589 @param running_ovf: Absolute path for running.ovf590 @param export_ovf: Absolute path for export.ovf591 @return: Two dictionaries with disks sorted592 """593 running_data = dict()594 export_data = dict()595 for filename in running_ovf, export_ovf:596 parents = self.get_item_tag(filename=filename)597 for parent in parents:598 data = list()599 alias = parent.getElementsByTagName("Alias")[0].firstChild.data600 directory, image = parent.getElementsByTagName("rasd:HostResource")[0].firstChild.data.split("/")601 data.extend((directory, image))602 # print("Alias: {}".format(alias))603 # print("Directory: {} Image: {}".format(directory, image))604 if filename == running_ovf:605 running_data[alias] = data[:]606 del data[:]607 if filename == export_ovf:608 export_data[alias] = data[:]609 del data[:]610 return running_data, export_data611 # 
print(parent.getElementsByTagName("rasd:Caption")[0].toxml())612 # print(parent.getElementsByTagName("rasd:InstanceId")[0].toprettyxml())613 # print(parent.getElementsByTagName("rasd:HostResource")[0].firstChild.data)614 def move_images(self, running_ovf, export_ovf, path_images):615 """616 Move disks from export directories to New VM Directory617 @param running_ovf: Absolute path for running.ovf618 @param export_ovf: Absolute path for export.ovf619 @param path_images: Absolute path for images New VM Directory620 @return: None621 """622# if self.verify_alias_disk(running_ovf=running_ovf, export_ovf=export_ovf):623 run_data, export_data = self.order_disks(running_ovf=running_ovf, export_ovf=export_ovf)624 for key in export_data:625 if key in run_data:626 # Export Section627 path_export = os.path.join(os.path.dirname(export_ovf), path_images)628 abs_export_path = path_export + "/" + export_data[key][0]629 abs_export_image_path = os.path.join(abs_export_path, export_data[key][1])630 abs_export_meta_path = abs_export_image_path + ".meta"631 # Run Section632 path_run = os.path.join(os.path.dirname(running_ovf), path_images)633 abs_run_path = path_run + "/" + run_data[key][0]634 abs_run_image_path = os.path.join(abs_run_path, run_data[key][1])635 abs_run_meta_path = abs_run_image_path + ".meta"636 # Operations637 # print("{} {} --> {} {}".format(key, abs_export_image_path, key, abs_run_image_path))638 # os.mkdir(abs_export_path)639 os.mkdir(abs_run_path)640 shutil.move(abs_export_image_path, abs_run_image_path)641 shutil.move(abs_export_meta_path, abs_run_meta_path)642 shutil.rmtree(abs_export_path)643 shutil.rmtree(os.path.dirname(export_ovf))644# else:645# print("stop")646# exit(30)647class Spinner:648 """649 Class for implement Spinner in other process650 """651 def __init__(self):652 self.spinner = itertools.cycle(['-', '/', '|', '\\'])653 def update(self):654 """655 Update the icon for spinner656 @return: None657 """658 sys.stdout.write(self.spinner.next())659 
sys.stdout.flush()660 sys.stdout.write('\b')661 time.sleep(0.3)662 def clear(self):663 """664 Flush stdout for spinner665 @return: None666 """667 sys.stdout.write("\033[?25h")668 sys.stdout.flush()669if __name__ == '__main__':670 print("This file is intended to be used as a library of functions and it's not expected to be executed directly")...

Full Screen

Full Screen

test_volumes_backup.py

Source:test_volumes_backup.py Github

copy

Full Screen

...77 self.assertEqual(backup_name, backup['name'])78 self.backups_adm_client.wait_for_backup_status(backup['id'],79 'available')80 # Export Backup81 export_backup = (self.backups_adm_client.export_backup(backup['id'])82 ['backup-record'])83 self.assertIn('backup_service', export_backup)84 self.assertIn('backup_url', export_backup)85 self.assertTrue(export_backup['backup_service'].startswith(86 'cinder.backup.drivers'))87 self.assertIsNotNone(export_backup['backup_url'])88 # Import Backup89 import_backup = self.backups_adm_client.import_backup(90 backup_service=export_backup['backup_service'],91 backup_url=export_backup['backup_url'])['backup']92 self.addCleanup(self._delete_backup, import_backup['id'])93 self.assertIn("id", import_backup)94 self.backups_adm_client.wait_for_backup_status(import_backup['id'],95 'available')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful