How to use lv_check method in avocado

Best Python code snippet using avocado_python Github


Full Screen

...86 error.context("Creating thin pool for thinly provisioned volumes",87 if not vg_check(vg_name):89 raise error.TestError("Volume group could not be found")90 if lv_check(vg_name, pool_name):91 raise error.TestError("Thin pool already exists")92 cmd = "lvcreate -L %s --thin %s/%s" % (pool_size, vg_name, pool_name)93 result = vg_ramdisk_cleanup(ramdisk_filename=None, vg_ramdisk_dir=None,96 vg_name=None, loop_device=None, use_tmpfs=True):97 """98 Inline cleanup function in case of test error.99 """100 if vg_name is not None:101 loop_device ="([/\w]+) %s lvm2" % vg_name,102"pvs").stdout)103 if loop_device is not None:104 loop_device = result ="vgremove -f %s" % vg_name, ignore_status=True)106 if result.exit_status == 0:107 else:109 logging.debug("%s -> %s", result.command, result.stderr)110 if loop_device is not None:111 result ="pvremove -f %s" % loop_device, ignore_status=True)112 if result.exit_status == 0:113 else:115 logging.debug("%s -> %s", result.command, result.stderr)116 if loop_device in"losetup --all").stdout:117 ramdisk_filename ="%s: \[\d+\]:\d+ \(([/\w]+)\)" % loop_device,118"losetup --all").stdout)119 if ramdisk_filename is not None:120 ramdisk_filename = for _ in range(10):122 time.sleep(0.1)123 result ="losetup -d %s" % loop_device, ignore_status=True)124 if "resource busy" not in result.stderr:125 if result.exit_status == 0:126"Loop device %s deleted", loop_device)127 else:128 logging.debug("%s -> %s", result.command, result.stderr)129 break130 if ramdisk_filename is not None:131 if os.path.exists(ramdisk_filename):132 os.unlink(ramdisk_filename)133"Ramdisk filename %s deleted", ramdisk_filename)134 vg_ramdisk_dir = os.path.dirname(ramdisk_filename)135 if vg_ramdisk_dir is not None:136 if use_tmpfs:137"umount %s" % vg_ramdisk_dir, ignore_status=True)138 if result.exit_status == 0:139"Successfully unmounted tmpfs from %s", vg_ramdisk_dir)140 else:141 logging.debug("%s -> %s", result.command, result.stderr)142 if os.path.exists(vg_ramdisk_dir):143 
try:144 shutil.rmtree(vg_ramdisk_dir)145"Ramdisk directory %s deleted", vg_ramdisk_dir)146 except OSError:147 pass148def vg_check(vg_name):149 """150 Check whether provided volume group exists.151 """152 cmd = "vgdisplay %s" % vg_name153 try:154 logging.debug("Provided volume group exists: %s", vg_name)156 return True157 except error.CmdError:158 return False159def vg_list():160 """161 List available volume groups.162 """163 cmd = "vgs --all"164 vgroups = {}165 result = lines = result.stdout.strip().splitlines()167 if len(lines) > 1:168 columns = lines[0].split()169 lines = lines[1:]170 else:171 return vgroups172 for line in lines:173 details = line.split()174 details_dict = {}175 index = 0176 for column in columns:177 if"VG", column):178 vg_name = details[index]179 else:180 details_dict[column] = details[index]181 index += 1182 vgroups[vg_name] = details_dict183 return vgroups184@error.context_aware185def vg_create(vg_name, pv_list, force=False):186 """187 Create a volume group by using the block special devices188 """189 error.context(190 "Creating volume group '%s' by using '%s'" %191 (vg_name, pv_list), if vg_check(vg_name):193 raise error.TestError("Volume group '%s' already exist" % vg_name)194 if force:195 cmd = "vgcreate -f"196 else:197 cmd = "vgcreate"198 cmd += " %s %s" % (vg_name, pv_list)199 result = vg_remove(vg_name):203 """204 Remove a volume group.205 """206 error.context("Removing volume '%s'" % vg_name, if not vg_check(vg_name):208 raise error.TestError("Volume group '%s' could not be found" % vg_name)209 cmd = "vgremove -f %s" % vg_name210 result = lv_list(vg_name):213 cmd = "lvs %s" % vg_name214 result = logging.debug("Logical volumes in %s:\n%s", vg_name, result)216 lvpattern = r"(\w+)\s+%s" % vg_name217 return re.findall(lvpattern, result.stdout.rstrip())218def lv_check(vg_name, lv_name):219 """220 Check whether provided logical volume exists.221 """222 cmd = "lvdisplay %s" % vg_name223 result =, ignore_status=True)224 # unstable approach but 
currently works225 lvpattern = r"LV Name\s+%s\s+" % lv_name226 match =, result.stdout.rstrip())227 if match:228 logging.debug("Provided logical volume %s exists in %s", lv_name, vg_name)229 return True230 else:231 return False232@error.context_aware233def lv_remove(vg_name, lv_name):234 """235 Remove a logical volume.236 """237 error.context("Removing volume /dev/%s/%s" %238 (vg_name, lv_name), logging.debug)239 if not vg_check(vg_name):240 raise error.TestError("Volume group could not be found")241 if not lv_check(vg_name, lv_name):242 raise error.TestError("Logical volume could not be found")243 cmd = "lvremove -f %s/%s" % (vg_name, lv_name)244 result = lv_create(vg_name, lv_name, lv_size):248 """249 Create a logical volume in a volume group.250 The volume group must already exist.251 """252 error.context("Creating original lv to take a snapshot from",253 logging.debug)254 if not vg_check(vg_name):255 raise error.TestError("Volume group could not be found")256 if lv_check(vg_name, lv_name):257 raise error.TestError("Logical volume already exists")258 cmd = "lvcreate --size %s --name %s %s" % (lv_size, lv_name, vg_name)259 result = lv_list_all():262 """263 List available group volumes.264 """265 cmd = "lvs --all"266 volumes = {}267 result = lines = result.stdout.strip().splitlines()269 if len(lines) > 1:270 columns = lines[0].split()271 lines = lines[1:]272 else:273 return volumes274 for line in lines:275 details = line.split()276 length = len(details)277 details_dict = {}278 lv_name = details[0]279 details_dict["VG"] = details[1]280 details_dict["Attr"] = details[2]281 details_dict["LSize"] = details[3]282 if length == 5:283 details_dict["Origin_Data"] = details[4]284 elif length > 5:285 details_dict["Origin_Data"] = details[5]286 details_dict["Pool"] = details[4]287 volumes[lv_name] = details_dict288 return volumes289def thin_lv_create(vg_name, thinpool_name="lvthinpool", thinpool_size="1.5G",290 thinlv_name="lvthin", thinlv_size="1G"):291 """292 Create a thin 
volume from given volume group.293 :param vg_name: An exist volume group294 :param thinpool_name: The name of thin pool295 :param thinpool_size: The size of thin pool to be created296 :param thinlv_name: The name of thin volume297 :param thinlv_size: The size of thin volume298 """299 tp_cmd = "lvcreate --thinpool %s --size %s %s" % (thinpool_name,300 thinpool_size,301 vg_name)302 try:303 except error.CmdError, detail:305 logging.debug(detail)306 raise error.TestError("Create thin volume pool failed.")307 logging.debug("Created thin volume pool: %s", thinpool_name)308 lv_cmd = ("lvcreate --name %s --virtualsize %s "309 "--thin %s/%s" % (thinlv_name, thinlv_size,310 vg_name, thinpool_name))311 try:312 except error.CmdError, detail:314 logging.debug(detail)315 raise error.TestError("Create thin volume failed.")316 logging.debug("Created thin volume:%s", thinlv_name)317 return (thinpool_name, thinlv_name)318@error.context_aware319def lv_create_thin(vg_name, pool_name, lv_name, lv_size):320 """321 Create a thinly provisioned logical volume in a volume group.322 The volume group must already exist.323 """324 error.context("Creating original thin lv to take a snapshot from",325 logging.debug)326 if not vg_check(vg_name):327 raise error.TestError("The volume group could not be found")328 if not lv_check(vg_name, pool_name):329 raise error.TestError("The thin pool could not be found")330 if lv_check(vg_name, lv_name):331 raise error.TestError("The logical volume already exists")332 cmd = "lvcreate --virtualsize %s --thin %s/%s --name %s" % (lv_size, vg_name,333 pool_name, lv_name)334 result = lv_take_snapshot(vg_name, lv_name,338 lv_snapshot_name, lv_snapshot_size):339 """340 Take a snapshot of the original logical volume.341 """342 error.context("Taking a snapshot from original logical volume",343 logging.debug)344 if not vg_check(vg_name):345 raise error.TestError("Volume group could not be found")346 if lv_check(vg_name, lv_snapshot_name):347 raise 
error.TestError("Snapshot already exists")348 if not lv_check(vg_name, lv_name):349 raise error.TestError("Snapshot's origin could not be found")350 cmd = ("lvcreate -s --name %s /dev/%s/%s --size %s" %351 (lv_snapshot_name, vg_name, lv_name, lv_snapshot_size))352 try:353 result = except error.CmdError, ex:355 if ('Logical volume "%s" already exists in volume group "%s"' %356 (lv_snapshot_name, vg_name) in ex.result_obj.stderr and357 + " [active]"),358"lvdisplay").stdout)):359 # the above conditions detect if merge of snapshot was postponed360 logging.warning(("Logical volume %s is still active! " +361 "Attempting to deactivate..."), lv_name)362 lv_reactivate(vg_name, lv_name)363 result = else:365 raise ex366 lv_take_thin_snapshot(vg_name, lv_name, lv_snapshot_name):369 """370 Take a thinly provisioned snapshot of a thin logical volume.371 """372 error.context("Taking a thin snapshot from a thin logical volume",373 logging.debug)374 if not vg_check(vg_name):375 raise error.TestError("Volume group could not be found")376 if lv_check(vg_name, lv_snapshot_name):377 raise error.TestError("Snapshot already exists")378 if not lv_check(vg_name, lv_name):379 raise error.TestError("Snapshot's origin could not be found")380 cmd = ("lvcreate -s --name %s /dev/%s/%s --ignoreactivationskip" %381 (lv_snapshot_name, vg_name, lv_name))382 # lvcreate -s --thinpool vg001/pool origin_volume --name mythinsnap383 result = lv_take_thin_snapshot_from_external(vg_name, pool_name, lv_name, lv_snapshot_name):387 """388 Take a thinly provisioned snapshot of external logical volume.389 """390 error.context("Taking a thin snapshot from an external logical volume",391 logging.debug)392 if not vg_check(vg_name):393 raise error.TestError("Volume group could not be found")394 if not lv_check(vg_name, pool_name):395 raise error.TestError("Snapshot's thin pool could not be found")396 if lv_check(vg_name, lv_snapshot_name):397 raise error.TestError("Snapshot already exists")398 if not 
lv_check(vg_name, lv_name):399 raise error.TestError("Snapshot's origin could not be found")400 cmd = ("lvcreate -s --thinpool %s/%s %s --name %s --ignoreactivationskip" %401 (vg_name, pool_name, lv_name, lv_snapshot_name))402 result = lv_revert(vg_name, lv_name, lv_snapshot_name):406 """407 Revert the origin to a snapshot.408 """409 error.context("Reverting original logical volume to snapshot",410 logging.debug)411 try:412 if not vg_check(vg_name):413 raise error.TestError("Volume group could not be found")414 if not lv_check(vg_name, lv_snapshot_name):415 raise error.TestError("Snapshot could not be found")416 if (not lv_check(vg_name, lv_snapshot_name) and not lv_check(vg_name,417 lv_name)):418 raise error.TestError("Snapshot and its origin could not be found")419 if (lv_check(vg_name, lv_snapshot_name) and not lv_check(vg_name,420 lv_name)):421 raise error.TestError("Snapshot origin could not be found")422 cmd = ("lvconvert --merge --interval 1 /dev/%s/%s" % (vg_name, lv_snapshot_name))423 result = if (("Merging of snapshot %s will start next activation." %425 lv_snapshot_name) in result.stdout):426 raise error.TestError("The logical volume %s is still active" %427 lv_name)428 result = result.stdout.rstrip()429 except error.TestError, ex:430 # detect if merge of snapshot was postponed431 # and attempt to reactivate the volume.432 active_lv_pattern = re.escape("%s [active]" % lv_snapshot_name)433 lvdisplay_output ="lvdisplay").stdout...

Full Screen

Full Screen Github


Full Screen

...42 lv_utils.vg_ramdisk(vg_name, ramdisk_vg_size,43 ramdisk_basedir,44 ramdisk_sparse_filename)45 # if no snapshot is defined start fresh logical volume46 if override_flag == 1 and lv_utils.lv_check(vg_name, lv_name):47 lv_utils.lv_remove(vg_name, lv_name)48 lv_utils.lv_create(vg_name, lv_name, lv_size)49 elif override_flag == -1 and lv_utils.lv_check(vg_name, lv_name):50 lv_utils.lv_remove(vg_name, lv_name)51 else:52 # perform normal check policy53 if (lv_utils.lv_check(vg_name, lv_snapshot_name) and54 lv_utils.lv_check(vg_name, lv_name)):55 lv_utils.lv_revert(vg_name, lv_name, lv_snapshot_name)56 lv_utils.lv_take_snapshot(vg_name, lv_name,57 lv_snapshot_name,58 lv_snapshot_size)59 elif (lv_utils.lv_check(vg_name, lv_snapshot_name) and60 not lv_utils.lv_check(vg_name, lv_name)):61 raise error.TestError("Snapshot origin not found")62 elif (not lv_utils.lv_check(vg_name, lv_snapshot_name) and63 lv_utils.lv_check(vg_name, lv_name)):64 lv_utils.lv_take_snapshot(vg_name, lv_name,65 lv_snapshot_name,66 lv_snapshot_size)67 else:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. It takes you from setting up the prerequisites and running your first automation test, through following best practices, to diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?