Best Python code snippet using lisa_python
filesystem.py
Source:filesystem.py  
1#!/usr/bin/python2# -*- coding: utf-8 -*-3# Copyright: (c) 2013, Alexander Bulimov <lazywolf0@gmail.com>4# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)5from __future__ import absolute_import, division, print_function6__metaclass__ = type7ANSIBLE_METADATA = {'metadata_version': '1.1',8                    'status': ['preview'],9                    'supported_by': 'community'}10DOCUMENTATION = '''11---12author:13- Alexander Bulimov (@abulimov)14module: filesystem15short_description: Makes a filesystem16description:17  - This module creates a filesystem.18version_added: "1.2"19options:20  fstype:21    choices: [ btrfs, ext2, ext3, ext4, ext4dev, f2fs, lvm, ocfs2, reiserfs, xfs, vfat, swap ]22    description:23    - Filesystem type to be created.24    - reiserfs support was added in 2.2.25    - lvm support was added in 2.5.26    - since 2.5, I(dev) can be an image file.27    - vfat support was added in 2.528    - ocfs2 support was added in 2.629    - f2fs support was added in 2.730    - swap support was added in 2.831    required: yes32    aliases: [type]33  dev:34    description:35    - Target path to device or image file.36    required: yes37    aliases: [device]38  force:39    description:40    - If C(yes), allows to create new filesystem on devices that already has filesystem.41    type: bool42    default: 'no'43  resizefs:44    description:45    - If C(yes), if the block device and filesystem size differ, grow the filesystem into the space.46    - Supported for C(ext2), C(ext3), C(ext4), C(ext4dev), C(f2fs), C(lvm), C(xfs), C(vfat), C(swap) filesystems.47    - XFS Will only grow if mounted.48    - vFAT will likely fail if fatresize < 1.04.49    type: bool50    default: 'no'51    version_added: "2.0"52  opts:53    description:54    - List of options to be passed to mkfs command.55requirements:56  - Uses tools related to the I(fstype) (C(mkfs)) and C(blkid) command. When I(resizefs) is enabled, C(blockdev) command is required too.57notes:58  - Potential filesystem on I(dev) are checked using C(blkid), in case C(blkid) isn't able to detect an existing filesystem,59    this filesystem is overwritten even if I(force) is C(no).60'''61EXAMPLES = '''62- name: Create a ext2 filesystem on /dev/sdb163  filesystem:64    fstype: ext265    dev: /dev/sdb166- name: Create a ext4 filesystem on /dev/sdb1 and check disk blocks67  filesystem:68    fstype: ext469    dev: /dev/sdb170    opts: -cc71'''72from distutils.version import LooseVersion73import os74import re75import stat76from ansible.module_utils.basic import AnsibleModule, get_platform77class Device(object):78    def __init__(self, module, path):79        self.module = module80        self.path = path81    def size(self):82        """ Return size in bytes of device. Returns int """83        statinfo = os.stat(self.path)84        if stat.S_ISBLK(statinfo.st_mode):85            blockdev_cmd = self.module.get_bin_path("blockdev", required=True)86            _, devsize_in_bytes, _ = self.module.run_command([blockdev_cmd, "--getsize64", self.path], check_rc=True)87            return int(devsize_in_bytes)88        elif os.path.isfile(self.path):89            return os.path.getsize(self.path)90        else:91            self.module.fail_json(changed=False, msg="Target device not supported: %s" % self)92    def __str__(self):93        return self.path94class Filesystem(object):95    GROW = None96    MKFS = None97    MKFS_FORCE_FLAGS = ''98    LANG_ENV = {'LANG': 'C', 'LC_ALL': 'C', 'LC_MESSAGES': 'C'}99    def __init__(self, module):100        self.module = module101    @property102    def fstype(self):103        return type(self).__name__104    def get_fs_size(self, dev):105        """ Return size in bytes of filesystem on device. Returns int """106        raise NotImplementedError()107    def create(self, opts, dev):108        if self.module.check_mode:109            return110        mkfs = self.module.get_bin_path(self.MKFS, required=True)111        if opts is None:112            cmd = "%s %s '%s'" % (mkfs, self.MKFS_FORCE_FLAGS, dev)113        else:114            cmd = "%s %s %s '%s'" % (mkfs, self.MKFS_FORCE_FLAGS, opts, dev)115        self.module.run_command(cmd, check_rc=True)116    def grow_cmd(self, dev):117        cmd = self.module.get_bin_path(self.GROW, required=True)118        return [cmd, str(dev)]119    def grow(self, dev):120        """Get dev and fs size and compare. Returns stdout of used command."""121        devsize_in_bytes = dev.size()122        try:123            fssize_in_bytes = self.get_fs_size(dev)124        except NotImplementedError:125            self.module.fail_json(changed=False, msg="module does not support resizing %s filesystem yet." % self.fstype)126        if not fssize_in_bytes < devsize_in_bytes:127            self.module.exit_json(changed=False, msg="%s filesystem is using the whole device %s" % (self.fstype, dev))128        elif self.module.check_mode:129            self.module.exit_json(changed=True, msg="Resizing filesystem %s on device %s" % (self.fstype, dev))130        else:131            _, out, _ = self.module.run_command(self.grow_cmd(dev), check_rc=True)132            return out133class Ext(Filesystem):134    MKFS_FORCE_FLAGS = '-F'135    GROW = 'resize2fs'136    def get_fs_size(self, dev):137        cmd = self.module.get_bin_path('tune2fs', required=True)138        # Get Block count and Block size139        _, size, _ = self.module.run_command([cmd, '-l', str(dev)], check_rc=True, environ_update=self.LANG_ENV)140        for line in size.splitlines():141            if 'Block count:' in line:142                block_count = int(line.split(':')[1].strip())143            elif 'Block size:' in line:144                block_size = int(line.split(':')[1].strip())145                return block_size * block_count146class Ext2(Ext):147    MKFS = 'mkfs.ext2'148class Ext3(Ext):149    MKFS = 'mkfs.ext3'150class Ext4(Ext):151    MKFS = 'mkfs.ext4'152class XFS(Filesystem):153    MKFS = 'mkfs.xfs'154    MKFS_FORCE_FLAGS = '-f'155    GROW = 'xfs_growfs'156    def get_fs_size(self, dev):157        cmd = self.module.get_bin_path('xfs_growfs', required=True)158        _, size, _ = self.module.run_command([cmd, '-n', str(dev)], check_rc=True, environ_update=self.LANG_ENV)159        for line in size.splitlines():160            col = line.split('=')161            if col[0].strip() == 'data':162                if col[1].strip() != 'bsize':163                    self.module.fail_json(msg='Unexpected output format from xfs_growfs (could not locate "bsize")')164                if col[2].split()[1] != 'blocks':165                    self.module.fail_json(msg='Unexpected output format from xfs_growfs (could not locate "blocks")')166                block_size = int(col[2].split()[0])167                block_count = int(col[3].split(',')[0])168                return block_size * block_count169class Reiserfs(Filesystem):170    MKFS = 'mkfs.reiserfs'171    MKFS_FORCE_FLAGS = '-f'172class Btrfs(Filesystem):173    MKFS = 'mkfs.btrfs'174    def __init__(self, module):175        super(Btrfs, self).__init__(module)176        _, stdout, stderr = self.module.run_command('%s --version' % self.MKFS, check_rc=True)177        match = re.search(r" v([0-9.]+)", stdout)178        if not match:179            # v0.20-rc1 use stderr180            match = re.search(r" v([0-9.]+)", stderr)181        if match:182            # v0.20-rc1 doesn't have --force parameter added in following version v3.12183            if LooseVersion(match.group(1)) >= LooseVersion('3.12'):184                self.MKFS_FORCE_FLAGS = '-f'185            else:186                self.MKFS_FORCE_FLAGS = ''187        else:188            # assume version is greater or equal to 3.12189            self.MKFS_FORCE_FLAGS = '-f'190            self.module.warn('Unable to identify mkfs.btrfs version (%r, %r)' % (stdout, stderr))191class Ocfs2(Filesystem):192    MKFS = 'mkfs.ocfs2'193    MKFS_FORCE_FLAGS = '-Fx'194class F2fs(Filesystem):195    MKFS = 'mkfs.f2fs'196    GROW = 'resize.f2fs'197    @property198    def MKFS_FORCE_FLAGS(self):199        mkfs = self.module.get_bin_path(self.MKFS, required=True)200        cmd = "%s %s" % (mkfs, os.devnull)201        _, out, _ = self.module.run_command(cmd, check_rc=False, environ_update=self.LANG_ENV)202        # Looking for "	F2FS-tools: mkfs.f2fs Ver: 1.10.0 (2018-01-30)"203        # mkfs.f2fs displays version since v1.2.0204        match = re.search(r"F2FS-tools: mkfs.f2fs Ver: ([0-9.]+) \(", out)205        if match is not None:206            # Since 1.9.0, mkfs.f2fs check overwrite before make filesystem207            # before that version -f switch wasn't used208            if LooseVersion(match.group(1)) >= LooseVersion('1.9.0'):209                return '-f'210        return ''211    def get_fs_size(self, dev):212        cmd = self.module.get_bin_path('dump.f2fs', required=True)213        # Get sector count and sector size214        _, dump, _ = self.module.run_command([cmd, str(dev)], check_rc=True, environ_update=self.LANG_ENV)215        sector_size = None216        sector_count = None217        for line in dump.splitlines():218            if 'Info: sector size = ' in line:219                # expected: 'Info: sector size = 512'220                sector_size = int(line.split()[4])221            elif 'Info: total FS sectors = ' in line:222                # expected: 'Info: total FS sectors = 102400 (50 MB)'223                sector_count = int(line.split()[5])224            if None not in (sector_size, sector_count):225                break226        else:227            self.module.warn("Unable to process dump.f2fs output '%s'", '\n'.join(dump))228            self.module.fail_json(msg="Unable to process dump.f2fs output for %s" % dev)229        return sector_size * sector_count230class VFAT(Filesystem):231    if get_platform() == 'FreeBSD':232        MKFS = "newfs_msdos"233    else:234        MKFS = 'mkfs.vfat'235    GROW = 'fatresize'236    def get_fs_size(self, dev):237        cmd = self.module.get_bin_path(self.GROW, required=True)238        _, output, _ = self.module.run_command([cmd, '--info', str(dev)], check_rc=True, environ_update=self.LANG_ENV)239        for line in output.splitlines()[1:]:240            param, value = line.split(':', 1)241            if param.strip() == 'Size':242                return int(value.strip())243        self.module.fail_json(msg="fatresize failed to provide filesystem size for %s" % dev)244    def grow_cmd(self, dev):245        cmd = self.module.get_bin_path(self.GROW)246        return [cmd, "-s", str(dev.size()), str(dev.path)]247class LVM(Filesystem):248    MKFS = 'pvcreate'249    MKFS_FORCE_FLAGS = '-f'250    GROW = 'pvresize'251    def get_fs_size(self, dev):252        cmd = self.module.get_bin_path('pvs', required=True)253        _, size, _ = self.module.run_command([cmd, '--noheadings', '-o', 'pv_size', '--units', 'b', '--nosuffix', str(dev)], check_rc=True)254        block_count = int(size)255        return block_count256class Swap(Filesystem):257    MKFS = 'mkswap'258    MKFS_FORCE_FLAGS = '-f'259FILESYSTEMS = {260    'ext2': Ext2,261    'ext3': Ext3,262    'ext4': Ext4,263    'ext4dev': Ext4,264    'f2fs': F2fs,265    'reiserfs': Reiserfs,266    'xfs': XFS,267    'btrfs': Btrfs,268    'vfat': VFAT,269    'ocfs2': Ocfs2,270    'LVM2_member': LVM,271    'swap': Swap,272}273def main():274    friendly_names = {275        'lvm': 'LVM2_member',276    }277    fstypes = set(FILESYSTEMS.keys()) - set(friendly_names.values()) | set(friendly_names.keys())278    # There is no "single command" to manipulate filesystems, so we map them all out and their options279    module = AnsibleModule(280        argument_spec=dict(281            fstype=dict(required=True, aliases=['type'],282                        choices=list(fstypes)),283            dev=dict(required=True, aliases=['device']),284            opts=dict(),285            force=dict(type='bool', default=False),286            resizefs=dict(type='bool', default=False),287        ),288        supports_check_mode=True,289    )290    dev = module.params['dev']291    fstype = module.params['fstype']292    opts = module.params['opts']293    force = module.params['force']294    resizefs = module.params['resizefs']295    if fstype in friendly_names:296        fstype = friendly_names[fstype]297    changed = False298    try:299        klass = FILESYSTEMS[fstype]300    except KeyError:301        module.fail_json(changed=False, msg="module does not support this filesystem (%s) yet." % fstype)302    if not os.path.exists(dev):303        module.fail_json(msg="Device %s not found." % dev)304    dev = Device(module, dev)305    cmd = module.get_bin_path('blkid', required=True)306    rc, raw_fs, err = module.run_command("%s -c /dev/null -o value -s TYPE %s" % (cmd, dev))307    # In case blkid isn't able to identify an existing filesystem, device is considered as empty,308    # then this existing filesystem would be overwritten even if force isn't enabled.309    fs = raw_fs.strip()310    filesystem = klass(module)311    same_fs = fs and FILESYSTEMS.get(fs) == FILESYSTEMS[fstype]312    if same_fs and not resizefs and not force:313        module.exit_json(changed=False)314    elif same_fs and resizefs:315        if not filesystem.GROW:316            module.fail_json(changed=False, msg="module does not support resizing %s filesystem yet." % fstype)317        out = filesystem.grow(dev)318        # Sadly there is no easy way to determine if this has changed. For now, just say "true" and move on.319        #  in the future, you would have to parse the output to determine this.320        #  thankfully, these are safe operations if no change is made.321        module.exit_json(changed=True, msg=out)322    elif fs and not force:323        module.fail_json(msg="'%s' is already used as %s, use force=yes to overwrite" % (dev, fs), rc=rc, err=err)324    # create fs325    filesystem.create(opts, dev)326    changed = True327    module.exit_json(changed=changed)328if __name__ == '__main__':...fs.py
Source:fs.py  
...220                                                         _DEFAULT_FILE_SYSTEM)221        extension = specified_fs222    return _get_hash_str(extension)[:7]223@nova.privsep.sys_admin_pctxt.entrypoint224def mkfs(fs, path, label=None):225    unprivileged_mkfs(fs, path, label=None)226# NOTE(mikal): this method is deliberately not wrapped in a privsep227# entrypoint. This is not for unit testing, there are some callers who do228# not require elevated permissions when calling this.229def unprivileged_mkfs(fs, path, label=None):230    """Format a file or block device231    :param fs: Filesystem type (examples include 'swap', 'ext3', 'ext4'232               'btrfs', etc.)233    :param path: Path to file or block device to format234    :param label: Volume label to use235    """236    if fs == 'swap':237        args = ['mkswap']238    else:239        args = ['mkfs', '-t', fs]240    # add -F to force no interactive execute on non-block device.241    if fs in ('ext3', 'ext4', 'ntfs'):242        args.extend(['-F'])243    if label:244        if fs in ('msdos', 'vfat'):245            label_opt = '-n'246        else:247            label_opt = '-L'248        args.extend([label_opt, label])249    args.append(path)250    processutils.execute(*args)251@nova.privsep.sys_admin_pctxt.entrypoint252def _inner_configurable_mkfs(os_type, fs_label, target):253    mkfs_command = (_MKFS_COMMAND.get(os_type, _DEFAULT_MKFS_COMMAND) or254                    '') % {'fs_label': fs_label, 'target': target}255    processutils.execute(*mkfs_command.split())256# NOTE(mikal): this method is deliberately not wrapped in a privsep entrypoint257def configurable_mkfs(os_type, fs_label, target, run_as_root,258                      default_ephemeral_format, specified_fs=None):259    # Format a file or block device using a user provided command for each260    # os type. If user has not provided any configuration, format type will261    # be used according to a default_ephemeral_format configuration or a262    # system default.263    global _MKFS_COMMAND264    global _DEFAULT_MKFS_COMMAND265    mkfs_command = (_MKFS_COMMAND.get(os_type, _DEFAULT_MKFS_COMMAND) or266                    '') % {'fs_label': fs_label, 'target': target}267    if mkfs_command:268        if run_as_root:269            _inner_configurable_mkfs(os_type, fs_label, target)270        else:271            processutils.execute(*mkfs_command.split())272    else:273        if not specified_fs:274            specified_fs = default_ephemeral_format275            if not specified_fs:276                specified_fs = _DEFAULT_FS_BY_OSTYPE.get(os_type,277                                                         _DEFAULT_FILE_SYSTEM)278        if run_as_root:279            mkfs(specified_fs, target, fs_label)280        else:...test_block_mkfs.py
Source:test_block_mkfs.py  
...120    @mock.patch("curtin.block.mkfs.os")121    def test_mkfs_kwargs(self, mock_os, mock_util, mock_block):122        """Ensure that kwargs are being followed"""123        mock_block.get_blockdev_sector_size.return_value = (512, 512)124        mkfs.mkfs("/dev/null", "ext4", [], uuid=self.test_uuid,125                  label="testlabel", force=True)126        expected_flags = ["-F", ["-L", "testlabel"], ["-U", self.test_uuid]]127        calls = mock_util.subp.call_args_list128        self.assertEquals(len(calls), 1)129        call = calls[0][0][0]130        self.assertEquals(call[0], "mkfs.ext4")131        self._assert_same_flags(call, expected_flags)132    @mock.patch("curtin.block.mkfs.os")133    def test_mkfs_invalid_block_device(self, mock_os):134        """Do not proceed if block device is none or is not valid block dev"""135        with self.assertRaises(ValueError):136            mock_os.path.exists.return_value = False137            mkfs.mkfs("/dev/null", "ext4")138        with self.assertRaises(ValueError):139            mock_os.path.exists.return_value = True140            mkfs.mkfs(None, "ext4")141    @mock.patch("curtin.block.mkfs.block")142    @mock.patch("curtin.block.mkfs.util")143    @mock.patch("curtin.block.mkfs.os")144    def test_mkfs_generates_uuid(self, mock_os, mock_util, mock_block):145        """Ensure that block.mkfs generates and returns a uuid if None is146           provided"""147        mock_block.get_blockdev_sector_size.return_value = (512, 512)148        uuid = mkfs.mkfs("/dev/null", "ext4")149        self.assertIsNotNone(uuid)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
