How to use is_disk_mounted method in avocado

Best Python code snippet using avocado_python

virtBackup.py

Source:virtBackup.py Github

copy

Full Screen

'''
Created on Apr 10, 2013

This script has been developed to:
 1. Perform a backup on the last Sunday of the month.
 2. Notify staff about the backup and the estimated downtime of the
    services 3 days before (Thursday).
 3. Perform all the backups using libvirt and the scp command.

After a backup the result will be:
 1. One new folder named after each domain (server).
 2. Inside each folder, every virtual disk plus the .xml file with the
    hypervisor configuration.

Example crontab:
#Full VPN backup using libvirt.
00 09 * * * /usr/bin/python /opt/app/scripts/virBackup.py >> /dev/null

Dependencies:
apt-get install python-libvirt

The code below runs unchanged on Python 2.6+ and Python 3: every print
call takes a single argument and integer divisions are explicit (//).
'''
try:
    import libvirt
except ImportError:
    # Keeps the pure date/calendar helpers importable on hosts without
    # python-libvirt; main() refuses to run in that case.
    libvirt = None
import syslog
from xml import dom
import subprocess
from os import path
from re import findall
from time import sleep
import os
import datetime as dt
import sys

LIB_VIRT_ERROR_EMAIL_LIST = ['example@gexample.com']
NOTIFY_BACKUP_EMAIL_LIST = ['example@gexample.com']

# Numeric values of the domain states tested by isRunning() / isPause();
# they are the first element of dom.info().
_STATE_RUNNING = 1
_STATE_PAUSED = 3

try:
    from operator import methodcaller
except ImportError:
    def methodcaller(name, *args, **kwargs):
        """Fallback for interpreters whose operator module lacks methodcaller."""
        def caller(obj):
            return getattr(obj, name)(*args, **kwargs)
        return caller


def _disk_paths(xml):
    """Return the file paths of every file-backed disk in a domain XML string."""
    return findall("<source file='(.*)'/>\n", xml)


class Domfetcher(object):
    """Abstract libvirt API, supply methods to return dom object lists"""

    def __init__(self, SASL_USER, SASL_PASS, uri=None):
        """Connect to hypervisor uri with read write access.

        SASL_USER / SASL_PASS are handed to request_cred() through the
        libvirt auth user-data slot. On failure self.c is None and the
        error is reported through logit().
        """
        # register logit as the error handler
        libvirt.registerErrorHandler(logit, 'libvirt error')

        # With auth (SASL2). Without auth this would be libvirt.open(uri).
        self.auth = [
            [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE],
            Domfetcher.request_cred,
            (SASL_USER, SASL_PASS),
        ]
        try:
            self.c = libvirt.openAuth(uri, self.auth, 0)
        except Exception:
            self.c = None
            logit("libvirt error", "Failed to open connection to the hypervisor")

    @staticmethod
    def request_cred(credentials, user_data):
        """libvirt auth callback: fill in user name and passphrase.

        user_data is the (user, password) pair stored in self.auth by
        __init__. Returns 0 on success, -1 when no credentials were given.
        """
        if not user_data:
            return -1
        user, password = user_data
        for credential in credentials:
            if credential[0] == libvirt.VIR_CRED_AUTHNAME:
                credential[4] = user
            elif credential[0] == libvirt.VIR_CRED_PASSPHRASE:
                credential[4] = password
        return 0

    def get_all_doms(self):
        """Return a list of running and shutoff dom objects"""
        return self.get_running_doms() + self.get_shutoff_doms()

    def get_running_doms(self):
        """Return a list of running dom objects"""
        doms = []
        for dom_id in self.c.listDomainsID():   # loop over the running ids
            dom = self.c.lookupByID(dom_id)     # fetch dom object by id
            if 'Domain-' not in dom.name():     # prevent actions on the Xen hypervisor
                doms.append(dom)
        return doms

    def get_shutoff_doms(self):
        """Return a list of all shutoff but defined dom objects"""
        return [self.c.lookupByName(name) for name in self.c.listDefinedDomains()]

    def get_backup_dom(self, dom_to_backup):
        """Look up the domain named by dom_to_backup[0].

        dom_to_backup is one [Name, Host, Order, services] entry. Returns
        the dom object, or None (after logging) when the lookup fails or
        there is no open connection.
        """
        dom = None
        try:
            print("WE ADD TO THE LIST: " + dom_to_backup[0])
            dom = self.c.lookupByName(dom_to_backup[0])
        except Exception:
            logit("libvirt error", "Failed to open a dom. Review the dom backup list")
        return dom

    def get_disk_size(self, dom):
        """Accept a dom object, return an int with the total disk size in MB"""
        total_size = 0
        for disk_path in _disk_paths(dom.XMLDesc(0)):
            try:
                # info()[2] is divided by 1024 twice, i.e. bytes -> MB
                volume = self.c.storageVolLookupByPath(disk_path)
                total_size += volume.info()[2] // (1024 * 1024)
            except Exception:
                logit("libvirt error",
                      "Failed to open one disk when getting size, please add this disk into the POOL " + disk_path)
        return total_size

    # Manual way to reach the same numbers from an interactive session:
    #   conn = libvirt.open("qemu:///system")
    #   list_storage_pool = conn.listStoragePools()
    #   pool_obj = conn.storagePoolLookupByName('default')
    #   disk_obj = pool_obj.storageVolLookupByName('test1.img')
    #   int(pool_obj.storageVolLookupByName('test1.img').info()[2]/1024/1024)


def invoke(dom, method):
    """Pattern to invoke shutdown, destroy, and start on a doms"""
    f = methodcaller(method)
    try:
        logit(method, 'invoking %s on %s' % (method, dom.name()))
        retcode = f(dom)
        if retcode:  # log retcode
            logit(method, '{0} returned {1} on {2}'.format(method, retcode, dom.name()))
    except libvirt.libvirtError:
        # Deliberately ignored here: Domfetcher registers logit() as the
        # libvirt error handler.
        pass


def logit(context, message, quiet=False):
    """syslog and error handler.

    context "libvirt error" additionally e-mails the message to
    LIB_VIRT_ERROR_EMAIL_LIST. message may be a string or a libvirt error
    tuple (its third element is the text). quiet=True suppresses stdout.
    """
    # Only if there is an error we send the email
    if context == "libvirt error":
        mail(message, context, LIB_VIRT_ERROR_EMAIL_LIST)

    if isinstance(message, tuple):
        message = message[2]  # libvirt message is a tuple
    if not quiet:
        print(context + ': ' + message)
    syslog.openlog('virt-back', 0, syslog.LOG_LOCAL3)
    syslog.syslog(message)
    syslog.closelog()


def info(doms):
    """Accept a list of dom objects, attempt to display info for all"""
    if check_all_running(doms):
        print("NOTE: All guests are running")
    if check_all_shutoff(doms):
        print("NOTE: All guests are shut off")

    print('')
    print('running guests: ' + ', '.join([dom.name() for dom in get_all_running(doms)]))
    print('shutoff guests: ' + ', '.join([dom.name() for dom in get_all_shutoff(doms)]))
    print('')
    print('DomName'.ljust(16) + 'Memory MB'.rjust(12) + 'vCPUs'.rjust(8) + 'CPUtime ms'.rjust(18) + 'State'.rjust(20))
    print('==========================================================================')
    for dom in doms:
        stats = dom.info()
        name = dom.name()
        rams = str(stats[2] // 1024) + '/' + str(stats[1] // 1024)
        cpus = str(stats[3])
        time = str(stats[4] // 1000000)
        state = get_status(dom)
        print(name.ljust(16) + rams.rjust(12) + cpus.rjust(8) + time.rjust(18) + state.rjust(20))


def check_all_running(doms):
    """Accept a list of dom objects, check if all guest dom are active"""
    return all(dom.isActive() for dom in doms)


def check_all_shutoff(doms):
    """Accept a list of dom objects, check if all guest dom are shut off"""
    return not any(dom.isActive() for dom in doms)


def get_status(dom):
    """Accept a dom object, return a string with the current domain status.

    States missing from the table yield 'unknown' instead of None, so
    callers can always format the result as text.
    """
    states = {
        libvirt.VIR_DOMAIN_NOSTATE: 'no state',
        libvirt.VIR_DOMAIN_RUNNING: 'running',
        libvirt.VIR_DOMAIN_BLOCKED: 'blocked on resource',
        libvirt.VIR_DOMAIN_PAUSED: 'paused',
        libvirt.VIR_DOMAIN_SHUTDOWN: 'being shut down',
        libvirt.VIR_DOMAIN_SHUTOFF: 'shut off',
        libvirt.VIR_DOMAIN_CRASHED: 'crashed',
    }
    return states.get(dom.info()[0], 'unknown')


def isPause(dom):
    """Accept a dom object, check if the guest dom is paused. True=Paused"""
    return dom.info()[0] == _STATE_PAUSED


def isRunning(dom):
    """Accept a dom object, check if the guest dom is running. True=Running"""
    return dom.info()[0] == _STATE_RUNNING


def get_all_running(doms):
    """Accept a list of dom objects, return a list of running dom objects"""
    return [dom for dom in doms if dom.isActive()]


def get_all_shutoff(doms):
    """Accept a list of dom objects, return a list of shutoff dom objects"""
    return [dom for dom in doms if not dom.isActive()]


def scp(source, server, path=""):
    """Copy `source` from `server` into the local directory `path` via scp.

    Returns True only when scp could be started and exited with status 0;
    every failure is logged through logit().
    """
    try:
        status = subprocess.call(["scp", "%s:%s" % (server, source), path + '/'])
    except Exception:
        logit("libvirt error", "Failed coping the dom")
        return False
    if status != 0:
        # A non-zero exit used to be reported as success.
        logit("libvirt error", "Failed coping the dom")
        return False
    return True


def is_last_sun_of_month():
    """Return True when today is the backup day (last Sunday of the month)."""
    return get_day_backup() == dt.date.today()


def get_day_backup(month=None, year=None):
    """Return the backup day (the last Sunday) of a month as a date object.

    month and year default to the current month and year.
    """
    today = dt.date.today()
    if year is None:
        year = today.year
    if month is None:
        month = today.month

    # The 15th plus 31 days always lands in the following month; its first
    # day minus one day is the last day of the requested month.
    next_month = dt.date(year, month, 15) + dt.timedelta(days=31)
    last_day = dt.date(next_month.year, next_month.month, 1) - dt.timedelta(days=1)

    # isoweekday(): Monday = 1 ... Sunday = 7, so "% 7" is the number of
    # days to step back to reach a Sunday (0 when already on one).
    return last_day - dt.timedelta(days=last_day.isoweekday() % 7)


def show_calendar_backup():
    '''Print the calendar backup for the current year'''
    today = dt.date.today()
    print("CALENDAR FOR " + str(today.year))
    for month in range(1, 13):
        print(str(get_day_backup(month)))


def get_calendar_backup():
    '''Return the backup days of the current year as a list of 12 strings'''
    return [str(get_day_backup(month)) for month in range(1, 13)]


def mail(txt, subject, to):
    '''Send `txt` under `subject` to the address list `to`.

    Accepts String, String, List. A delivery failure is logged under the
    context "mail error": logit() only mails for "libvirt error", so a
    broken SMTP server can no longer trigger mail() -> logit() -> mail()
    recursion.
    '''
    import smtplib
    sender = 'example@gexample.com'
    receivers = to

    # The blank line after Subject separates the headers from the body.
    message = """From: From Operations <example@example.com>
To: To Developers <example@gexample.com>
Subject: %s

%s.
""" % (subject, txt)
    try:
        smtpObj = smtplib.SMTP('smtp.example.com', timeout=30)
        try:
            smtpObj.sendmail(sender, receivers, message)
        finally:
            smtpObj.quit()
    except Exception:
        logit("mail error", "Failed to send a email")


def backup(dom, backpath, host, suspend_timeout=60):
    """Back up one domain into backpath/<domain name>.

    A running domain is suspended first and always resumed afterwards,
    even if the backup fails half-way. The domain XML is written to
    <name>.xml and every file-backed disk is copied from `host` with scp.

    suspend_timeout is the number of seconds to wait for the paused
    state. Returns True when the XML and all disks were saved, False
    otherwise.
    """
    was_running = isRunning(dom)
    # if dom is running, we pause it to make the backup
    if was_running:
        invoke(dom, 'suspend')
        waited = 0
        while not isPause(dom):
            if waited >= suspend_timeout:
                logit("libvirt error", "Failed to suspend. We skip backup for: " + dom.name())
                return False
            sleep(1)  # poll once per second instead of spinning
            waited += 1
        print(get_status(dom))

    try:
        # Update the new backpath for each dom
        backpath_dom = backpath + '/' + dom.name()

        if not os.path.exists(backpath_dom):
            try:
                os.makedirs(backpath_dom)
            except OSError:
                logit("libvirt error", "Failed to create a folder. We skip backup for: " + dom.name())
                return False

        # we backup the XML file to have the configuration
        xml = dom.XMLDesc(0)
        xmlfile = path.join(backpath_dom, dom.name() + '.xml')
        with open(xmlfile, 'w') as f:
            f.write(xml)

        # We have to copy all the disks for the specific domain
        all_copied = True
        for disk in _disk_paths(xml):
            print("BACKING UP!!!")
            print(disk + ' ' + host + ' ' + backpath_dom)
            if not scp(disk, host, backpath_dom):
                all_copied = False
        return all_copied
    finally:
        # Only resume if the dom was running
        if was_running:
            sleep(2)
            invoke(dom, 'resume')


def is_disk_mounted(DISK_UUID, backpath):
    '''Make sure the backup disk is mounted on backpath.

    If backpath is not a mount point, "mount -U DISK_UUID backpath" is
    attempted; when that fails the script exits with status 2. Returns
    True otherwise.
    '''
    if not os.path.ismount(backpath):
        try:
            status_code = subprocess.call(["mount", "-U", DISK_UUID, backpath])
        except OSError:
            status_code = -1  # mount could not be started at all
        if status_code != 0:
            logit("libvirt error", "Failed, DISK NOT MOUNTED AND NOT POSSIBLE TO MOUNT")
            sys.exit(2)
    return True


def _connection_for(connections, host, user, password):
    """Return the cached Domfetcher for host, opening it on first use."""
    if host not in connections:
        print("We create connecton to: " + host)
        connections[host] = Domfetcher(user, password, 'qemu+tcp://' + host + '/system')
    return connections[host]


def main():
    """Run the monthly backup, or send the reminder e-mail a few days before."""
    if libvirt is None:
        sys.exit("python-libvirt is required: apt-get install python-libvirt")

    SASL_USER = "user"
    SASL_PASS = "password"
    SPEED_NET = 25  # MB. Just to take time statistics
    backpath = '/media/VPS_BACKUPs'

    # Used to check if the disk exist
    DISK_UUID = ""  # for example, blkid /dev/sda1

    # First thing we do is to check if the disk is ready:
    is_disk_mounted(DISK_UUID, backpath)

    # How many days before the backup the system sends a reminder by email.
    # The Thursday before we send a reminder email: 3 days before.
    REMINDER_DAYS_BACKUP = 3

    doms_to_backup = [
        # You can have tuples in this format:
        # [Name, Host, Order, services]
        ["database.example.com", "10.2.4.69", 1, "Mysql"],
        ["web.example.com", "10.2.4.69", 1, "project1, project2"],
        ["net.example.com", "10.2.4.69", 0, "DNS, DHCP"],
    ]
    ordered_doms = sorted(doms_to_backup, key=lambda entry: entry[2])

    current_connections = {}
    # We backup on Sundays
    if is_last_sun_of_month():
        for dom_to_backup in ordered_doms:
            host = dom_to_backup[1]
            print("Connecting to: " + host)
            print("Backing up: " + dom_to_backup[0])

            host_conn = _connection_for(current_connections, host, SASL_USER, SASL_PASS)
            dom = host_conn.get_backup_dom(dom_to_backup)

            # We execute the action we want per each dom
            if dom is not None:
                dom_size = host_conn.get_disk_size(dom)
                print("Size to backup: " + str(dom_size) + " MB")
                stimation_time = dom_size // SPEED_NET // 60
                print("Estiamtion time: " + str(stimation_time) + " minutes")
                # NOTE: the actual backup call is disabled in the original script:
                #backup(dom, backpath, host)
    else:
        days_to_backup = get_day_backup().day - dt.date.today().day
        if days_to_backup == REMINDER_DAYS_BACKUP:
            txt = ("This is auto email from BACKUP SYSTEM. This sunday all servers will be buckup."
                   " Please find below more into and time estimation time: \n")
            now_hour = dt.datetime.now()
            count_hours = now_hour
            for dom_to_backup in ordered_doms:
                host = dom_to_backup[1]
                host_conn = _connection_for(current_connections, host, SASL_USER, SASL_PASS)
                dom = host_conn.get_backup_dom(dom_to_backup)

                if dom is not None:
                    dom_time = host_conn.get_disk_size(dom) // SPEED_NET // 60  # in minutes
                    # We add 5 extra minutes per server
                    count_hours = count_hours + dt.timedelta(minutes=dom_time) + dt.timedelta(minutes=5)
                    txt += "\n"
                    txt += ("Server: " + dom_to_backup[0] + "\n"
                            "Estimation time for the down time: FROM " + now_hour.strftime("%H:%M")
                            + ' TO ' + count_hours.strftime("%H:%M") + " hours." + "\n"
                            "Services will be affected: " + dom_to_backup[3])
                    txt += "\n"
                    now_hour = count_hours

            subject = "PERFORME VPS BACKUPS"
            txt += "\n\n BACKUP CALENDAR FOR " + str(dt.datetime.today().year) + ":\n"
            txt += "\n".join(get_calendar_backup())

            mail(txt, subject, NOTIFY_BACKUP_EMAIL_LIST)


if __name__ == '__main__':
    main()

# ... (the source listing is cut off at this point)

Full Screen

Full Screen

tiobench.py

Source:tiobench.py Github

copy

Full Screen

...226 cmd = "umount %s" % l_disk227 cmd1 = 'umount /dev/mapper/avocado_vg-avocado_lv'228 process.system(cmd, shell=True, ignore_status=True)229 process.system(cmd1, shell=True, ignore_status=True)230 if disk.is_disk_mounted(l_disk):231 return False232 return True233 def is_dir_unmounted():234 cmd = 'umount %s' % self.dir235 process.system(cmd, shell=True, ignore_status=True)236 if disk.is_dir_mounted(self.dir):237 return False238 return True239 self.log.info("checking if disk is mounted.")240 if disk.is_disk_mounted(l_disk):241 self.log.info("%s is mounted, unmounting it ....", l_disk)242 if wait.wait_for(is_disk_unmounted, timeout=10):243 self.log.info("%s unmounted successfully" % l_disk)244 else:245 self.err_mesg.append("%s unmount failed", l_disk)246 else:247 self.log.info("disk %s not mounted." % l_disk)248 self.log.info("checking if dir %s is mounted." % self.dir)249 if disk.is_dir_mounted(self.dir):250 self.log.info("%s is mounted, unmounting it ....", self.dir)251 if wait.wait_for(is_dir_unmounted, timeout=10):252 self.log.info("%s unmounted successfully" % self.dir)253 else:254 self.err_mesg.append("failed to unount %s", self.dir)...

Full Screen

Full Screen

add_share.py

Source:add_share.py Github

copy

Full Screen

...38 print "<script>location.href = 'iframe_nas_settings.py';</script>";39 else:40 sharepathsplit = share_path.split('/');41 disk = sharepathsplit[0];42 checkmount = cli_utils.is_disk_mounted(disk);43 if (checkmount['id'] > 0):44 print "<script>alert('The disk [%s] is not mounted. Please run the [Rescan Volumes], [Remount Volumes] from [TOOLS] option. Could not create share!');</script>" % disk;45 print "<script>parent.location.href = 'main.py?page=nas';</script>";46 else:47 shares_dic = {'name': '', 'path': '', 'comment': ''};48 shares_dic['name'] = share_name;49 shares_dic['path'] = share_path;50 shares_dic['comment'] = comment;51 check_dir_exist = tools.is_dir_exist("/storage/"+shares_dic['path'])52 if(check_dir_exist == True):53 shares_dic['create_dir'] = False54 else:55 shares_dic['create_dir'] = True56 ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful