Best Python code snippet using fMBT_python
createNFSView.py
Source:createNFSView.py  
1#!/usr/bin/env python2"""Create a Cohesity NFS View Using python"""3# import pyhesity wrapper module4from pyhesity import *5# command line arguments6import argparse7parser = argparse.ArgumentParser()8parser.add_argument('-v', '--vip', type=str, required=True)  # Cohesity cluster name or IP9parser.add_argument('-u', '--username', type=str, required=True)  # Cohesity Username10parser.add_argument('-d', '--domain', type=str, default='local')  # Cohesity User Domain11parser.add_argument('-n', '--viewname', type=str, required=True)  # name view to create12parser.add_argument('-s', '--storagedomain', type=str, default='DefaultStorageDomain')  # name of storage domain to use13parser.add_argument('-q', '--qospolicy', type=str, choices=['Backup Target Low', 'Backup Target High', 'TestAndDev High', 'TestAndDev Low'], default='TestAndDev High')  # qos policy14parser.add_argument('-w', '--whitelist', action='append', default=[])  # ip to whitelist15parser.add_argument('-l', '--quotalimit', type=int, default=None)  # quota limit16parser.add_argument('-a', '--quotaalert', type=int, default=None)  # quota alert threshold17parser.add_argument('-c', '--clearwhitelist', action='store_true')  # erase existing whitelist18parser.add_argument('-r', '--removewhitelistentries', action='store_true')  # remove whitelist entries specified with -w19parser.add_argument('-x', '--updateexistingview', action='store_true')  # allow update of existing view (otherwise exit if view exists)20parser.add_argument('-lm', '--lockmode', type=str, choices=['Compliance', 'Enterprise', 'None', 'compliance', 'enterprise', 'none'], default='None')  # datalock mode21parser.add_argument('-dl', '--defaultlockperiod', type=int, default=1)  # default lock period22parser.add_argument('-al', '--autolockminutes', type=int, default=0)  # autolock after idle minutes23parser.add_argument('-ml', '--minimumlockperiod', type=int, default=0)  # minimum manual lock period24parser.add_argument('-xl', '--maximumlockperiod', type=int, default=1)  # maximum manual lock period25parser.add_argument('-lt', '--manuallockmode', type=str, choices=['ReadOnly', 'FutureATime', 'readonly', 'futureatim'], default='ReadOnly')  # manual locking type26parser.add_argument('-lu', '--lockunit', type=str, choices=['minute', 'hour', 'day', 'minutes', 'hours', 'days'], default='minute')  # lock period units27args = parser.parse_args()28vip = args.vip29username = args.username30domain = args.domain31viewName = args.viewname32storageDomain = args.storagedomain33qosPolicy = args.qospolicy34whitelist = args.whitelist35quotalimit = args.quotalimit36quotaalert = args.quotaalert37removewhitelistentries = args.removewhitelistentries38clearwhitelist = args.clearwhitelist39updateexistingview = args.updateexistingview40lockmode = args.lockmode41defaultlockperiod = args.defaultlockperiod42autolockminutes = args.autolockminutes43minimumlockperiod = args.minimumlockperiod44maximumlockperiod = args.maximumlockperiod45manuallockmode = args.manuallockmode46lockunit = args.lockunit47lockunitmap = {'minute': 60000, 'minutes': 60000, 'hour': 3600000, 'hours': 3600000, 'day': 86400000, 'days': 86400000}48lockunitmultiplier = lockunitmap[lockunit]49# netmask2cidr50def netmask2cidr(netmask):51    bin = ''.join(["{0:b}".format(int(o)) for o in netmask.split('.')])52    if '0' in bin:53        cidr = bin.index('0')54    else:55        cidr = 3256    return cidr57# authenticate58apiauth(vip, username, domain)59existingview = None60views = api('get', 'views')61if views['count'] > 0:62    existingviews = [v for v in views['views'] if v['name'].lower() == viewName.lower()]63    if(len(existingviews) > 0):64        existingview = existingviews[0]65if existingview is not None and updateexistingview is not True:66    print('view %s already exists' % viewName)67    exit(0)68if existingview is None:69    # find storage domain70    sd = [sd for sd in api('get', 'viewBoxes') if sd['name'].lower() == storageDomain.lower()]71    if len(sd) != 1:72        print("Storage domain %s not found!" % storageDomain)73        exit()74    sdid = sd[0]['id']75    # new view parameters76    newView = {77        "caseInsensitiveNamesEnabled": True,78        "enableNfsViewDiscovery": True,79        "enableSmbAccessBasedEnumeration": False,80        "enableSmbViewDiscovery": True,81        "fileExtensionFilter": {82            "isEnabled": False,83            "mode": "kBlacklist",84            "fileExtensionsList": []85        },86        "protocolAccess": "kNFSOnly",87        "securityMode": "kNativeMode",88        "subnetWhitelist": [],89        "qos": {90            "principalName": qosPolicy91        },92        "name": viewName,93        "viewBoxId": sdid94    }95else:96    newView = existingview97if clearwhitelist is True:98    newView['subnetWhitelist'] = []99if len(whitelist) > 0:100    for ip in whitelist:101        if ',' in ip:102            (thisip, netmask) = ip.split(',')103            netmask = netmask.lstrip()104            cidr = netmask2cidr(netmask)105        else:106            thisip = ip107            netmask = '255.255.255.255'108            cidr = 32109        existingEntry = []110        if 'subnetWhitelist' in newView:111            existingEntry = [e for e in newView['subnetWhitelist'] if e['ip'] == thisip and e['netmaskBits'] == cidr]112        if removewhitelistentries is not True and len(existingEntry) == 0:113            newView['subnetWhitelist'].append({114                "description": '',115                "nfsAccess": "kReadWrite",116                "smbAccess": "kReadWrite",117                "nfsRootSquash": False,118                "ip": thisip,119                "netmaskIp4": netmask120            })121        else:122            if removewhitelistentries is True:123                newView['subnetWhitelist'] = [e for e in newView['subnetWhitelist'] if not (e['ip'] == thisip and e['netmaskBits'] == cidr)]124# apply quota125if quotalimit is not None:126    if quotaalert is None:127        quotaalert = quotalimit - (quotalimit / 10)128    quotalimit = quotalimit * (1024 * 1024 * 1024)129    quotaalert = quotaalert * (1024 * 1024 * 1024)130    newView['logicalQuota'] = {131        "hardLimitBytes": quotalimit,132        "alertLimitBytes": quotaalert133    }134# apply datalock135if lockmode.lower() != 'none':136    newView['fileLockConfig'] = {}137    if lockmode.lower() == 'enterprise':138        newView['fileLockConfig']['mode'] = "kEnterprise"139    if lockmode.lower() == 'compliance':140        newView['fileLockConfig']['mode'] = "kCompliance"141    if autolockminutes > 0:142        newView['fileLockConfig']['autoLockAfterDurationIdle'] = autolockminutes * 60000143    newView['fileLockConfig']['defaultFileRetentionDurationMsecs'] = defaultlockperiod * lockunitmultiplier144    if maximumlockperiod < defaultlockperiod:145        maximumlockperiod = defaultlockperiod146    if maximumlockperiod <= minimumlockperiod or defaultlockperiod <= minimumlockperiod:147        print("default and maximum lock periods must be greater than the minimum lock period")148        exit()149    minimumlockmsecs = minimumlockperiod * lockunitmultiplier150    if minimumlockmsecs == 0:151        minimumlockmsecs = 60000152    newView['fileLockConfig']['minRetentionDurationMsecs'] = minimumlockmsecs153    maximumlockmsecs = maximumlockperiod * lockunitmultiplier154    if maximumlockmsecs <= (minimumlockmsecs + 240000):155        maximumlockmsecs = minimumlockmsecs + 240000156    newView['fileLockConfig']['maxRetentionDurationMsecs'] = maximumlockmsecs157    if manuallockmode.lower() == 'readonly':158        newView['fileLockConfig']['lockingProtocol'] = 'kSetReadOnly'159    else:160        newView['fileLockConfig']['lockingProtocol'] = 'kSetAtime'161    newView['fileLockConfig']['expiryTimestampMsecs'] = 0162# update qos policy163newView['qos']['principalName'] = qosPolicy164# create the view165if existingview is None:166    print("Creating view %s..." % viewName)167    result = api('post', 'views', newView)168else:169    print("Updating view %s..." % viewName)...cloneBackupToView.py
Source:cloneBackupToView.py  
1#!/usr/bin/env python2"""Create a Cohesity NFS View Using python"""3# import pyhesity wrapper module4from pyhesity import *5from time import sleep6from datetime import datetime7# command line arguments8import argparse9parser = argparse.ArgumentParser()10parser.add_argument('-v', '--vip', type=str, required=True)  # Cohesity cluster name or IP11parser.add_argument('-u', '--username', type=str, required=True)  # Cohesity Username12parser.add_argument('-d', '--domain', type=str, default='local')  # Cohesity User Domain13parser.add_argument('-n', '--viewname', type=str, required=True)  # name view to create14parser.add_argument('-q', '--qospolicy', type=str, choices=['Backup Target Low', 'Backup Target High', 'TestAndDev High', 'TestAndDev Low'], default='TestAndDev High')  # qos policy15parser.add_argument('-w', '--whitelist', action='append', default=[])  # ip to whitelist16parser.add_argument('-x', '--deleteview', action='store_true')  # delete existing view17parser.add_argument('-j', '--jobname', type=str, default=None)  # name job to clone18parser.add_argument('-o', '--objectname', type=str, default=None)  # name job to clone19parser.add_argument('-a', '--allruns', action='store_true')  # delete existing view20args = parser.parse_args()21vip = args.vip22username = args.username23domain = args.domain24viewName = args.viewname25qosPolicy = args.qospolicy26whitelist = args.whitelist27deleteview = args.deleteview28jobname = args.jobname29objectname = args.objectname30allruns = args.allruns31# netmask2cidr32def netmask2cidr(netmask):33    bin = ''.join(["{0:b}".format(int(o)) for o in netmask.split('.')])34    if '0' in bin:35        cidr = bin.index('0')36    else:37        cidr = 3238    return cidr39# authenticate40apiauth(vip, username, domain)41if deleteview is not True:42    if jobname is None:43        print('-j, --jobname required!')44        exit(1)45    # get protection job46    job = [job for job in api('get', 'protectionJobs') if job['name'].lower() == jobname.lower()]47    if not job:48        print("Job '%s' not found" % jobname)49        exit(1)50    else:51        job = job[0]52        sdid = job['viewBoxId']53existingview = None54views = api('get', 'views')55if views['count'] > 0:56    existingviews = [v for v in views['views'] if v['name'].lower() == viewName.lower()]57    if(len(existingviews) > 0):58        existingview = existingviews[0]59if existingview is None and deleteview is not True:60    # new view parameters61    newView = {62        "caseInsensitiveNamesEnabled": True,63        "enableNfsViewDiscovery": True,64        "enableSmbAccessBasedEnumeration": False,65        "enableSmbViewDiscovery": True,66        "fileExtensionFilter": {67            "isEnabled": False,68            "mode": "kBlacklist",69            "fileExtensionsList": []70        },71        "protocolAccess": "kNFSOnly",72        "securityMode": "kNativeMode",73        "subnetWhitelist": [],74        "qos": {75            "principalName": qosPolicy76        },77        "name": viewName,78        "viewBoxId": sdid79    }80    if len(whitelist) > 0:81        for ip in whitelist:82            if ',' in ip:83                (thisip, netmask) = ip.split(',')84                netmask = netmask.lstrip()85                cidr = netmask2cidr(netmask)86            else:87                thisip = ip88                netmask = '255.255.255.255'89                cidr = 3290            newView['subnetWhitelist'].append({91                "description": '',92                "nfsAccess": "kReadWrite",93                "smbAccess": "kReadWrite",94                "nfsRootSquash": False,95                "ip": thisip,96                "netmaskIp4": netmask97            })98    print("Creating new view %s..." % viewName)99    result = api('post', 'views', newView)100    sleep(5)101    views = api('get', 'views')102    if views['count'] > 0:103        existingviews = [v for v in views['views'] if v['name'].lower() == viewName.lower()]104        if(len(existingviews) > 0):105            view = existingviews[0]106else:107    if deleteview is True:108        if existingview:109            print("Deleting view %s..." % viewName)110            result = api('delete', 'views/%s' % viewName)111        else:112            print("View %s does not exist" % viewName)113        exit(0)114    else:115        if existingview['viewBoxId'] != job['viewBoxId']:116            print('View and job must be in the same storage domain!')117            exit(1)118        print("Using existing view: %s" % viewName)119        view = existingview120successStates = ['kSuccess', 'kWarning']121# get runs122thisObjectFound = False123runs = [r for r in api('get', 'protectionRuns?jobId=%s' % job['id']) if r['backupRun']['snapshotsDeleted'] is False and r['backupRun']['status'] in successStates]124if len(runs) > 0:125    for run in runs:126        runType = run['backupRun']['runType'][1:]127        for sourceInfo in run['backupRun']['sourceBackupStatus']:128            thisObjectName = sourceInfo['source']['name']129            if objectname is None or thisObjectName.lower() == objectname.lower():130                if sourceInfo['status'] in successStates:131                    thisObjectFound = True132                    sourceView = sourceInfo['currentSnapshotInfo']['viewName']133                    if 'relativeSnapshotDirectory' in sourceInfo['currentSnapshotInfo']:134                        sourcePath = sourceInfo['currentSnapshotInfo']['relativeSnapshotDirectory']135                    else:136                        sourcePath = ''137                    starttimeString = datetime.strptime(usecsToDate(run['backupRun']['stats']['startTimeUsecs']), '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d_%H-%M-%S')138                    destinationPath = "%s-%s-%s" % (thisObjectName, starttimeString, runType)139                    CloneDirectoryParams = {140                        'destinationDirectoryName': destinationPath,141                        'destinationParentDirectoryPath': '/%s' % view['name'],142                        'sourceDirectoryPath': '/%s/%s' % (sourceView, sourcePath),143                    }144                    folderPath = "%s:/%s/%s" % (vip, viewName, destinationPath)145                    print("Cloning %s backup files to %s" % (thisObjectName, folderPath))146                    result = api('post', 'views/cloneDirectory', CloneDirectoryParams)147    if thisObjectFound is False:148        print('No runs found containing %s' % objectname)149else:150    print('No runs found for job %s' % jobname)...xmlrpc.py
Source:xmlrpc.py  
1#   Copyright (c) 2003-2007 Open Source Applications Foundation2#3#   Licensed under the Apache License, Version 2.0 (the "License");4#   you may not use this file except in compliance with the License.5#   You may obtain a copy of the License at6#7#       http://www.apache.org/licenses/LICENSE-2.08#9#   Unless required by applicable law or agreed to in writing, software10#   distributed under the License is distributed on an "AS IS" BASIS,11#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12#   See the License for the specific language governing permissions and13#   limitations under the License.14from application import schema15from osaf import pim16import datetime17from twisted.web import xmlrpc18from util import commandline19# For current( )20from osaf.views import detail21def setattr_withtype(item, attribute, stringValue):22    """23    A wrapper around setattr which correctly unserializes a string24    value into an appropriate attribute value25    """26    v = item.getAttributeAspect(attribute, 'type').makeValue(stringValue)27    return setattr(item, attribute, v)28def getServletView(repository, name=None):29    if name is None:30        name = "XMLRPC"31    for existingView in repository.views:32        if existingView.name == name:33            return existingView34    return repository.createView(name)35class XmlRpcResource(xmlrpc.XMLRPC):36    def render(self, request):37        'Only allow XML-RPC connections from localhost'38        if request.getClientIP() == '127.0.0.1':39            return xmlrpc.XMLRPC.render(self, request)40        else:41            return xmlrpc.Fault(self.FAILURE, "Not localhost")42    def xmlrpc_echo(self, text):43        return "I received: '%s'" % text44    def xmlrpc_current(self):45        view = self.repositoryView46        item = None47        for block in detail.DetailRootBlock.iterItems(view=view):48            if hasattr(block, 'widget'):49                item = getattr(block, 'contents', None)50                break51        if item is not None:52            result = item.displayName53        else:54            result = "nothing selected"55        return result56    def xmlrpc_commandline(self, text, viewName=None):57        view = getServletView(self.repositoryView.repository, viewName)58        view.refresh()59        commandline.execute_command(view, text)60        view.commit()61        return "OK" # ???62    def xmlrpc_note(self, title, body, viewName=None):63        view = getServletView(self.repositoryView.repository, viewName)64        view.refresh()65        note = pim.Note(itsView=view, displayName=title, body=body)66        event = pim.EventStamp(note)67        event.add()68        event.startTime = datetime.datetime.now(tz=view.tzinfo.floating)69        event.duration = datetime.timedelta(minutes=60)70        event.anyTime = False71        allCollection = schema.ns('osaf.pim', view).allCollection72        allCollection.add(note)73        view.commit()74        return "OK" # ???75    #76    # Generic repository API77    #78    def generic_item_call(self, viewName, method, objectPath, *args, **kwds):79        view = getServletView(self.repositoryView.repository, viewName)80        view.refresh()81        try:82            item = view.findPath(objectPath)83            result = method(item, *args, **kwds)84        except Exception, e:85            print "error in generic_item_call: %s" % e86            raise87        # ugh, XML-RPC doesn't like None as a result88        if result is None:89            return True90        return result91    def xmlrpc_setAttribute(self, viewName, objectPath, attrName, value):92        """93        generic setAttribute - assumes the value is a atomic value -94        i.e. a string or an integer or something95        """96        return self.generic_item_call(viewName, setattr_withtype, objectPath, attrName, value)97    def xmlrpc_getAttribute(self, viewName, objectPath, attrName, value):98        """99        generic getAttribute - assumes the resulting value will be an100        atomic value like a string or an integer101        """102        return self.generic_item_call(viewName, getattr, objectPath, attrName, value)103    def xmlrpc_delAttribute(self, viewName, objectPath, attrName):104        """105        removes an attribute from an object106        """107        return self.generic_item_call(viewName, delattr, objectPath, attrName)108    def xmlrpc_commit(self, viewName=None):109        """110        commits the xml-rpc view111        """112        view = getServletView(self.repositoryView.repository, viewName)113        view.commit()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
