Best Python code snippet using avocado_python
lvm_operation.py
Source:lvm_operation.py  
1# -*- coding:utf-8 -*-2import os3import re4import yaml5import pprint6from execute.linstor_api import LinstorAPI7import utils8import sys9import sundry as s10def size_conversion(size_str):11    m = re.match(r'([0-9.]+)\s*(\D*)', size_str)12    try:13        size = float(m.group(1))14        unit_str = m.group(2)15        if unit_str in ["", "M", "MB", "MiB"]:16            pass17        elif unit_str in ["K", "KB", "KiB"]:18            size = size / 102419        elif unit_str in ["G", "GB", "GiB"]:20            size = size * 102421        elif unit_str in ["T", "TB", "TiB"]:22            size = size * 1024 * 102423        else:24            s.prt_log("Unit error of size!", 2)25    except AttributeError:26        s.prt_log("The size that you input is not a valid number", 2)27    return size28class ClusterLVM(object):29    def __init__(self, node):30        try:31            self.api = LinstorAPI()32            self.sp = self.api.get_storagepool([node])33            self.res = self.api.get_resource([node])34        except AttributeError:35            self.sp = None36            self.res = None37        if node == utils.get_hostname():38            self.conn = None39        else:40            self.conn = utils.SSHConn(node)41        self.pv_list = self.get_pvs()42        self.vg_list = self.get_vgs()43        self.lv_list = self.get_lvs()44    # def create_linstor_thinpool(self, name, node, list_pv):45    #     pv = ' '.join(list_pv)46    #     cmd = f"linstor ps cdp --pool-name {name} lvmthin {node} {pv}"47    #     result = utils.exec_cmd(cmd, self.conn)48    #     if result["st"]:49    #         return True50    #     else:51    #         print(f"Failed to create Thinpool {name} via LINSTOR")52    #         return False53    # def create_linstor_vg(self, name, node, list_pv):54    #     pv = ' '.join(list_pv)55    #     cmd = f"linstor ps cdp --pool-name {name} lvm {node} {pv}"56    #     result = utils.exec_cmd(cmd, self.conn)57    #     if result["st"]:58    #         return True59    #     else:60    #         print(f"Failed to create VG {name} via LINSTOR")61    #         return False62    # def show_unused_device(self):63    #     """使ç¨linstorå½ä»¤å±ç¤ºå¯ç¨ç设å¤"""64    #     cmd = "linstor ps l"65    #     result = utils.exec_cmd(cmd, self.conn)66    #     if result["st"]:67    #         print(result["rt"])68    def get_lvm_device(self):69        """è·åææå¯è§çLVM2设å¤"""70        cmd = "lvmdiskscan"71        result = utils.exec_cmd(cmd, self.conn)72        if result:73            if result["st"]:74                lvm_list = re.findall('(\S+)\s+\[\s*(\S+\s\w+)\]\s+', result["rt"])75                return lvm_list76    def get_filesys(self):77        """è·åææå¯è§çLVM2设å¤"""78        cmd = "df"79        result = utils.exec_cmd(cmd, self.conn)80        if result:81            if result["st"]:82                fs_list = re.findall('(\S+)(?:\s+(?:\d+|-)){3}\s+\S+\s+\S\s*', result["rt"])83                return fs_list84    def get_pvs(self):85        """è·åpvç±»åæ°æ®"""86        cmd = "pvs --noheadings"87        result = utils.exec_cmd(cmd, self.conn)88        if result:89            if result["st"]:90                vgs_list = re.findall('(\S+)\s+(\S*)\s+lvm2\s+\S+\s+(\S+)\s+(\S+)\s*?', result["rt"])91                # print(vgs_list)92                return vgs_list93    def get_vgs(self):94        """è·åvgç±»åæ°æ®"""95        cmd = "vgs --noheadings"96        result = utils.exec_cmd(cmd, self.conn)97        if result:98            if result["st"]:99                vgs_list = re.findall('(\S+)\s+(\d+)\s+(\d+)\s+\d+\s+\S+\s+(\S+)\s+(\S+)\s*?', result["rt"])100                # print(vgs_list)101                return vgs_list102    def get_lvs(self):103        """è·ålvç±»åæ°æ®"""104        cmd = "lvs --noheadings"105        result = utils.exec_cmd(cmd, self.conn)106        if result:107            if result["st"]:108                vgs_list = re.findall('(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s(\S*).*\s', result["rt"])109                # pprint.pprint(vgs_list)110                return vgs_list111    def create_pv(self, device):112        """å建pv"""113        cmd = f"pvcreate {device} -y"114        result = utils.exec_cmd(cmd, self.conn)115        if result["st"]:116            s.prt_log(f"Success in createing PV: {device}", 0)117            return True118        else:119            s.prt_log(f"Failed to create PV {device}", 1)120            return False121    def create_vg(self, name, list_pv):122        """å建vg"""123        pv = ' '.join(list_pv)124        cmd = f"vgcreate {name} {pv} -y"125        result = utils.exec_cmd(cmd, self.conn)126        if result["st"]:127            s.prt_log(f"Success in createing VG: {name}", 0)128            return True129        else:130            s.prt_log(f"Failed to create VG {name}", 1)131            return False132    def create_lv(self, name, size, vg):133        """å建lv"""134        cmd = f"lvcreate -n {name} -L {size} {vg} -y"135        result = utils.exec_cmd(cmd, self.conn)136        if result["st"]:137            s.prt_log(f"Success in createing LV: {name}", 0)138            return True139        else:140            s.prt_log(f"Failed to create LV: {name}", 1)141            return False142    def create_thinpool(self, name, size, vg):143        """å建thinpool"""144        cmd = f"lvcreate -T -L {size} {vg}/{name} -y"145        result = utils.exec_cmd(cmd, self.conn)146        if result["st"]:147            s.prt_log(f"Success in createing Thinpool: {name}", 0)148            return True149        else:150            s.prt_log(f"Failed to create Thinpool {name}", 1)151            return False152    def create_thinlv(self, name, size, vg, thinpool):153        """å建thinlv"""154        cmd = f"lvcreate -V {size} -n {name} {vg}/{thinpool} -y"155        result = utils.exec_cmd(cmd, self.conn)156        if result["st"]:157            s.prt_log(f"Success in createing Thin LV: {name}", 0)158            return True159        else:160            s.prt_log(f"Failed to create Thin LV {name}", 1)161            return False162    def del_pv(self, name):163        """å é¤PV"""164        cmd = f"pvremove {name} -y"165        result = utils.exec_cmd(cmd, self.conn)166        if result["st"]:167            s.prt_log(f"Success in deleting PV: {name}", 0)168            return True169        else:170            s.prt_log(f"Failed to delete PV {name}", 1)171            return False172    def del_vg(self, name):173        """å é¤VG,å é¤å确认vg䏿¯å¦è¿ælv"""174        self.vg_list = self.get_vgs()175        for vg in self.vg_list:176            if vg[0] == name:177                if int(vg[2]) > 0:178                    s.prt_log(f"{name} still have other lv resource. Cancel delete {name}.", 2)179        cmd = f"vgremove {name} -y"180        result = utils.exec_cmd(cmd, self.conn)181        if result["st"]:182            s.prt_log(f"Success in deleting VG: {name}", 0)183            return True184        else:185            s.prt_log(f"Failed to delete VG {name}", 1)186            return False187    def del_thinpool(self, vg, thinpool):188        """å é¤thinpool"""189        # lvremove /dev/linstor_vtel_pool/vtel_pool190        cmd = f"lvremove /dev/{vg}/{thinpool} -y"191        result = utils.exec_cmd(cmd, self.conn)192        if result["st"]:193            s.prt_log(f"Success in deleting Thinpool: {vg}/{thinpool}", 0)194            return True195        else:196            s.prt_log(f"Failed to delete Thinpool: {vg}/{thinpool}", 1)197            return False198        pass199    def check_thinpool(self, vg, thinpool):200        """201        æ£æ¥Thinpoolæ¯ä¸æ¯è¢«ç¨ä½LINSTORå端åå¨202        thinpool name: vg/poolname203        """204        if self.sp:205            for i in self.sp:206                if i['PoolName'] == f'{vg}/{thinpool}':207                    return i['StoragePool']208    def check_lv(self, lv):209        """æ£æ¥lvæ¯ä¸æ¯LINSTORèµæº"""210        if self.res:211            for i in self.res:212                if f'{i["Resource"]}_00000' == lv:213                    return True214    def check_vg(self, vg):215        """æ£æ¥vgæ¯ä¸æ¯è¢«ç¨ä½LINSTORå端åå¨"""216        if self.sp:217            for i in self.sp:218                if i["PoolName"] == vg:219                    return i["StoragePool"]220    def check_vg_exit(self, vg_name):221        if self.vg_list:222            for vg in self.vg_list:223                if vg[0] == vg_name:224                    return False225        return True226    def check_pv_exit(self, device_list):227        pv_in_use = []228        if self.pv_list:229            for pv in self.pv_list:230                if pv[0] in device_list:231                    pv_in_use.append(pv[0])232        if pv_in_use:233            pv_in_use_str = ",".join(pv_in_use)234            s.prt_log(f'{pv_in_use_str} have been used to create PV.', 1)235            return False236        else:237            return True238    def get_pv_via_vg(self, vg):239        """éè¿VGåè·å对åºçPV"""240        pv_dict = {}241        if self.pv_list:242            for pv in self.pv_list:243                if pv[1] == vg:244                    pv_dict[pv[0]] = pv[2]245        if not pv_dict:246            s.prt_log(f"{vg} is not vg resource", 2)247        return pv_dict248    def get_vg_via_thinpool(self, thinpool):249        """éè¿thinpoolåè·å对åºçVG"""250        vg_list = []251        if self.lv_list:252            for lv in self.lv_list:253                if lv[0] == thinpool:254                    vg_list.append(lv[1])255        return vg_list256    def get_vg_free_pe(self, vg):257        """è·åvgä¸å©ä½çPE个æ°"""258        cmd = f"vgdisplay {vg}"259        result = utils.exec_cmd(cmd, self.conn)260        if result:261            if result["st"]:262                re_free_pe = re.search(r'Free\s*PE / Size\s*(\d+)', result["rt"])263                re_used_flag = re.search(r'Alloc\s*PE / Size\s*(\d+)', result["rt"])264                free_pe = int(re_free_pe.group(1))265                if int(re_used_flag.group(1)) == 0:266                    free_pe = free_pe - 1267                return free_pe268    def get_device_size(self, device_list):269        size = 0270        lvm_list = self.get_lvm_device()271        for device in device_list:272            for lvm in lvm_list:273                if device == lvm[0]:274                    size = size + size_conversion(lvm[1]) - 4275        return size276    def check_and_get_size(self, data, size, type):277        """278        计ç®å建thinpoolçå¯ç¨ç©ºé´å¤§å°ï¼ä¸ç¨æ·è¾å
¥ç大å°å¼è¿è¡æ¯è¾279        data: vg_name or list device280        size: the size that user input281        type: vg or device282        """283        real_size = 0284        if type == "vg":285            real_size = real_size + int(self.get_vg_free_pe(data)) * 4286        if type == "device":287            real_size = self.get_device_size(data)288            real_size = real_size - 4289        available_size = real_size - 4290        if available_size <= 0:291            s.prt_log("No available space.", 2)292        if size:293            size = size_conversion(size)294            if size <= available_size:295                return size296            else:297                s.prt_log(f"The size that you input is out of the actual available range (0,{available_size}M]", 2)298        else:299            return available_size300    def get_lvm_on_node(self):301        """å°éè¦å±ç¤ºçæ°æ®å¤çæåå
¸"""302        if not self.vg_list:303            s.prt_log("The node don't have VG.", 2)304        # pprint.pprint(self.res)305        # pprint.pprint(self.sp)306        vg_dict = {}307        for vg in self.vg_list:308            vg_data = {'total size': None, 'free size': None, 'linstor storage pool': {'sp name': None}}309            pv_key = f'pvs({vg[1]})'310            lv_key = f'lvs({vg[2]})'311            vg_data['total size'] = vg[3]312            vg_data['free size'] = vg[4]313            vg_data[pv_key] = None314            vg_data[lv_key] = None315            lv_dict = {}316            pv_dict = self.get_pv_via_vg(vg[0])317            vg_data[pv_key] = pv_dict if pv_dict else None318            if self.lv_list:319                for lv in self.lv_list:320                    if lv[1] == vg[0]:321                        # lv322                        if '-wi-' in lv[2]:323                            status = True if self.check_lv(lv[0]) else False324                            lv_dict[lv[0]] = {"size": lv[3], 'linstor resource': status}325                        # thinpool326                        elif 'twi-' in lv[2]:327                            sp_ = self.check_thinpool(lv[1], lv[0])328                            lv_dict[lv[0]] = {"size": lv[3], 'linstor storage pool': {'sp name': sp_}}329                        # elif 'Vwi-' in lv[2]:330                        #     lv_dict[lv[0]] = {"size": lv[3], 'linstor resource': False}331            vg_data[lv_key] = lv_dict if lv_dict else None332            for pool in lv_dict:333                pool_dict = {}334                # thinlv335                if self.lv_list:336                    for lv in self.lv_list:337                        if lv[4] == pool and lv[1] == vg[0]:338                            thinlv_status = True if self.check_lv(lv[0]) else False339                            pool_dict[lv[0]] = {"size": lv[3], 'linstor resource': thinlv_status}340                if len(pool_dict) != 0:341                    pool_key = f'lvs({len(pool_dict)})'342                    vg_data[lv_key][pool][pool_key] = pool_dict if pool_dict else None343            vg_dict[vg[0]] = vg_data344            vg_status = self.check_vg(vg[0])345            vg_data["linstor storage pool"]["sp name"] = vg_status346        return vg_dict347    def show_vg(self, vg=None):348        """以YAMLæ ¼å¼å±ç¤ºJSONæ°æ®"""349        dict_vg = self.get_lvm_on_node()350        if not self.res and not self.sp:351            print('-' * 10, "Can't get LINSTOR resource", '-' * 10)352        try:353            if vg:354                print('-' * 15, vg, '-' * 15)355                s.prt_log(yaml.dump(dict_vg[vg], sort_keys=False), 0)356            else:357                s.prt_log(yaml.dump(dict_vg, sort_keys=False), 0)358        except KeyError:359            s.prt_log(f"{vg} does not exit.", 1)360    def show_unused_lvm_device(self):361        """表格å±ç¤ºæªè¢«ç¨æ¥å建PVçLVM设å¤"""362        print('-' * 15, "Unused Device", '-' * 15)363        list_header = ["Device", "Size"]364        lvm_list = self.get_lvm_device()365        fs_list = self.get_filesys()366        if lvm_list:367            unused_lvm_device = list(lvm_list)368            if self.pv_list:369                for pv in self.pv_list:370                    for device in lvm_list:371                        if pv[0] == device[0]:372                            unused_lvm_device.remove(device)373            unused_lvm_device_without_fs = [i for i in unused_lvm_device if374                                            i[0] not in fs_list and "/dev/drbd" not in i[0] and "_00000" not in i[0]]375            s.prt_log(s.make_table(list_header, unused_lvm_device_without_fs), 0)376        else:377            s.prt_log(f"No message of unused device", 1)378    def delete_vg(self, vg):379        if not self.check_vg(vg):380            pv_dict = self.get_pv_via_vg(vg)381            if self.del_vg(vg):382                for pv in pv_dict.keys():383                    self.del_pv(pv)384        else:385            s.prt_log(f"{vg} is in used", 1)386    def delete_thinpool(self, thinpool, confirm):387        vg_list = self.get_vg_via_thinpool(thinpool)388        if not vg_list:389            s.prt_log(f"{thinpool} is not thinpool resource", 2)390        if len(vg_list) > 1:391            print(f'Thinpool with the same name "{thinpool}" exist in those vg: {vg_list}')392            vg = input("Input the VG Name that the thinpool you want to delete:")393            if vg.strip() in vg_list:394                vg = vg.strip()395            else:396                s.prt_log("Error VG Name", 2)397        else:398            vg = vg_list[0]399        if not self.check_thinpool(vg, thinpool):400            if self.del_thinpool(vg, thinpool):401                pv_dict = self.get_pv_via_vg(vg)402                if confirm:403                    if self.del_vg(vg):404                        for pv in pv_dict.keys():405                            self.del_pv(pv)406        else:...lvm.py
Source:lvm.py  
...33    config.add_route('lv_snapshot', '/block/lvm/vg_list/{vg}/lv_list/{lv}/snapshot')34    config.add_route('pvmove', '/block/lvm/pvmove')35#curl -v -X GET http://192.168.1.123:6543/storlever/api/v1/block/lvm/vg_list36@get_view(route_name='vg_list')37def get_vg_list(request):38    lvm_mng = lvm.lvm_mgr()39    vgs = lvm_mng.get_all_vg()40    vg_dict = []41    for vg in vgs:42        vg_info = {43                   'name':vgs[vg].name,44                   'uuid':vgs[vg].uuid,45                   'size':vgs[vg].size,46                   'free_size':vgs[vg].free_size,47                   }48        vg_dict.append(vg_info)49    return vg_dict50new_vg_schema = Schema({51    "vgname": StrRe(r"^([a-zA-Z].+)$"),...vgdisplay.py
Source:vgdisplay.py  
1"""2VgDisplay - command ``vgdisplay``3=================================4"""5from .. import parser, CommandParser6import re7from insights.specs import Specs8@parser(Specs.vgdisplay)9class VgDisplay(CommandParser):10    """11    Parse the output of the ``vgdisplay -vv`` or ``vgdisplay`` commands.12    The ``vg_list`` property is the main access to the list of volume groups13    in the command output.  Each volume group is stored as a dictionary of14    keys and values drawn from the property list of the volume group.  The15    volume group's logical and physical volumes are stored in the 'Logical16    Volumes' and 'Physical Volumes' sub-keys, respectively.17    Sample command output of ``vgdisplay -vv`` (pruned for clarity)::18        Couldn't find device with uuid VVLmw8-e2AA-ECfW-wDPl-Vnaa-0wW1-utv7tV.19        --- Volume group ---20        VG Name               RHEL7CSB21        System ID22        Format                lvm223        Metadata Areas        124        Metadata Sequence No  1325        ...26        --- Logical volume ---27        LV Path                /dev/RHEL7CSB/Root28        LV Name                Root29        VG Name                RHEL7CSB30        LV Size                29.30 GiB31        ...32        --- Physical volumes ---33        PV Name               /dev/mapper/luks-96c66446-77fd-4431-9508-f6912bd8419434        PV UUID               EfWV9V-03CX-E6zc-JkMw-yQae-wdzp-Je1KUn35        PV Status             allocatable36        Total PE / Free PE    118466 / 403637    Volume groups are kept in the ``vg_list`` property in the order they were38    found in the file.39    Lines containing 'Couldn't find device with uuid' and 'missing physical40    volume' are stored in a ``debug_info`` property.41    Examples:42        >>> vg_info = shared[VgDisplay]43        >>> len(vg_info.vg_list)44        145        >>> vgdata = vg_info.vg_list[0]46        >>> vgdata['VG Name']47        'RHEL7CSB'48        >>> vgdata['VG Size']49        '462.76 GiB'50        >>> 'Logical Volumes' in vgdata51        True52        >>> lvs = vgdata['Logical Volumes']53        >>> type(lvs)54        dict55        >>> lvs.keys()  # Note - keyed by device name56        ['/dev/RHEL7CSB/Root']57        >>> lvs['/dev/RHEL7CSB/Root']['LV Name']58        'Root'59        >>> lvs['/dev/RHEL7CSB/Root']['LV Size']60        '29.30 GiB'61        >>> 'Physical Volumes' in vgdata62        True63        >>> vgdata['Physical Volumes'].keys()64        ['/dev/mapper/luks-96c66446-77fd-4431-9508-f6912bd84194']65        >>> vgdata['Physical Volumes']['/dev/mapper/luks-96c66446-77fd-4431-9508-f6912bd84194']['PV UUID']66        'EfWV9V-03CX-E6zc-JkMw-yQae-wdzp-Je1KUn'67        >>> vg_info.debug_info68        ["Couldn't find device with uuid VVLmw8-e2AA-ECfW-wDPl-Vnaa-0wW1-utv7tV."]69    """70    _FILTER_INFO = [71        "Couldn't find device with uuid",72        "physical volumes missing",73    ]74    def parse_content(self, content):75        self.vg_list = []76        self.debug_info = []77        # Each section starts with a '(VG|LV|PV) Name' header.  This then78        # sets the amount of space between the start of the key and the start79        # of the value.  Unfortunately, there may be only one space between80        # the key and the value and there are spaces in the key, so it's81        # impossible to tell where the key stops and the value ends on space82        # alone.  However, the header sets the left-most column the value83        # starts at, so once we pick up the header we ignore the regex match84        # and get the key and value as substrings.85        header_re = re.compile(r'^(?P<key>VG Name|LV Path|PV Name)\s+(?P<val>\S.*)$')86        # To save different handling, we use this to refer to the current87        # dictionary we're storing data in, whether it be a VG, LV or PV.88        current_data_store = None89        value_start_column = 090        # Each section of key-value data starts with the header as above and91        # ends with a blank line.  This prevents parsing of random error92        # and information messages as key-value pairs.93        in_keyval_section = False94        for line in content:95            line = line.strip()96            if any(debug in line for debug in self._FILTER_INFO):97                if line not in self.debug_info:98                    self.debug_info.append(line)99                in_keyval_section = False100            match = header_re.match(line)101            # Headers mark the start of a keyval section, but they do match102            # inside, so only match if outside a keyval section.103            if not in_keyval_section and match:104                in_keyval_section = True105                key, value = match.group('key', 'val')106                # Determine column for start of value - remember, this string107                # is now stripped so this may look different from your examples108                value_start_column = line.index(value)109                if key == "VG Name":110                    # New Volume Group - new vg_info_dict entry111                    self.vg_list.append({112                        # Logical and Physical volumes stored by name / device113                        'Logical Volumes': {},114                        'Physical Volumes': {},115                        'VG Name': value,116                    })117                    current_data_store = self.vg_list[-1]118                elif key == 'LV Path':119                    # New Logical Volume - append to 'Logical Volumes'120                    current_data_store = {121                        key: value,122                    }123                    self.vg_list[-1]['Logical Volumes'][value] = current_data_store124                else:  # elif key == 'PV Name':125                    # New Physical Volume - append to 'Physical Volumes'126                    current_data_store = {127                        key: value,128                    }129                    self.vg_list[-1]['Physical Volumes'][value] = current_data_store130            elif line == '':131                # Blank line = new section, stop expecting keys and values132                in_keyval_section = False133            elif in_keyval_section and len(line) > value_start_column:134                # Extract key and value by substring135                key = line[:value_start_column].strip()136                value = line[value_start_column:].strip()137                # Finally, add this key/val pair to the current data store....Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
