Best Python code snippet using autotest_python
main.py
Source:main.py  
1"""Functions that invoke repozo and/or the blob backup.2"""3import logging4import sys5from collective.recipe.backup import copyblobs6from collective.recipe.backup import repozorunner7from collective.recipe.backup import utils8logger = logging.getLogger('backup')9def backup_main(bin_dir, storages, keep, full,10                verbose, gzip, backup_blobs, only_blobs, use_rsync,11                keep_blob_days=0, pre_command='', post_command='', **kwargs):12    """Main method, gets called by generated bin/backup."""13    utils.execute_or_fail(pre_command)14    if not only_blobs:15        result = repozorunner.backup_main(16            bin_dir, storages, keep, full, verbose, gzip)17        if result and backup_blobs:18            logger.error("Halting execution due to error; not backing up "19                         "blobs.")20    if not backup_blobs:21        utils.execute_or_fail(post_command)22        return23    for storage in storages:24        blobdir = storage['blobdir']25        if not blobdir:26            logger.info("No blob dir defined for %s storage" %27                        storage['storage'])28            continue29        blob_backup_location = storage['blob_backup_location']30        logger.info("Please wait while backing up blobs from %s to %s",31                    blobdir, blob_backup_location)32        copyblobs.backup_blobs(blobdir, blob_backup_location, full,33                               use_rsync, keep=keep, keep_blob_days=keep_blob_days,)34    utils.execute_or_fail(post_command)35def fullbackup_main(bin_dir, storages, keep, full,36                verbose, gzip, backup_blobs, only_blobs, use_rsync,37                keep_blob_days=0, pre_command='', post_command='', **kwargs):38    """Main method, gets called by generated bin/fullbackup."""39    utils.execute_or_fail(pre_command)40    if not only_blobs:41        # Set Full=True for forced full backups.42        # It was easier to do this here, than mess with43        # "script_arguments = arguments_template % opts"44        # in backup.Recipe.install45        full = True46        result = repozorunner.fullbackup_main(47            bin_dir, storages, keep, full, verbose, gzip)48        if result and backup_blobs:49            logger.error("Halting execution due to error; not backing up "50                         "blobs.")51    if not backup_blobs:52        utils.execute_or_fail(post_command)53        return54    for storage in storages:55        blobdir = storage['blobdir']56        if not blobdir:57            logger.info("No blob dir defined for %s storage" %58                        storage['storage'])59            continue60        blob_backup_location = storage['blob_backup_location']61        logger.info("Please wait while backing up blobs from %s to %s",62                    blobdir, blob_backup_location)63        copyblobs.backup_blobs(blobdir, blob_backup_location, full,64                               use_rsync, keep=keep, keep_blob_days=keep_blob_days,)65    utils.execute_or_fail(post_command)66def snapshot_main(bin_dir, storages, keep, verbose, gzip,67                  backup_blobs, only_blobs, use_rsync, keep_blob_days=0,68                  pre_command='', post_command='', **kwargs):69    """Main method, gets called by generated bin/snapshotbackup."""70    utils.execute_or_fail(pre_command)71    if not only_blobs:72        result = repozorunner.snapshot_main(73            bin_dir, storages, keep, verbose, gzip)74        if result and backup_blobs:75            logger.error("Halting execution due to 
error; not backing up "76                         "blobs.")77    if not backup_blobs:78        utils.execute_or_fail(post_command)79        return80    for storage in storages:81        blobdir = storage['blobdir']82        if not blobdir:83            logger.info("No blob dir defined for %s storage" %84                        storage['storage'])85            continue86        blob_snapshot_location = storage['blob_snapshot_location']87        logger.info("Please wait while making snapshot of blobs from %s to %s",88                    blobdir, blob_snapshot_location)89        copyblobs.backup_blobs(blobdir, blob_snapshot_location,90                           full=True, use_rsync=use_rsync, keep=keep,91                           keep_blob_days=keep_blob_days)92    utils.execute_or_fail(post_command)93def restore_main(bin_dir, storages, verbose, backup_blobs,94                 only_blobs, use_rsync, restore_snapshot=False,95                 pre_command='', post_command='',96                 **kwargs):97    """Main method, gets called by generated bin/restore."""98    date = None99    # Try to find a date in the command line arguments100    for arg in sys.argv:101        if arg in ('-q', '-n', '--quiet', '--no-prompt'):102            continue103        if arg.find('restore') != -1:104            continue105        # We can assume this argument is a date106        date = arg107        logger.debug("Argument passed to bin/restore, we assume it is "108                     "a date that we have to pass to repozo: %s.", date)109        logger.info("Date restriction: restoring state at %s." % date)110        break111    question = '\n'112    if not only_blobs:113        question += "This will replace the filestorage:\n"114        for storage in storages:115            question += "    %s\n" % storage.get('datafs')116    if backup_blobs:117        question += "This will replace the blobstorage:\n"118        for storage in storages:119            question += "    %s\n" % storage.get('blobdir')120    question += "Are you sure?"121    if not kwargs.get('no_prompt'):122        if not utils.ask(question, default=False, exact=True):123            logger.info("Not restoring.")124            sys.exit(0)125    utils.execute_or_fail(pre_command)126    if not only_blobs:127        result = repozorunner.restore_main(128            bin_dir, storages, verbose, date,129            restore_snapshot)130        if result and backup_blobs:131            logger.error("Halting execution due to error; not restoring "132                         "blobs.")133            sys.exit(1)134    if not backup_blobs:135        utils.execute_or_fail(post_command)136        return137    for storage in storages:138        blobdir = storage['blobdir']139        if restore_snapshot:140            blob_backup_location = storage['blob_snapshot_location']141        else:142            blob_backup_location = storage['blob_backup_location']143        if not blobdir:144            logger.info("No blob dir defined for %s storage" %145                        storage['storage'])146            continue147        if not blobdir:148            logger.error("No blob storage source specified")149            sys.exit(1)150        logger.info("Restoring blobs from %s to %s", blob_backup_location,151                    blobdir)152        copyblobs.restore_blobs(blob_backup_location, blobdir,153                                use_rsync=use_rsync, date=date)154    utils.execute_or_fail(post_command)155def snapshot_restore_main(*args, **kwargs):156    
"""Main method, gets called by generated bin/snapshotrestore.157    Difference with restore_main is that we get need to use the158    snapshot_location and blob_snapshot_location.159    """160    # Override the locations:161    kwargs['restore_snapshot'] = True...synckeys.py
synckeys.py
Source:synckeys.py
# Copyright 2011-2012 Eucalyptus Systems, Inc.
#
# Redistribution and use of this software in source and binary forms,
# with or without modification, are permitted provided that the following
# conditions are met:
#
#   Redistributions of source code must retain the above copyright notice,
#   this list of conditions and the following disclaimer.
#
#   Redistributions in binary form must reproduce the above copyright
#   notice, this list of conditions and the following disclaimer in the
#   documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os.path
import pwd
import shutil
import socket
import subprocess
import sys


class SyncKeys(object):
    def __init__(self, src_files, dst_dir, remote_host,
                 use_rsync=True, use_scp=True, remote_user='root'):
        self.src_files   = src_files
        self.dst_dir     = dst_dir
        self.remote_host = remote_host
        self.remote_user = remote_user
        self.use_rsync   = use_rsync
        self.use_scp     = use_scp

    def warn(self, msg):
        print >> sys.stderr, 'warning:', msg

    def error(self, msg):
        print >> sys.stderr, 'error:', msg

    def get_extant_src_files(self):
        found = []
        for src_file in self.src_files:
            if os.path.isfile(src_file):
                found.append(src_file)
            else:
                self.warn('unable to sync file %s because it does not exist; '
                          'services may have trouble communicating' % src_file)
        return found

    def can_use_local_sync(self):
        if self.remote_host in ('127.0.0.1', 'localhost',
                                socket.gethostname()):
            return True
        return False

    def sync_local(self, src_files):
        for src_file in src_files:
            try:
                shutil.copy2(src_file, self.dst_dir)
            except Exception as exc:
                self.error('failed to copy %s to %s: %s' %
                           (src_file, self.dst_dir, str(exc)))
                return False
        return True

    def sync_with_rsync(self, src_files):
        cmd = ['rsync', '-az'] + src_files
        cmd.append('%s@%s:%s' % (self.remote_user, self.remote_host,
                                 self.dst_dir))
        # Check if we need to elevate privileges
        if any(not os.access(src_file, os.R_OK) for src_file in src_files):
            print 'elevating privileges with sudo'
            cmd = ['sudo', '-u', self.get_euca_user()] + cmd
        try:
            subprocess.check_call(cmd)
            return True
        except subprocess.CalledProcessError as err:
            self.error('key sync using rsync failed: %s' % str(err))
            return False

    def get_euca_user(self):
        euca_user = os.environ.get('EUCA_USER', None)
        if not euca_user:
            try:
                pwd.getpwnam('eucalyptus')
                euca_user = 'eucalyptus'
            except KeyError:
                self.error('EUCA_USER is not defined')
                sys.exit(1)
        return euca_user

    def sync_with_scp(self, src_files):
        cmd = ['scp'] + src_files
        cmd.append('%s@%s:%s' % (self.remote_user, self.remote_host,
                                 self.dst_dir))
        # Check if we need to elevate privileges
        if any(not os.access(src_file, os.R_OK) for src_file in src_files):
            print 'elevating privileges with sudo'
            cmd = ['sudo', '-u', self.get_euca_user()] + cmd
        try:
            subprocess.check_call(cmd)
            return True
        except subprocess.CalledProcessError as err:
            self.error('key sync using scp failed: %s' % str(err))
            return False

    def sync_keys(self):
        src_files = self.get_extant_src_files()
        success = False
        if self.can_use_local_sync():
            success = self.sync_local(src_files)
        if not success and self.use_rsync:
            success = self.sync_with_rsync(src_files)
        if not success and self.use_scp:
            success = self.sync_with_scp(src_files)
        ...
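A minimal usage sketch for the SyncKeys class above (Python 2, matching the snippet's print syntax); the key path and host name are placeholders, not values the class itself defines:

from synckeys import SyncKeys   # assuming the module above is saved as synckeys.py

syncer = SyncKeys(
    src_files=['/var/lib/eucalyptus/keys/cloud-cert.pem'],  # placeholder key file
    dst_dir='/var/lib/eucalyptus/keys',                     # placeholder destination
    remote_host='node01.example.com',
    use_rsync=True,   # try rsync first ...
    use_scp=True,     # ... then fall back to scp
    remote_user='root',
)
syncer.sync_keys()    # copies locally when the host is local, otherwise rsync/scp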
copy_all.py
Source:copy_all.py
#!/usr/bin/python
import sys
import os
import glob
import fnmatch
import subprocess
from timeit import default_timer as timer

basedir = '/scratch/wir/praetori/projects/polar_pfc/results'
setup = 'polar_pfc'
out_dir = '/media/Home/projects/shtns/results/polar_pfc'

use_rsync = False
if use_rsync:
  command = "rsync --prune-empty-dirs --include='*.csv' --include='*/' --exclude='*' -r --info=progress2  -e ssh "
else:
  command = "scp "

dt = 1.0

for run in range(0,5):
  for radius in [10,20,40,60,80,100]:
    for V0 in [0.1, 0.2, 0.3, 0.31, 0.32, 0.33, 0.34, 0.35, 0.36, 0.37, 0.38, 0.39, 0.4, 0.425, 0.45, 0.475, 0.5,0.6,0.7,0.8,0.9,1.0]:
      dirname = basedir + '/' + setup + '/radius_' + str(radius) + '_v0_' + str(V0) + '_run_' + str(run)
      print 'copy',dirname
      if use_rsync:
        subprocess.call(command + "praetori@taurusexport.hrsk.tu-dresden.de:" + dirname + " " + out_dir, shell=True)
      else:
        if not os.path.exists(out_dir):
          os.makedirs(out_dir)
        start = timer()
        subprocess.call(command + "praetori@taurusexport.hrsk.tu-dresden.de:" + dirname + "/data.tar.gz " + out_dir, shell=True)
        subprocess.call("tar --strip-components=7 -xzf " + out_dir + "/data.tar.gz -C " + out_dir, shell=True)
        end = timer()
        ...
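As a design note, the script builds command strings and runs them with shell=True. A hedged sketch of the same scp-and-extract step using argument lists instead (host and paths copied from the snippet; behaviour otherwise assumed equivalent):

import os
import subprocess

REMOTE = "praetori@taurusexport.hrsk.tu-dresden.de"

def fetch_and_extract(dirname, out_dir):
    # Same steps as the else-branch above, but without going through a shell,
    # so paths are not re-interpreted by shell quoting rules.
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    subprocess.call(["scp", "%s:%s/data.tar.gz" % (REMOTE, dirname), out_dir])
    subprocess.call(["tar", "--strip-components=7", "-xzf",
                     os.path.join(out_dir, "data.tar.gz"), "-C", out_dir])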
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
