How to use vg_list method in avocado

Best Python code snippet using avocado_python

lvm_operation.py

Source:lvm_operation.py Github

copy

Full Screen

1# -*- coding:utf-8 -*-2import os3import re4import yaml5import pprint6from execute.linstor_api import LinstorAPI7import utils8import sys9import sundry as s10def size_conversion(size_str):11 m = re.match(r'([0-9.]+)\s*(\D*)', size_str)12 try:13 size = float(m.group(1))14 unit_str = m.group(2)15 if unit_str in ["", "M", "MB", "MiB"]:16 pass17 elif unit_str in ["K", "KB", "KiB"]:18 size = size / 102419 elif unit_str in ["G", "GB", "GiB"]:20 size = size * 102421 elif unit_str in ["T", "TB", "TiB"]:22 size = size * 1024 * 102423 else:24 s.prt_log("Unit error of size!", 2)25 except AttributeError:26 s.prt_log("The size that you input is not a valid number", 2)27 return size28class ClusterLVM(object):29 def __init__(self, node):30 try:31 self.api = LinstorAPI()32 self.sp = self.api.get_storagepool([node])33 self.res = self.api.get_resource([node])34 except AttributeError:35 self.sp = None36 self.res = None37 if node == utils.get_hostname():38 self.conn = None39 else:40 self.conn = utils.SSHConn(node)41 self.pv_list = self.get_pvs()42 self.vg_list = self.get_vgs()43 self.lv_list = self.get_lvs()44 # def create_linstor_thinpool(self, name, node, list_pv):45 # pv = ' '.join(list_pv)46 # cmd = f"linstor ps cdp --pool-name {name} lvmthin {node} {pv}"47 # result = utils.exec_cmd(cmd, self.conn)48 # if result["st"]:49 # return True50 # else:51 # print(f"Failed to create Thinpool {name} via LINSTOR")52 # return False53 # def create_linstor_vg(self, name, node, list_pv):54 # pv = ' '.join(list_pv)55 # cmd = f"linstor ps cdp --pool-name {name} lvm {node} {pv}"56 # result = utils.exec_cmd(cmd, self.conn)57 # if result["st"]:58 # return True59 # else:60 # print(f"Failed to create VG {name} via LINSTOR")61 # return False62 # def show_unused_device(self):63 # """使用linstor命令展示可用的设备"""64 # cmd = "linstor ps l"65 # result = utils.exec_cmd(cmd, self.conn)66 # if result["st"]:67 # print(result["rt"])68 def get_lvm_device(self):69 """获取所有可见的LVM2设备"""70 cmd = "lvmdiskscan"71 result = utils.exec_cmd(cmd, self.conn)72 if result:73 if result["st"]:74 lvm_list = re.findall('(\S+)\s+\[\s*(\S+\s\w+)\]\s+', result["rt"])75 return lvm_list76 def get_filesys(self):77 """获取所有可见的LVM2设备"""78 cmd = "df"79 result = utils.exec_cmd(cmd, self.conn)80 if result:81 if result["st"]:82 fs_list = re.findall('(\S+)(?:\s+(?:\d+|-)){3}\s+\S+\s+\S\s*', result["rt"])83 return fs_list84 def get_pvs(self):85 """获取pv类型数据"""86 cmd = "pvs --noheadings"87 result = utils.exec_cmd(cmd, self.conn)88 if result:89 if result["st"]:90 vgs_list = re.findall('(\S+)\s+(\S*)\s+lvm2\s+\S+\s+(\S+)\s+(\S+)\s*?', result["rt"])91 # print(vgs_list)92 return vgs_list93 def get_vgs(self):94 """获取vg类型数据"""95 cmd = "vgs --noheadings"96 result = utils.exec_cmd(cmd, self.conn)97 if result:98 if result["st"]:99 vgs_list = re.findall('(\S+)\s+(\d+)\s+(\d+)\s+\d+\s+\S+\s+(\S+)\s+(\S+)\s*?', result["rt"])100 # print(vgs_list)101 return vgs_list102 def get_lvs(self):103 """获取lv类型数据"""104 cmd = "lvs --noheadings"105 result = utils.exec_cmd(cmd, self.conn)106 if result:107 if result["st"]:108 vgs_list = re.findall('(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s(\S*).*\s', result["rt"])109 # pprint.pprint(vgs_list)110 return vgs_list111 def create_pv(self, device):112 """创建pv"""113 cmd = f"pvcreate {device} -y"114 result = utils.exec_cmd(cmd, self.conn)115 if result["st"]:116 s.prt_log(f"Success in createing PV: {device}", 0)117 return True118 else:119 s.prt_log(f"Failed to create PV {device}", 1)120 return False121 def create_vg(self, name, list_pv):122 """创建vg"""123 pv = ' '.join(list_pv)124 cmd = f"vgcreate {name} {pv} -y"125 result = utils.exec_cmd(cmd, self.conn)126 if result["st"]:127 s.prt_log(f"Success in createing VG: {name}", 0)128 return True129 else:130 s.prt_log(f"Failed to create VG {name}", 1)131 return False132 def create_lv(self, name, size, vg):133 """创建lv"""134 cmd = f"lvcreate -n {name} -L {size} {vg} -y"135 result = utils.exec_cmd(cmd, self.conn)136 if result["st"]:137 s.prt_log(f"Success in createing LV: {name}", 0)138 return True139 else:140 s.prt_log(f"Failed to create LV: {name}", 1)141 return False142 def create_thinpool(self, name, size, vg):143 """创建thinpool"""144 cmd = f"lvcreate -T -L {size} {vg}/{name} -y"145 result = utils.exec_cmd(cmd, self.conn)146 if result["st"]:147 s.prt_log(f"Success in createing Thinpool: {name}", 0)148 return True149 else:150 s.prt_log(f"Failed to create Thinpool {name}", 1)151 return False152 def create_thinlv(self, name, size, vg, thinpool):153 """创建thinlv"""154 cmd = f"lvcreate -V {size} -n {name} {vg}/{thinpool} -y"155 result = utils.exec_cmd(cmd, self.conn)156 if result["st"]:157 s.prt_log(f"Success in createing Thin LV: {name}", 0)158 return True159 else:160 s.prt_log(f"Failed to create Thin LV {name}", 1)161 return False162 def del_pv(self, name):163 """删除PV"""164 cmd = f"pvremove {name} -y"165 result = utils.exec_cmd(cmd, self.conn)166 if result["st"]:167 s.prt_log(f"Success in deleting PV: {name}", 0)168 return True169 else:170 s.prt_log(f"Failed to delete PV {name}", 1)171 return False172 def del_vg(self, name):173 """删除VG,删除前确认vg中是否还有lv"""174 self.vg_list = self.get_vgs()175 for vg in self.vg_list:176 if vg[0] == name:177 if int(vg[2]) > 0:178 s.prt_log(f"{name} still have other lv resource. Cancel delete {name}.", 2)179 cmd = f"vgremove {name} -y"180 result = utils.exec_cmd(cmd, self.conn)181 if result["st"]:182 s.prt_log(f"Success in deleting VG: {name}", 0)183 return True184 else:185 s.prt_log(f"Failed to delete VG {name}", 1)186 return False187 def del_thinpool(self, vg, thinpool):188 """删除thinpool"""189 # lvremove /dev/linstor_vtel_pool/vtel_pool190 cmd = f"lvremove /dev/{vg}/{thinpool} -y"191 result = utils.exec_cmd(cmd, self.conn)192 if result["st"]:193 s.prt_log(f"Success in deleting Thinpool: {vg}/{thinpool}", 0)194 return True195 else:196 s.prt_log(f"Failed to delete Thinpool: {vg}/{thinpool}", 1)197 return False198 pass199 def check_thinpool(self, vg, thinpool):200 """201 检查Thinpool是不是被用作LINSTOR后端存储202 thinpool name: vg/poolname203 """204 if self.sp:205 for i in self.sp:206 if i['PoolName'] == f'{vg}/{thinpool}':207 return i['StoragePool']208 def check_lv(self, lv):209 """检查lv是不是LINSTOR资源"""210 if self.res:211 for i in self.res:212 if f'{i["Resource"]}_00000' == lv:213 return True214 def check_vg(self, vg):215 """检查vg是不是被用作LINSTOR后端存储"""216 if self.sp:217 for i in self.sp:218 if i["PoolName"] == vg:219 return i["StoragePool"]220 def check_vg_exit(self, vg_name):221 if self.vg_list:222 for vg in self.vg_list:223 if vg[0] == vg_name:224 return False225 return True226 def check_pv_exit(self, device_list):227 pv_in_use = []228 if self.pv_list:229 for pv in self.pv_list:230 if pv[0] in device_list:231 pv_in_use.append(pv[0])232 if pv_in_use:233 pv_in_use_str = ",".join(pv_in_use)234 s.prt_log(f'{pv_in_use_str} have been used to create PV.', 1)235 return False236 else:237 return True238 def get_pv_via_vg(self, vg):239 """通过VG名获取对应的PV"""240 pv_dict = {}241 if self.pv_list:242 for pv in self.pv_list:243 if pv[1] == vg:244 pv_dict[pv[0]] = pv[2]245 if not pv_dict:246 s.prt_log(f"{vg} is not vg resource", 2)247 return pv_dict248 def get_vg_via_thinpool(self, thinpool):249 """通过thinpool名获取对应的VG"""250 vg_list = []251 if self.lv_list:252 for lv in self.lv_list:253 if lv[0] == thinpool:254 vg_list.append(lv[1])255 return vg_list256 def get_vg_free_pe(self, vg):257 """获取vg中剩余的PE个数"""258 cmd = f"vgdisplay {vg}"259 result = utils.exec_cmd(cmd, self.conn)260 if result:261 if result["st"]:262 re_free_pe = re.search(r'Free\s*PE / Size\s*(\d+)', result["rt"])263 re_used_flag = re.search(r'Alloc\s*PE / Size\s*(\d+)', result["rt"])264 free_pe = int(re_free_pe.group(1))265 if int(re_used_flag.group(1)) == 0:266 free_pe = free_pe - 1267 return free_pe268 def get_device_size(self, device_list):269 size = 0270 lvm_list = self.get_lvm_device()271 for device in device_list:272 for lvm in lvm_list:273 if device == lvm[0]:274 size = size + size_conversion(lvm[1]) - 4275 return size276 def check_and_get_size(self, data, size, type):277 """278 计算创建thinpool的可用空间大小,与用户输入的大小值进行比较279 data: vg_name or list device280 size: the size that user input281 type: vg or device282 """283 real_size = 0284 if type == "vg":285 real_size = real_size + int(self.get_vg_free_pe(data)) * 4286 if type == "device":287 real_size = self.get_device_size(data)288 real_size = real_size - 4289 available_size = real_size - 4290 if available_size <= 0:291 s.prt_log("No available space.", 2)292 if size:293 size = size_conversion(size)294 if size <= available_size:295 return size296 else:297 s.prt_log(f"The size that you input is out of the actual available range (0,{available_size}M]", 2)298 else:299 return available_size300 def get_lvm_on_node(self):301 """将需要展示的数据处理成字典"""302 if not self.vg_list:303 s.prt_log("The node don't have VG.", 2)304 # pprint.pprint(self.res)305 # pprint.pprint(self.sp)306 vg_dict = {}307 for vg in self.vg_list:308 vg_data = {'total size': None, 'free size': None, 'linstor storage pool': {'sp name': None}}309 pv_key = f'pvs({vg[1]})'310 lv_key = f'lvs({vg[2]})'311 vg_data['total size'] = vg[3]312 vg_data['free size'] = vg[4]313 vg_data[pv_key] = None314 vg_data[lv_key] = None315 lv_dict = {}316 pv_dict = self.get_pv_via_vg(vg[0])317 vg_data[pv_key] = pv_dict if pv_dict else None318 if self.lv_list:319 for lv in self.lv_list:320 if lv[1] == vg[0]:321 # lv322 if '-wi-' in lv[2]:323 status = True if self.check_lv(lv[0]) else False324 lv_dict[lv[0]] = {"size": lv[3], 'linstor resource': status}325 # thinpool326 elif 'twi-' in lv[2]:327 sp_ = self.check_thinpool(lv[1], lv[0])328 lv_dict[lv[0]] = {"size": lv[3], 'linstor storage pool': {'sp name': sp_}}329 # elif 'Vwi-' in lv[2]:330 # lv_dict[lv[0]] = {"size": lv[3], 'linstor resource': False}331 vg_data[lv_key] = lv_dict if lv_dict else None332 for pool in lv_dict:333 pool_dict = {}334 # thinlv335 if self.lv_list:336 for lv in self.lv_list:337 if lv[4] == pool and lv[1] == vg[0]:338 thinlv_status = True if self.check_lv(lv[0]) else False339 pool_dict[lv[0]] = {"size": lv[3], 'linstor resource': thinlv_status}340 if len(pool_dict) != 0:341 pool_key = f'lvs({len(pool_dict)})'342 vg_data[lv_key][pool][pool_key] = pool_dict if pool_dict else None343 vg_dict[vg[0]] = vg_data344 vg_status = self.check_vg(vg[0])345 vg_data["linstor storage pool"]["sp name"] = vg_status346 return vg_dict347 def show_vg(self, vg=None):348 """以YAML格式展示JSON数据"""349 dict_vg = self.get_lvm_on_node()350 if not self.res and not self.sp:351 print('-' * 10, "Can't get LINSTOR resource", '-' * 10)352 try:353 if vg:354 print('-' * 15, vg, '-' * 15)355 s.prt_log(yaml.dump(dict_vg[vg], sort_keys=False), 0)356 else:357 s.prt_log(yaml.dump(dict_vg, sort_keys=False), 0)358 except KeyError:359 s.prt_log(f"{vg} does not exit.", 1)360 def show_unused_lvm_device(self):361 """表格展示未被用来创建PV的LVM设备"""362 print('-' * 15, "Unused Device", '-' * 15)363 list_header = ["Device", "Size"]364 lvm_list = self.get_lvm_device()365 fs_list = self.get_filesys()366 if lvm_list:367 unused_lvm_device = list(lvm_list)368 if self.pv_list:369 for pv in self.pv_list:370 for device in lvm_list:371 if pv[0] == device[0]:372 unused_lvm_device.remove(device)373 unused_lvm_device_without_fs = [i for i in unused_lvm_device if374 i[0] not in fs_list and "/dev/drbd" not in i[0] and "_00000" not in i[0]]375 s.prt_log(s.make_table(list_header, unused_lvm_device_without_fs), 0)376 else:377 s.prt_log(f"No message of unused device", 1)378 def delete_vg(self, vg):379 if not self.check_vg(vg):380 pv_dict = self.get_pv_via_vg(vg)381 if self.del_vg(vg):382 for pv in pv_dict.keys():383 self.del_pv(pv)384 else:385 s.prt_log(f"{vg} is in used", 1)386 def delete_thinpool(self, thinpool, confirm):387 vg_list = self.get_vg_via_thinpool(thinpool)388 if not vg_list:389 s.prt_log(f"{thinpool} is not thinpool resource", 2)390 if len(vg_list) > 1:391 print(f'Thinpool with the same name "{thinpool}" exist in those vg: {vg_list}')392 vg = input("Input the VG Name that the thinpool you want to delete:")393 if vg.strip() in vg_list:394 vg = vg.strip()395 else:396 s.prt_log("Error VG Name", 2)397 else:398 vg = vg_list[0]399 if not self.check_thinpool(vg, thinpool):400 if self.del_thinpool(vg, thinpool):401 pv_dict = self.get_pv_via_vg(vg)402 if confirm:403 if self.del_vg(vg):404 for pv in pv_dict.keys():405 self.del_pv(pv)406 else:...

Full Screen

Full Screen

lvm.py

Source:lvm.py Github

copy

Full Screen

...33 config.add_route('lv_snapshot', '/block/lvm/vg_list/{vg}/lv_list/{lv}/snapshot')34 config.add_route('pvmove', '/block/lvm/pvmove')35#curl -v -X GET http://192.168.1.123:6543/storlever/api/v1/block/lvm/vg_list36@get_view(route_name='vg_list')37def get_vg_list(request):38 lvm_mng = lvm.lvm_mgr()39 vgs = lvm_mng.get_all_vg()40 vg_dict = []41 for vg in vgs:42 vg_info = {43 'name':vgs[vg].name,44 'uuid':vgs[vg].uuid,45 'size':vgs[vg].size,46 'free_size':vgs[vg].free_size,47 }48 vg_dict.append(vg_info)49 return vg_dict50new_vg_schema = Schema({51 "vgname": StrRe(r"^([a-zA-Z].+)$"),...

Full Screen

Full Screen

vgdisplay.py

Source:vgdisplay.py Github

copy

Full Screen

1"""2VgDisplay - command ``vgdisplay``3=================================4"""5from .. import parser, CommandParser6import re7from insights.specs import Specs8@parser(Specs.vgdisplay)9class VgDisplay(CommandParser):10 """11 Parse the output of the ``vgdisplay -vv`` or ``vgdisplay`` commands.12 The ``vg_list`` property is the main access to the list of volume groups13 in the command output. Each volume group is stored as a dictionary of14 keys and values drawn from the property list of the volume group. The15 volume group's logical and physical volumes are stored in the 'Logical16 Volumes' and 'Physical Volumes' sub-keys, respectively.17 Sample command output of ``vgdisplay -vv`` (pruned for clarity)::18 Couldn't find device with uuid VVLmw8-e2AA-ECfW-wDPl-Vnaa-0wW1-utv7tV.19 --- Volume group ---20 VG Name RHEL7CSB21 System ID22 Format lvm223 Metadata Areas 124 Metadata Sequence No 1325 ...26 --- Logical volume ---27 LV Path /dev/RHEL7CSB/Root28 LV Name Root29 VG Name RHEL7CSB30 LV Size 29.30 GiB31 ...32 --- Physical volumes ---33 PV Name /dev/mapper/luks-96c66446-77fd-4431-9508-f6912bd8419434 PV UUID EfWV9V-03CX-E6zc-JkMw-yQae-wdzp-Je1KUn35 PV Status allocatable36 Total PE / Free PE 118466 / 403637 Volume groups are kept in the ``vg_list`` property in the order they were38 found in the file.39 Lines containing 'Couldn't find device with uuid' and 'missing physical40 volume' are stored in a ``debug_info`` property.41 Examples:42 >>> vg_info = shared[VgDisplay]43 >>> len(vg_info.vg_list)44 145 >>> vgdata = vg_info.vg_list[0]46 >>> vgdata['VG Name']47 'RHEL7CSB'48 >>> vgdata['VG Size']49 '462.76 GiB'50 >>> 'Logical Volumes' in vgdata51 True52 >>> lvs = vgdata['Logical Volumes']53 >>> type(lvs)54 dict55 >>> lvs.keys() # Note - keyed by device name56 ['/dev/RHEL7CSB/Root']57 >>> lvs['/dev/RHEL7CSB/Root']['LV Name']58 'Root'59 >>> lvs['/dev/RHEL7CSB/Root']['LV Size']60 '29.30 GiB'61 >>> 'Physical Volumes' in vgdata62 True63 >>> vgdata['Physical Volumes'].keys()64 ['/dev/mapper/luks-96c66446-77fd-4431-9508-f6912bd84194']65 >>> vgdata['Physical Volumes']['/dev/mapper/luks-96c66446-77fd-4431-9508-f6912bd84194']['PV UUID']66 'EfWV9V-03CX-E6zc-JkMw-yQae-wdzp-Je1KUn'67 >>> vg_info.debug_info68 ["Couldn't find device with uuid VVLmw8-e2AA-ECfW-wDPl-Vnaa-0wW1-utv7tV."]69 """70 _FILTER_INFO = [71 "Couldn't find device with uuid",72 "physical volumes missing",73 ]74 def parse_content(self, content):75 self.vg_list = []76 self.debug_info = []77 # Each section starts with a '(VG|LV|PV) Name' header. This then78 # sets the amount of space between the start of the key and the start79 # of the value. Unfortunately, there may be only one space between80 # the key and the value and there are spaces in the key, so it's81 # impossible to tell where the key stops and the value ends on space82 # alone. However, the header sets the left-most column the value83 # starts at, so once we pick up the header we ignore the regex match84 # and get the key and value as substrings.85 header_re = re.compile(r'^(?P<key>VG Name|LV Path|PV Name)\s+(?P<val>\S.*)$')86 # To save different handling, we use this to refer to the current87 # dictionary we're storing data in, whether it be a VG, LV or PV.88 current_data_store = None89 value_start_column = 090 # Each section of key-value data starts with the header as above and91 # ends with a blank line. This prevents parsing of random error92 # and information messages as key-value pairs.93 in_keyval_section = False94 for line in content:95 line = line.strip()96 if any(debug in line for debug in self._FILTER_INFO):97 if line not in self.debug_info:98 self.debug_info.append(line)99 in_keyval_section = False100 match = header_re.match(line)101 # Headers mark the start of a keyval section, but they do match102 # inside, so only match if outside a keyval section.103 if not in_keyval_section and match:104 in_keyval_section = True105 key, value = match.group('key', 'val')106 # Determine column for start of value - remember, this string107 # is now stripped so this may look different from your examples108 value_start_column = line.index(value)109 if key == "VG Name":110 # New Volume Group - new vg_info_dict entry111 self.vg_list.append({112 # Logical and Physical volumes stored by name / device113 'Logical Volumes': {},114 'Physical Volumes': {},115 'VG Name': value,116 })117 current_data_store = self.vg_list[-1]118 elif key == 'LV Path':119 # New Logical Volume - append to 'Logical Volumes'120 current_data_store = {121 key: value,122 }123 self.vg_list[-1]['Logical Volumes'][value] = current_data_store124 else: # elif key == 'PV Name':125 # New Physical Volume - append to 'Physical Volumes'126 current_data_store = {127 key: value,128 }129 self.vg_list[-1]['Physical Volumes'][value] = current_data_store130 elif line == '':131 # Blank line = new section, stop expecting keys and values132 in_keyval_section = False133 elif in_keyval_section and len(line) > value_start_column:134 # Extract key and value by substring135 key = line[:value_start_column].strip()136 value = line[value_start_column:].strip()137 # Finally, add this key/val pair to the current data store....

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful