Best Python code snippet using avocado_python
virtBackup.py
Source:virtBackup.py  
'''
Created on Apr 10, 2013

This script has been developed to:
  1. Perform a backup on the last Sunday of the month
  2. Notify staff about the backup and the estimated downtime of the services
     3 days before (Thursdays)
  3. Perform all the backups using libvirt and the scp command.

After a backup the result will be:
  1. One new folder with the domain server name
  2. Inside each folder, each virtual disk and the .xml file with the
     hypervisor configuration

Example crontab:
#Full VPN backup using libvirt.
00 09 * * * /usr/bin/python /opt/app/scripts/virBackup.py >> /dev/null

Dependencies:
apt-get install python-libvirt

The code only uses constructs that are valid on both Python 2.6+ and
Python 3 (single-argument print(), floor division, ``in`` for dict lookups).
'''
import libvirt
import syslog
from xml import dom
import subprocess
from os import path
from re import findall
from time import sleep
import os
import datetime as dt
import sys

LIB_VIRT_ERROR_EMAIL_LIST = ['example@gexample.com']
NOTIFY_BACKUP_EMAIL_LIST = ['example@gexample.com']

# Pattern that extracts the file backing each disk from a domain XML dump.
_DISK_SOURCE_PATTERN = r"<source file='(.*)'/>\n"

# How long backup() waits for a guest to reach the paused state, and how often
# it polls while waiting.
SUSPEND_TIMEOUT_SECONDS = 60
SUSPEND_POLL_SECONDS = 0.5

try:
    from operator import methodcaller
except ImportError:
    def methodcaller(name, *args, **kwargs):
        """Fallback for interpreters that lack operator.methodcaller."""
        def caller(obj):
            return getattr(obj, name)(*args, **kwargs)
        return caller


class Domfetcher(object):
    """Abstract libvirt API, supply methods to return dom object lists."""

    def __init__(self, SASL_USER, SASL_PASS, uri=None):
        """Connect to the hypervisor at *uri* using SASL credentials.

        On failure ``self.c`` is set to None and the error is logged; the
        constructor itself does not raise.
        """
        # Register logit as the libvirt error handler.
        libvirt.registerErrorHandler(logit, 'libvirt error')

        # The credentials travel in the auth "user data" slot so that the
        # callback uses the values given to this instance instead of globals.
        self.auth = [
            [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE],
            Domfetcher.request_cred,
            (SASL_USER, SASL_PASS),
        ]
        try:
            self.c = libvirt.openAuth(uri, self.auth, 0)
        except Exception:
            self.c = None
            logit("libvirt error", "Failed to open connection to the hypervisor")

    @staticmethod
    def request_cred(credentials, user_data):
        """libvirt auth callback: fill in the requested SASL credentials.

        *user_data* is the ``(user, password)`` pair stored in the auth list.
        When it is None the module-level SASL_USER / SASL_PASS are used, which
        is how the callback behaved before credentials were passed explicitly.
        Returns 0, as the original callback did.
        """
        if user_data is not None:
            user, password = user_data
        else:
            user, password = SASL_USER, SASL_PASS
        for credential in credentials:
            if credential[0] == libvirt.VIR_CRED_AUTHNAME:
                credential[4] = user
            elif credential[0] == libvirt.VIR_CRED_PASSPHRASE:
                credential[4] = password
        return 0

    def get_all_doms(self):
        """Return a list of running and shutoff dom objects."""
        doms = self.get_running_doms()
        doms.extend(self.get_shutoff_doms())
        return doms

    def get_running_doms(self):
        """Return a list of running dom objects."""
        doms = []
        for dom_id in self.c.listDomainsID():
            dom = self.c.lookupByID(dom_id)
            # Prevent actions on the Xen hypervisor itself (Domain-0).
            if 'Domain-' not in dom.name():
                doms.append(dom)
        return doms

    def get_shutoff_doms(self):
        """Return a list of all shutoff but defined dom objects."""
        return [self.c.lookupByName(name) for name in self.c.listDefinedDomains()]

    def get_backup_dom(self, dom_to_backup):
        """Look up the domain described by one backup-list entry.

        *dom_to_backup* is an entry of the form [name, host, order, services].
        Returns the dom object, or None (after logging) when the lookup fails,
        including when the connection could not be opened.
        """
        try:
            print("WE ADD TO THE LIST: " + dom_to_backup[0])
            return self.c.lookupByName(dom_to_backup[0])
        except Exception:
            logit("libvirt error", "Failed to open a dom. Review the dom backup list")
            return None

    def get_disk_size(self, dom):
        """Return the summed size in MB of all file-backed disks of *dom*.

        The third field of each volume's info() is divided by 1024 twice.
        Disks that cannot be looked up through the connection are logged and
        left out of the total.
        """
        xml = dom.XMLDesc(0)
        total_size = 0
        for disk_path in findall(_DISK_SOURCE_PATTERN, xml):
            try:
                volume_info = self.c.storageVolLookupByPath(disk_path).info()
                total_size += volume_info[2] // 1024 // 1024
            except Exception:
                logit("libvirt error",
                      "Failed to open one disk when getting size, please add this disk into the POOL " + disk_path)
        return total_size


def invoke(dom, method):
    """Pattern to invoke shutdown, destroy, suspend, resume or start on a dom."""
    f = methodcaller(method)
    try:
        logit(method, 'invoking %s on %s' % (method, dom.name()))
        retcode = f(dom)
        if retcode:
            logit(method, '{0} returned {1} on {2}'.format(method, retcode, dom.name()))
    except libvirt.libvirtError:
        # Deliberately not re-raised: logit is registered as the libvirt error
        # handler in Domfetcher.__init__, so nothing extra is logged here.
        pass


def logit(context, message, quiet=False):
    """Write *message* to syslog (and stdout unless *quiet*).

    Also used as the libvirt error handler, in which case *message* is a
    tuple whose third item is the text. Messages with the context
    "libvirt error" are additionally e-mailed to LIB_VIRT_ERROR_EMAIL_LIST.
    """
    # Unpack the libvirt tuple first so the e-mail carries the text, not the
    # repr of the whole tuple.
    if isinstance(message, tuple):
        message = message[2]

    # Only errors are sent by e-mail.
    if context == "libvirt error":
        mail(message, context, LIB_VIRT_ERROR_EMAIL_LIST)

    if not quiet:
        print('%s: %s' % (context, message))
    syslog.openlog('virt-back', 0, syslog.LOG_LOCAL3)
    syslog.syslog('%s' % (message,))
    syslog.closelog()


def info(doms):
    """Accept a list of dom objects, attempt to display info for all."""
    if check_all_running(doms):
        print("NOTE: All guests are running")
    if check_all_shutoff(doms):
        print("NOTE: All guests are shut off")

    print('')
    print('running guests: ' + ', '.join([dom.name() for dom in get_all_running(doms)]))
    print('shutoff guests: ' + ', '.join([dom.name() for dom in get_all_shutoff(doms)]))
    print('')
    print('DomName'.ljust(16) + 'Memory MB'.rjust(12) + 'vCPUs'.rjust(8)
          + 'CPUtime ms'.rjust(18) + 'State'.rjust(20))
    print('==========================================================================')
    for dom in doms:
        details = dom.info()
        name = dom.name()
        rams = str(details[2] // 1024) + '/' + str(details[1] // 1024)
        cpus = str(details[3])
        cpu_time = str(details[4] // 1000000)
        state = get_status(dom)
        print(name.ljust(16) + rams.rjust(12) + cpus.rjust(8)
              + cpu_time.rjust(18) + state.rjust(20))


def check_all_running(doms):
    """Accept a list of dom objects, check if all guest dom are active."""
    return sum([dom.isActive() for dom in doms]) == len(doms)


def check_all_shutoff(doms):
    """Accept a list of dom objects, check if all guest dom are shut off."""
    return not sum([dom.isActive() for dom in doms])


def get_status(dom):
    """Accept a dom object, return a string with the current domain status.

    States missing from the table below are reported as 'unknown' so callers
    can always treat the result as a string.
    """
    states = {
        libvirt.VIR_DOMAIN_NOSTATE: 'no state',
        libvirt.VIR_DOMAIN_RUNNING: 'running',
        libvirt.VIR_DOMAIN_BLOCKED: 'blocked on resource',
        libvirt.VIR_DOMAIN_PAUSED: 'paused',
        libvirt.VIR_DOMAIN_SHUTDOWN: 'being shut down',
        libvirt.VIR_DOMAIN_SHUTOFF: 'shut off',
        libvirt.VIR_DOMAIN_CRASHED: 'crashed',
    }
    return states.get(dom.info()[0], 'unknown')


def isPause(dom):
    """Accept a dom object, return True when the guest is paused."""
    return dom.info()[0] == libvirt.VIR_DOMAIN_PAUSED


def isRunning(dom):
    """Accept a dom object, return True when the guest is running."""
    return dom.info()[0] == libvirt.VIR_DOMAIN_RUNNING


def get_all_running(doms):
    """Accept a list of dom objects, return a list of running dom objects."""
    return [dom for dom in doms if dom.isActive()]


def get_all_shutoff(doms):
    """Accept a list of dom objects, return a list of shutoff dom objects."""
    return [dom for dom in doms if not dom.isActive()]


def scp(source, server, path=""):
    """Copy *source* from *server* into the local directory *path* with scp.

    Returns True only when scp could be started and exited with status 0;
    otherwise the failure is logged and False is returned.
    """
    try:
        status = subprocess.Popen(["scp", "%s:%s" % (server, source), path + '/']).wait()
    except Exception:
        logit("libvirt error", "Failed coping the dom")
        return False

    if status != 0:
        # scp ran but reported an error (unreachable host, missing file...).
        logit("libvirt error", "Failed coping the dom")
        return False
    return True


def is_last_sun_of_month():
    """Return True when today is the backup day (last Sunday of the month)."""
    return get_day_backup() == dt.date.today()


def get_day_backup(month=None):
    """Return the backup date (last Sunday) of *month* in the current year.

    *month* is an int 1-12; when omitted the current month is used.
    """
    today = dt.date.today()
    if month is None:
        month = today.month

    # The 15th plus 31 days always lands in the following month, also across
    # the December/January boundary.
    in_next_month = dt.date(today.year, month, 15) + dt.timedelta(days=31)
    last_day = dt.date(in_next_month.year, in_next_month.month, 1) - dt.timedelta(days=1)

    # isoweekday(): Monday = 1 ... Sunday = 7, so the value modulo 7 is the
    # number of days elapsed since the most recent Sunday.
    return last_day - dt.timedelta(days=last_day.isoweekday() % 7)


def show_calendar_backup():
    """Print the backup calendar for the current year."""
    today = dt.date.today()
    print("CALENDAR FOR " + str(today.year))
    for month in range(1, 13):
        print(get_day_backup(month))


def get_calendar_backup():
    """Return the backup dates of the current year as a list of strings."""
    return [str(get_day_backup(month)) for month in range(1, 13)]


def mail(txt, subject, to):
    """Send *txt* with *subject* to the list of addresses *to*.

    Failures are logged under a context that does not trigger another e-mail:
    logit() mails every "libvirt error", so reporting a mail failure with that
    context would recurse for as long as the SMTP server is unreachable.
    """
    import smtplib
    sender = 'example@gexample.com'

    # A blank line must separate the headers from the body.
    message = """From: From Operations <example@example.com>
To: To Developers <example@gexample.com>
Subject: %s

%s.
""" % (subject, txt)
    try:
        smtp_obj = smtplib.SMTP('smtp.example.com')
        try:
            smtp_obj.sendmail(sender, to, message)
        finally:
            smtp_obj.quit()
    except Exception:
        logit("mail error", "Failed to send a email")


def _wait_until_paused(dom, timeout=SUSPEND_TIMEOUT_SECONDS):
    """Poll *dom* until it is paused; return False if *timeout* seconds pass."""
    waited = 0.0
    while not isPause(dom):
        if waited >= timeout:
            return False
        sleep(SUSPEND_POLL_SECONDS)
        waited += SUSPEND_POLL_SECONDS
    return True


def _copy_domain_files(dom, backpath_dom, host):
    """Write the XML of *dom* into *backpath_dom* and scp its disks from *host*.

    Returns False (after logging) when the target folder cannot be created.
    """
    if not os.path.exists(backpath_dom):
        try:
            os.makedirs(backpath_dom)
        except OSError:
            logit("libvirt error", "Failed to create a folder. We skip backup for: " + dom.name())
            return False

    # Keep the XML description so the guest configuration is backed up too.
    xml = dom.XMLDesc(0)
    xmlfile = path.join(backpath_dom, dom.name() + '.xml')
    with open(xmlfile, 'w') as f:
        f.write(xml)

    # Copy every file-backed disk of this domain.
    for disk in findall(_DISK_SOURCE_PATTERN, xml):
        scp(disk, host, backpath_dom)
        print("BACKING UP!!!")
        print(disk + ' ' + host + ' ' + backpath_dom)
    return True


def backup(dom, backpath, host):
    """Back up *dom* (XML plus disks) from *host* into ``backpath/<dom name>``.

    A running guest is suspended for the duration of the copy and always
    resumed afterwards, even if the copy raises. If the guest does not reach
    the paused state within SUSPEND_TIMEOUT_SECONDS the backup is skipped.
    """
    was_running = isRunning(dom)
    if was_running:
        invoke(dom, 'suspend')
        if not _wait_until_paused(dom):
            logit("libvirt error", "Failed to suspend. We skip backup for: " + dom.name())
            return
        print(get_status(dom))

    try:
        if _copy_domain_files(dom, backpath + '/' + dom.name(), host):
            sleep(2)
    finally:
        # Only resume if the dom was running before the backup.
        if was_running:
            invoke(dom, 'resume')


def is_disk_mounted(DISK_UUID, backpath):
    """Make sure *backpath* is a mount point; exit the script if it cannot be.

    When nothing is mounted there, the disk with *DISK_UUID* is mounted on
    it. If that fails the error is logged and the process exits with
    status 2.
    """
    if os.path.ismount(backpath):
        return
    status_code = subprocess.Popen(["mount", "-U", DISK_UUID, backpath]).wait()
    if status_code != 0:
        logit("libvirt error", "Failed, DISK NOT MOUNTED AND NOT POSSIBLE TO MOUNT")
        sys.exit(2)


def _get_connection(connections, host, user, password):
    """Return the Domfetcher for *host*, opening and caching it on first use."""
    if host not in connections:
        print("We create connecton to: " + host)
        connections[host] = Domfetcher(user, password, 'qemu+tcp://' + host + '/system')
    return connections[host]


def _by_order(doms_to_backup):
    """Return the backup entries sorted by their order field (index 2)."""
    return sorted(doms_to_backup, key=lambda entry: entry[2])


def _print_backup_estimates(doms_to_backup, connections, user, password, speed_net):
    """Backup-day branch: print size and time estimation for every domain."""
    for dom_to_backup in _by_order(doms_to_backup):
        host = dom_to_backup[1]
        print("Connecting to: " + host)
        print("Backing up: " + dom_to_backup[0])

        host_conn = _get_connection(connections, host, user, password)
        dom = host_conn.get_backup_dom(dom_to_backup)
        if dom is None:
            continue

        dom_size = host_conn.get_disk_size(dom)
        print("Size to backup: " + str(dom_size) + " MB")
        stimation_time = dom_size // speed_net // 60
        print("Estiamtion time: " + str(stimation_time) + " minutes")
        # NOTE: the copy itself is disabled, exactly as in the original
        # script; uncomment the next line to actually perform the backup.
        # backup(dom, backpath, host)


def _build_reminder_text(doms_to_backup, connections, user, password, speed_net):
    """Reminder branch: build the e-mail body with per-server downtime windows."""
    txt = ("This is auto email from BACKUP SYSTEM. This sunday all servers will be buckup."
           "            Please find below more into and time estimation time: \n")
    window_start = dt.datetime.now()
    window_end = window_start
    for dom_to_backup in _by_order(doms_to_backup):
        host = dom_to_backup[1]
        host_conn = _get_connection(connections, host, user, password)
        dom = host_conn.get_backup_dom(dom_to_backup)
        if dom is None:
            continue

        dom_time = host_conn.get_disk_size(dom) // speed_net // 60  # in minutes
        # Five extra minutes are added to every estimated window.
        window_end = window_end + dt.timedelta(minutes=dom_time) + dt.timedelta(minutes=5)
        txt += "\n"
        txt += ("Server: " + dom_to_backup[0] + "\n"
                "Estimation time for the down time: FROM " + window_start.strftime("%H:%M")
                + ' TO ' + window_end.strftime("%H:%M") + " hours." + "\n"
                "Services will be affected: " + dom_to_backup[3])
        txt += "\n"
        window_start = window_end

    txt += "\n\n BACKUP CALENDAR FOR " + str(dt.datetime.today().year) + ":\n"
    txt += "\n".join(str(x) for x in get_calendar_backup())
    return txt


if __name__ == '__main__':

    SASL_USER = "user"
    SASL_PASS = "password"
    SPEED_NET = 25  # MB. Just to take time statistics
    backpath = '/media/VPS_BACKUPs'

    # Used to check if the disk exists
    DISK_UUID = ""  # for example, blkid /dev/sda1

    # First thing we do is to check if the disk is ready:
    is_disk_mounted(DISK_UUID, backpath)

    # How many days before the backup the system sends a reminder by e-mail.
    # The reminder goes out on the Thursday before, i.e. 3 days earlier.
    REMINDER_DAYS_BACKUP = 3

    doms_to_backup = [
        # Each entry has this format:
        # [Name, Host, Order, services]
        ["database.example.com", "10.2.4.69", 1, "Mysql"],
        ["web.example.com", "10.2.4.69", 1, "project1, project2"],
        ["net.example.com", "10.2.4.69", 0, "DNS, DHCP"],
    ]

    current_connections = {}
    # We backup on Sundays
    if is_last_sun_of_month():
        _print_backup_estimates(doms_to_backup, current_connections,
                                SASL_USER, SASL_PASS, SPEED_NET)
    else:
        days_to_backup = get_day_backup().day - dt.date.today().day
        if days_to_backup == REMINDER_DAYS_BACKUP:
            txt = _build_reminder_text(doms_to_backup, current_connections,
                                       SASL_USER, SASL_PASS, SPEED_NET)
            subject = "PERFORME VPS BACKUPS"
            mail(txt, subject, NOTIFY_BACKUP_EMAIL_LIST)
Source:tiobench.py  
...226            cmd = "umount %s" % l_disk227            cmd1 = 'umount /dev/mapper/avocado_vg-avocado_lv'228            process.system(cmd, shell=True, ignore_status=True)229            process.system(cmd1, shell=True, ignore_status=True)230            if disk.is_disk_mounted(l_disk):231                return False232            return True233        def is_dir_unmounted():234            cmd = 'umount %s' % self.dir235            process.system(cmd, shell=True, ignore_status=True)236            if disk.is_dir_mounted(self.dir):237                return False238            return True239        self.log.info("checking if disk is mounted.")240        if disk.is_disk_mounted(l_disk):241            self.log.info("%s is mounted, unmounting it ....", l_disk)242            if wait.wait_for(is_disk_unmounted, timeout=10):243                self.log.info("%s unmounted successfully" % l_disk)244            else:245                self.err_mesg.append("%s unmount failed", l_disk)246        else:247            self.log.info("disk %s not mounted." % l_disk)248        self.log.info("checking if dir %s is mounted." % self.dir)249        if disk.is_dir_mounted(self.dir):250            self.log.info("%s is mounted, unmounting it ....", self.dir)251            if wait.wait_for(is_dir_unmounted, timeout=10):252                self.log.info("%s unmounted successfully" % self.dir)253            else:254                self.err_mesg.append("failed to unount %s", self.dir)...add_share.py
Source:add_share.py  
...38						print "<script>location.href = 'iframe_nas_settings.py';</script>";39				else:40					sharepathsplit = share_path.split('/');41					disk = sharepathsplit[0];42					checkmount = cli_utils.is_disk_mounted(disk);43					if (checkmount['id'] > 0):44						print "<script>alert('The disk [%s] is not mounted. Please run the [Rescan Volumes], [Remount Volumes] from [TOOLS] option. Could not create share!');</script>" % disk;45						print "<script>parent.location.href = 'main.py?page=nas';</script>";46					else:47						shares_dic = {'name': '', 'path': '', 'comment': ''};48						shares_dic['name']       = share_name;49						shares_dic['path']       = share_path;50						shares_dic['comment']    = comment;51						check_dir_exist = tools.is_dir_exist("/storage/"+shares_dic['path'])52						if(check_dir_exist == True):53							shares_dic['create_dir'] = False54						else:55							shares_dic['create_dir'] = True56							...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
