Best Python code snippet using autotest_python
mysql.py
Source:mysql.py  
1import configparser2import datetime3import os4import pyAesCrypt5import re6import resource7import shutil8import subprocess9import sys10import tarfile11import tempfile12import time13class DeletionMeta():14    def __init__(self):15        self._candidates = {}16    def insert(self, candidate):17        candidate.inject(self._candidates)18        return self19    def candidateForBackup(self, backup):20        return DeletionCandidate.byBackup(self._candidates, backup)21class DeletionCandidate():22    def __init__(self, meta):23        self._expirationTime = None24        self._meta = meta25    def remove(self, resort):26        self._backup.remove(resort)27        return self28    @classmethod29    def byBackup(cls, dict, backup):30        return dict[backup.getName()]31    def inject(self, dict):32        dict[self._backup.getName()] = self33        return self34    def getName(self):35        return self._backup.getName()36    def parse(self, backup):37        self._backup = backup38        self._meta.insert(self)39        return self40    def rule(self, rule):41        tagName, validTime = rule.split(':')42        if not self._backup.hasTag(tagName):43            return self44        unit, amount = self.__parseRuleIntoUnitAmonut(validTime)45        interval = self.__parseExpirationInterval(unit, amount)46        newExpirationTime = self._backup.getCreationDate() + interval47        if self.isLater(newExpirationTime):48            self._expirationTime = newExpirationTime49        return self50    def __parseRuleIntoUnitAmonut(self, validTime):51        result = re.match('([0-9]+)(.*)', validTime)52        if not result:53            raise ValueError(validTime+' could not be parsed as valid time')54        amount = int(result.group(1))55        unit = result.group(2)56        return unit, amount57    def __parseExpirationInterval(self, unit, amount):58        units = {59                'day': self.daysDelta,60                'days': self.daysDelta,61                'week': self.weeksDelta,62                'weeks': self.weeksDelta,63                }64        try:65            interval = units[unit](amount)66        except KeyError:67            validUnits = ','.join( units.keys() )68            raise KeyError(unit+' is not a valid unit. Accepted values: '+validUnits)69        return interval70    def isLater(self, newExpirationTime):71        if not self._expirationTime:72            return True73        isLater = newExpirationTime > self._expirationTime74        if isLater:75            return True76        return False77    def daysDelta(self, amount):78        return datetime.timedelta(days=amount)79    def weeksDelta(self, amount):80        return datetime.timedelta(weeks=amount)81    def orphaned(self):82        if self._backup.isFull():83            return False84        parents = self._backup.getParents()85        for parent in parents:86            parentCandidate = self._meta.candidateForBackup(parent)87            if not parentCandidate.shouldDelete():88                return False89        return True90    def expired(self):91        if not self._expirationTime:92            return False93        return self._expirationTime < datetime.datetime.now()94    def shouldDelete(self):95        if self.expired():96            return True97        if self.orphaned():98            return True99        return False100    def printExpiration(self):101        self._backup.print(suffix=' expires at '+str(self._expirationTime) )102    def print(self):103        if not self.shouldDelete():104            return105        if self.expired():106            self._backup.print(suffix=' has expired at '+str(self._expirationTime))107        if self.orphaned():108            self._backup.print(suffix=' will be orphaned after the pruning')109    def getCreationDate(self):110        return self._backup.getCreationDate()111class Finder():112    def backups(self, backups):113        self.backups = backups114        return self115    def parameters(self, parameters):116        self._parameters = parameters117        return self118class Sorter():119    def sort(self, backups):120        return sorted(backups, key=lambda backup: backup.getCreationDate())121class LatestTagFinder(Finder):122    def __init__(self, sorter=Sorter()):123        self._sorter = sorter124        self._parameters = []125    def find(self, backups):126        if len(self._parameters) == 0:127            raise KeyError("Missing tag to search")128        requiredTag = self._parameters[0]129        backupsWithTags = []130        for backup in backups:131            if backup.hasTag(requiredTag):132                backupsWithTags.append(backup)133        sortedBackups = self._sorter.sort(backupsWithTags)134        return sortedBackups[-1]135class LatestFinder(Finder):136    def __init__(self, sorter=Sorter()):137        self._sorter = sorter138    def find(self, backups):139        sortedBackups = self._sorter.sort(backups)140        return sortedBackups[-1]141class LatestFullFinder(Finder):142    def __init__(self, sorter=Sorter()):143        self._sorter = sorter144    def find(self, backups):145        fullBackups = []146        for backup in backups:147            if not backup.isFull():148                continue149            fullBackups.append(backup)150        sortedFullBackups = self._sorter.sort(fullBackups)151        return sortedFullBackups[-1]152class BackupNotFoundException(Exception):153    pass154class MySQLBackupMeta:155    def __init__(self):156        self.byEndingPoint = {}157        self.byStartingPoint = {}158    def addByEndingPoint(self, endingPoint,backup):159        try:160            self.byEndingPoint[endingPoint].append(backup)161        except KeyError:162            self.byEndingPoint[endingPoint] = [ backup ]163        return self164    def addByStartingPoint(self, startingPoint,backup):165        try:166            self.byStartingPoint[startingPoint].append(backup)167        except KeyError:168            self.byStartingPoint[startingPoint] = [ backup ]169        return self170    def parents(self, childStartingPoint):171        try:172            return self.byEndingPoint[childStartingPoint]173        except KeyError:174            return None175    def children(self, parentEndingPoint):176        try:177            return self.byStartingPoint[parentEndingPoint]178        except KeyError:179            return []180class MySQLBackup:181    def __init__(self, sorter=Sorter()):182        self._name = ''183        self._sorter = sorter184        self._tags = []185    def remove(self, resort):186        resort.remove(self._name)187        return self188    def getName(self):189        return self._name190    def addTag(self, tag):191        if tag in self._tags:192            return self193        self._tags.append(tag)194        return self195    def mysql(self, mysql):196        self._mysql = mysql197        return self198    def name(self, name):199        self._name = name200        return self201    def meta(self, meta):202        self._meta = meta203        return self204    def hasTag(self, tag):205        return tag in self._tags206    def writeIni(self, filePath):207        config = configparser.ConfigParser()208        config['main'] = {}209        config['main']['tags'] = ','.join(self._tags)210        with open(filePath, 'w') as file:211            config.write(file)212    def parseIni(self, extraIni):213        config = configparser.ConfigParser()214        config.read_string(extraIni)215        self._tags = config['main']['tags'].split(',')216        return self217    def parseInfo(self, infoContent):218        dummyHeader = '[main]\n'219        iniFile = dummyHeader+infoContent220        xtrabackupInfoContent = configparser.ConfigParser()221        xtrabackupInfoContent.read_string(iniFile)222        isFullBackup = False223        startingPoint = int(xtrabackupInfoContent['main']['innodb_from_lsn'])224        if startingPoint == 0:225            isFullBackup = True226        self._full = isFullBackup227        self._startingPoint = startingPoint228        endingPoint = int(xtrabackupInfoContent['main']['innodb_to_lsn'])229        self._endingPoint = endingPoint230        self._meta.addByEndingPoint(endingPoint, self)231        self._meta.addByStartingPoint(startingPoint, self)232        return self233    def isNamed(self, name):234        return self._name == name235    def isFull(self):236        return self._full237    def print(self, indent = 0, prefix='', suffix=''):238        tags = self._tags239        tagList = ','.join(self._tags)240        tagInfo = ' tags:'+tagList241        print( (' ' * indent) + prefix + self._name + tagInfo + suffix)242    def printRecursive(self, indent = 0):243        self.print(indent)244        sortedChildren = self._sorter.sort( self.getChildren() )245        for child in sortedChildren:246            if child is self:247                continue248            if not child.isClosestRelative(self):249                continue250            child.printRecursive(indent + 2)251    def isBefore(self, other):252        return self.getCreationDate() < other.getCreationDate()253    def getClosestRelative(self):254        children = self.getChildren()255        sortedChildren = self._sorter.sort(children)256        return sortedChildren[0]257    def isClosestRelative(self, other):258        try:259            closestRelative = other.getClosestRelative()260            return closestRelative == self261        except IndexError:262            return False263    def getCreationDate(self):264        return datetime.datetime.strptime(self._name, '%Y-%m-%d_%H-%M')265    def getHistory(self):266        history = [self]267        parent = self.getParent()268        while parent:269            history.append(parent)270            parent = parent.getParent()271        history.reverse()272        fullBackup = history.pop(0)273        return [ fullBackup, history ]274    def isParent(self, backup):275        if backup in self.getParents():276            return True277        for parent in self.getParents():278            if parent.isParent(backup):279                return True280        return False281    def getParent(self):282        if self._full:283            return None284        parents = self._meta.parents(self._startingPoint)285        return self.__skipEmptyParents(parents)[0]286    def getParents(self):287        if self._full:288            return []289        parents = self._meta.parents(self._startingPoint)290        parents = self.__cantBeOwnParent(parents)291        return self.__parentsMustBeBefore(parents)292    def __cantBeOwnParent(self, parents):293        return list( filter(lambda backup: backup._name != self._name, parents) )294    def __skipEmptyParents(self, parents):295        return list( filter(lambda backup: not backup.isEmpty(), parents) )296    def __parentsMustBeBefore(self, parents):297        return list( filter(lambda backup: backup.isBefore(self), parents) )298    def isEmpty(self):299        return self._startingPoint == self._endingPoint300    def getChildren(self):301        children = self._meta.children(self._endingPoint)302        withoutSelf =  self.__cantBeOwnChild(children)303        return self.__childCantBeBefore(withoutSelf)304    def __cantBeOwnChild(self, children):305        return list( filter(lambda backup: backup._name != self._name, children) )306    def __childCantBeBefore(self, children):307        return list( filter(lambda backup: self.isBefore(backup), children ) )308    def inject(self, backupList):309        backupList.append(self)310        return self311    def setTimestamp(self, gauge):312        timestamp = self.getCreationDate().timestamp()313        gauge.set(timestamp)314        return self315class MySQL:316    def __init__(self, sorter=Sorter()):317        self._mysqlHost = os.environ.get('MYSQL_HOST', 'mysql')318        self._mysqlPort = os.environ.get('MYSQL_PORT', '3306')319        self._mysqlUsername = os.environ.get('MYSQL_USERNAME', 'backup')320        self._mysqlPassword = os.environ.get('MYSQL_PASSWORD')321        self._mysqlDatadir = os.environ.get('MYSQL_DATADIR', False)322        self._backupCommand = os.environ.get('MYSQL_COMMAND', 'mariabackup')323        self._bufferSize = int(os.environ.get('MYSQL_ENC_BUFSIZE', 64 * 1024))324        self._password = os.environ.get('MYSQL_ENC_PASSWORD')325        self._assetBase = os.path.dirname(os.path.realpath(__file__))+'/assets'326        self._tags = []327        self._sorter = sorter328        self._dryRun = False329        self._specialNames = {330                'latest-full-backup': LatestFullFinder(),331                'latest-backup': LatestFinder(),332                'latest-tag': LatestTagFinder(),333                }334    def setOutput(self, output):335        self.output = output336    def dataDir(self, dataDir):337        self._mysqlDatadir = dataDir338        return self339    def dryRun(self, dryRun):340        self._dryRun = dryRun341        return self342    def resort(self, resort):343        self._resort = resort344        return self345    def pruneBackups(self, rules):346        candidates = self.__buildDeletionCanidates(rules)347        sortedCandidates = self._sorter.sort(candidates)348        for candidate in sortedCandidates:349            candidate.print()350        if self._dryRun:351            print("Dry run - not executing")352            return353        for candidate in candidates:354            if not candidate.shouldDelete():355                continue356            candidate.remove( self._resort.adapter('mysql') )357    def __buildDeletionCanidates(self, rules):358        candidates = []359        meta = DeletionMeta()360        backups = self.list()361        for backup in backups:362            candidate = DeletionCandidate(meta).parse(backup)363            for rule in rules:364                candidate.rule(rule)365            candidates.append(candidate)366        return candidates367    def incrementalBackup(self, name, parentName):368        parent = self.find(parentName)369        print('Basing backup on:')370        parent.print()371        return self.__backup(name, parent._endingPoint)372    def fullBackup(self, name):373        return self.__backup(name)374    def tags(self, tags):375        self._tags = tags376        return self377    def __backup(self, name, baseLsn = None):378        fileLimit = self.__getFileLimit()379        with tempfile.TemporaryDirectory() as tempDirectory:380            backupDirectory = tempDirectory+'/backup'381            os.mkdir(backupDirectory)382            command = [383                'mariabackup',384                '--backup',385                '--target-dir='+backupDirectory,386                '--host='+self._mysqlHost,387                '--user='+self._mysqlUsername,388                '--password='+self._mysqlPassword,389                '--port='+self._mysqlPort,390                ]391            if self._mysqlDatadir:392                command.append('--datadir='+self._mysqlDatadir)393            if baseLsn:394                command.append('--incremental-lsn='+str(baseLsn))395            completedProcess = subprocess.run(command, check=True)396            self.__extractBackupInfo(backupDirectory, tempDirectory)397            tarFile = self.__compressBackup(backupDirectory, tempDirectory)398            self.__encryptBackup(tarFile)399            newBackup = MySQLBackup()400            for tag in self._tags:401                newBackup.addTag(tag)402            newBackup.writeIni(tempDirectory+'/cloudbackup.ini')403            self.__uploadBackup(tempDirectory, name)404        return self405    def __extractBackupInfo(self, backupDirectory, tempDirectory):406            shutil.copy(backupDirectory+'/xtrabackup_info', tempDirectory+'/xtrabackup_info')407    def __compressBackup(self, backupDirectory, tempDirectory):408        tarFilePath = tempDirectory+'/backup.tar.bz2'409        tar = tarfile.open(tarFilePath, 'w:bz2')410        tar.add(backupDirectory, 'backup')411        tar.close()412        shutil.rmtree(backupDirectory)413        return tarFilePath414    def __encryptBackup(self, tarFilePath):415        pyAesCrypt.encryptFile(tarFilePath, tarFilePath+'.aes', self._password, self._bufferSize)416        os.remove(tarFilePath)417    def __uploadBackup(self, backupDirectory, name):418        self._resort.adapter('mysql').upload(backupDirectory, name)419    def getSpecialBackups(self):420        availableBackups = self.list()421        specialBackups = {}422        for specialName in self._specialNames:423            specialNameFinder = self._specialNames[specialName]424            try:425                specialBackups[specialName] = specialNameFinder.find(availableBackups)426            except IndexError:427                pass428            except KeyError:429                pass430        return specialBackups431    def find(self, name):432        availableBackups = self.list()433        specialParameters = name.split(':')434        specialName = specialParameters.pop(0)435        if specialName in self._specialNames.keys():436            specialNameFinder = self._specialNames[specialName]437            return specialNameFinder.parameters(specialParameters).find(availableBackups)438        for backup in availableBackups:439            if backup.isNamed(name):440                return backup441        raise BackupNotFoundException()442    def restore(self, name, dataDir, port=8080):443        dataDirAbsolute = os.path.abspath(dataDir)444        backup = self.find(name)445        fullBackup, history = backup.getHistory()446        self.output.info("Full Backup: "+fullBackup.getName())447        self.output.info("Backup history:")448        for backup in history:449            self.output.info("- "+backup.getName() )450        self.output.info("Restoring Full Backup")451        self.__restoreFullBackup(fullBackup, dataDirAbsolute)452        self.output.info("Restoring Incremental Backups")453        self.__restoreIncrementalBackups(history, dataDirAbsolute)454        self.__copyDockerCompose(dataDirAbsolute+'/backup/docker-compose.yml', port)455    def __restoreFullBackup(self, backup, dataDir):456        self.__downloadAndExtract(backup, dataDir)457        completedProcess = subprocess.run([458            'mariabackup',459            '--prepare',460            '--target-dir='+dataDir+'/backup',461            ], check=True)462    def __restoreIncrementalBackups(self, backups, dataDir):463        for backup in backups:464            self.__restoreIncrementalBackup(backup, dataDir)465    def __restoreIncrementalBackup(self, backup, dataDir):466        with tempfile.TemporaryDirectory(dir=dataDir) as tempDir:467            self.output.info("Downloading "+backup.getName())468            self.__downloadAndExtract(backup, tempDir)469            completedProcess = subprocess.run([470                'mariabackup',471                '--prepare',472                '--target-dir='+dataDir+'/backup',473                '--incremental-dir='+tempDir+'/backup'474                ], check=True)475    def __downloadAndExtract(self, backup, targetDirectory):476        self._resort.adapter('mysql').download(backup._name, targetDirectory)477        tarFilePath = targetDirectory+'/backup.tar.bz2'478        self.__decryptBackup(tarFilePath)479        self.__unpackBackup(tarFilePath, targetDirectory)480    def __decryptBackup(self, tarFilePath):481        encryptedPath = tarFilePath+'.aes'482        pyAesCrypt.decryptFile(encryptedPath, tarFilePath, self._password, self._bufferSize)483        os.remove(encryptedPath)484    def __unpackBackup(self, tarFilePath, dataDir):485        tar = tarfile.open(tarFilePath, 'r:bz2')486        tar.extractall(dataDir)487        os.remove(tarFilePath)488    def __copyDockerCompose(self, target, port):489        with open(self._assetBase+'/docker-compose.yml', 'r') as file:490            dockerCompose = file.read()491        dockerCompose = dockerCompose.replace('%%PORT%%', str(port))492        with open(target, 'w') as file:493            file.write(dockerCompose)494    def __getFileLimit(self):495        limits = resource.getrlimit(resource.RLIMIT_NOFILE)496        return limits[1]497    def list(self):498        meta = MySQLBackupMeta()499        backups = []500        for directory in self._resort.adapter('mysql').listFolders():501            backup = self.__parseBackup(directory, meta)502            backup.inject(backups)503        sortedBackups = self._sorter.sort(backups)504        return sortedBackups505    def __parseBackup(self, directory, meta):506        backupName = os.path.basename(directory)507        infoContent = self._resort.adapter('mysql').fileContent(directory+'/xtrabackup_info').decode('utf-8')508        cloudbackupIni = self._resort.adapter('mysql').fileContent(directory+'/cloudbackup.ini').decode('utf-8')509        return MySQLBackup() \510                .name(backupName).meta(meta) \511                .parseInfo(infoContent) \512                .parseIni(cloudbackupIni)513    def scrape(self, gauge):514        availableBackups = self.list()515        self.__scrapeLatestBackup(gauge, availableBackups)516        self.__scrapeLatestFullBackup(gauge, availableBackups)517        return self518    def __scrapeLatestBackup(self, gauge, backups):519        try:520            backup = self._specialNames['latest-backup'].find(backups)521            backup.setTimestamp( gauge.labels(self._resort._name, 'Either') )522        except IndexError:523            gauge.labels(self._resort._name, 'Either').set(0)524    def __scrapeLatestFullBackup(self, gauge, backups):525        try:526            fullBackup = self._specialNames['latest-full-backup'].find(backups)527            fullBackup.setTimestamp( gauge.labels(self._resort._name, 'True') )528        except IndexError:...test_board.py
Source:test_board.py  
1#!/usr/bin/python32import unittest3import board4#   Deck fixtures5unshuffled = [*range(0,52)]6reversed = [*range(0,52)]7reversed.reverse()8no_aces = board.parseDeck("""99H KD 8H JD AC 9S 7C 6D10JH 3H 7S TC AH 2C 6C 4H11KH AD TH 2D 3S QC JS 4S125D TD 8C TS 6H JC 8S 7D13QS QD 4D 3C AS 5H 7H QH145C KC 8D 9C 2H 3D KS 6S159D 5S 2S 4C16""")17two_aces = board.parseDeck("""18KH 2H 4D QC 3S 7D 7S KD19JD 4H 3C 7C 9H KS 7H 5D206S AC QS TD JC 9C JH KC219S 4S QH 2D 4C AD TS 8H22TH 2S JS 6C 9D 3D 8D TC232C 5H 8C 5S 3H 6H 8S 6D24AH 5C QD AS25""")26two_aces_two = board.parseDeck("""276H AH 3H 2D JC 4D TH QD28TS 7D 9D 2C 7S QS JD 5D298H 4S 6S 5H 6C KD KC JH30QC TD 8S 7C 4C KS 9H 3S312S 2H 9S TC 8D 5C AD 4H32AS 5S 8C QH JS 6D KH 7H33AC 3D 9C 3C34""")35class CardUnitTest(unittest.TestCase):36    def test_makeCard(self):37        for card in range(0,52):38            self.assertEqual(card, board.makeCard(card // 13, card % 13))39    def test_suit(self):40        for card in range(0,52):41            self.assertEqual(card // 13, board.suit(card), )42    def test_pips(self):43        for card in range(0,52):44            self.assertEqual(card % 13, board.pips(card))45    def assert_formatCard(self, formatted, suit, pips):46        card = board.makeCard(suit, pips)47        actual = board.formatCard(card)48        self.assertEqual(formatted, actual, f"Card format failed %{pips}; {suit}")49        actual = board.parseCard(formatted)50        self.assertEqual(card, actual, f"Card parse %{pips}; {suit}")51    def test_formatCard(self):52        self.assert_formatCard('AC', 0, 0)53        self.assert_formatCard('2C', 0, 1)54        self.assert_formatCard('9C', 0, 8)55        self.assert_formatCard('TC', 0, 9)56        self.assert_formatCard('JC', 0, 10)57        self.assert_formatCard('QC', 0, 11)58        self.assert_formatCard('KC', 0, 12)59class BoardUnitTest(unittest.TestCase):60    def assert_init(self, deck):61        cards = len(deck)62        self.assertEqual(0, cards % 13, f"Uneven suits: {cards} cards in the deck")63        b = board.Board(deck)64        suits = cards // 1365        self.assertEqual(suits, b._nsuits)66        self.assertEqual(suits, len(b._foundations))67        for a, ace in enumerate(b._foundations):68            self.assertEqual(ace, board.noCard, f"Ace {a} not empty")69        self.assertEqual(suits * 2, len(b._tableau))70        for c, cascade in enumerate(b._tableau):71            expected = cards // len(b._tableau) + (c < suits)72            self.assertEqual(expected, len(cascade), f"Column {c} incorrectly sized")73            for r, card in enumerate(cascade):74                expected = deck[r * suits * 2 + c]75                self.assertEqual(expected, card)76        self.assertIsNone(b._memento, "Pre-initialised memento")77        self.assertTrue(b._resort, "Not initialised to resort")78        self.assertTrue(b._rehash, "Not initialised to rehash")79        self.assertEqual(len(b._tableau), len(b._sorted), "Hashing cascades not initialised")80        for s, actual in enumerate(b._sorted):81            self.assertEqual(b._tableau[s], actual)82    def test_init(self):83        self.assert_init(unshuffled)84    def assert_str(self, expected, deck):85        self.assertEqual(expected, str(board.Board(deck)))86    def test_str_unshuffled(self):87        expected = """Aces:88-S -H -D -C89Table:90AC 2C 3C 4C 5C 6C 7C 8C919C TC JC QC KC AD 2D 3D924D 5D 6D 7D 8D 9D TD JD93QD KD AH 2H 3H 4H 5H 6H947H 8H 9H TH JH QH KH AS952S 3S 4S 5S 6S 7S 8S 9S96TS JS QS KS -- -- -- --97Cells:98-- -- -- --99"""100        self.assert_str(expected, unshuffled)101    def test_str_reversed(self):102        expected = """Aces:103-S -H -D -C104Table:105KS QS JS TS 9S 8S 7S 6S1065S 4S 3S 2S AS KH QH JH107TH 9H 8H 7H 6H 5H 4H 3H1082H AH KD QD JD TD 9D 8D1097D 6D 5D 4D 3D 2D AD KC110QC JC TC 9C 8C 7C 6C 5C1114C 3C 2C AC -- -- -- --112Cells:113-- -- -- --114"""115        self.assert_str(expected, reversed)116    def test_str_two_aces(self):117        expected = """Aces:118-S -H -D -C119Table:120KH 2H 4D QC 3S 7D 7S KD121JD 4H 3C 7C 9H KS 7H 5D1226S AC QS TD JC 9C JH KC1239S 4S QH 2D 4C AD TS 8H124TH 2S JS 6C 9D 3D 8D TC1252C 5H 8C 5S 3H 6H 8S 6D126AH 5C QD AS -- -- -- --127Cells:128-- -- -- --129"""130        self.assert_str(expected, two_aces)131    def test_move_to_foundations_no_aces(self):132        b = board.Board(no_aces)133        b._rehash = b._resort = False134        actual = b.moveToFoundations()135        expected = []136        self.assertEqual(expected, actual)137        self.assertFalse(b._rehash)138        self.assertFalse(b._resort)139    def test_move_to_foundations_two_aces(self):140        b = board.Board(two_aces)141        b._rehash = b._resort = False142        actual = b.moveToFoundations()143        expected = [(0, -3,), (3, -4),]144        self.assertEqual(expected, actual)145        self.assertTrue(b._rehash)146        self.assertFalse(b._resort)147    def test_move_to_foundations_two_aces_two(self):148        b = board.Board(two_aces_two)149        b._rehash = b._resort = False150        actual = b.moveToFoundations()151        expected = [(0, -1,), (0, -4), (0, -4), ]152        self.assertEqual(expected, actual)153        self.assertTrue(b._rehash)154        self.assertFalse(b._resort)155    def test_move_to_foundations_reversed(self):156        setup = board.Board(reversed)157        setup._rehash = setup._resort = False158        actual = setup.moveToFoundations()159        self.assertTrue(setup._rehash)160        self.assertTrue(setup._resort)161    def test_move_to_foundations_cell(self):162        setup = board.Board(unshuffled)163        # One king in a cell164        for cascade in setup._tableau: cascade.clear()165        king = 12166        queen = king - 1167        clubs = 0168        setup._cells = [ board.noCard, board.makeCard(clubs, king), board.noCard, board.noCard, ]169        setup._foundations = [queen, king, king, king, ]170        expected = [(9, -1,),]171        actual = setup.moveToFoundations()172        self.assertEqual(expected, actual)173    def test_move_between_cascades_and_cells(self):174        b = board.Board(unshuffled)175        #   Move to cell #1176        b.moveCard((0,8,))177        self.assertTrue(b._rehash)178        self.assertEqual(1, b._firstFree)179        self.assertEqual(6, len(b._tableau[0]))180        #   Move to cell #2181        b.moveCard((0,9,))182        self.assertTrue(b._rehash)183        self.assertEqual(2, b._firstFree)184        self.assertEqual(5, len(b._tableau[0]))185        #   Move to cell #3186        b.moveCard((1,10,))187        self.assertTrue(b._rehash)188        self.assertEqual(3, b._firstFree)189        self.assertEqual(6, len(b._tableau[1]))190        #   Move to cell #4191        b.moveCard((2,11,))192        self.assertTrue(b._rehash)193        self.assertEqual(4, b._firstFree)194        self.assertEqual(6, len(b._tableau[2]))195        #   Move to cascade #1196        b.moveCard((11,3,))197        self.assertTrue(b._rehash)198        self.assertEqual(3, b._firstFree)199        self.assertEqual(8, len(b._tableau[3]))200        #   Move to cascade #2201        b.moveCard((10,3,))202        self.assertTrue(b._rehash)203        self.assertEqual(2, b._firstFree)204        self.assertEqual(9, len(b._tableau[3]))205        #   Move to cascade #3206        b.moveCard((8,3,))207        self.assertTrue(b._rehash)208        self.assertEqual(0, b._firstFree)209        self.assertEqual(10, len(b._tableau[3]))210        #   Move to cascade #4211        b.moveCard((9,1,))212        self.assertTrue(b._rehash)213        self.assertEqual(0, b._firstFree)214        self.assertEqual(7, len(b._tableau[1]))215    def test_move_between_cascades_and_foundations(self):216        b = board.Board(two_aces_two)217        #   Move to foundations #1218        b._resort = b._rehash = False219        b.moveCard((0,-1,))220        self.assertTrue(b._rehash)221        self.assertFalse(b._resort)222        self.assertEqual(0, b._firstFree)223        self.assertEqual(6, len(b._tableau[0]))224        self.assertEqual([0, board.noCard, board.noCard, board.noCard,], b._foundations)225        #   Move to foundations #2226        b._resort = b._rehash = False227        b.moveCard((0,-4,))228        self.assertTrue(b._rehash)229        self.assertFalse(b._resort)230        self.assertEqual(0, b._firstFree)231        self.assertEqual(5, len(b._tableau[0]))232        self.assertEqual([0, board.noCard, board.noCard, 0,], b._foundations)233        #   Move to foundations #3234        b._resort = b._rehash = False235        b.moveCard((0,-4,))236        self.assertTrue(b._rehash)237        self.assertFalse(b._resort)238        self.assertEqual(0, b._firstFree)239        self.assertEqual(4, len(b._tableau[0]))240        self.assertEqual([0, board.noCard, board.noCard, 1,], b._foundations)241        #   Move from foundations #1242        b._resort = b._rehash = False243        b.moveCard((-4,0,), False)244        self.assertTrue(b._rehash)245        self.assertFalse(b._resort)246        self.assertEqual(0, b._firstFree)247        self.assertEqual(5, len(b._tableau[0]))248        self.assertEqual([0, board.noCard, board.noCard, 0,], b._foundations)249        #   Move from foundations #2250        b._resort = b._rehash = False251        b.moveCard((-4,0,), False)252        self.assertTrue(b._rehash)253        self.assertFalse(b._resort)254        self.assertEqual(0, b._firstFree)255        self.assertEqual(6, len(b._tableau[0]))256        self.assertEqual([0, board.noCard, board.noCard, board.noCard,], b._foundations)257        #   Move from foundations #2258        b._resort = b._rehash = False259        b.moveCard((-1,0,), False)260        self.assertTrue(b._rehash)261        self.assertFalse(b._resort)262        self.assertEqual(0, b._firstFree)263        self.assertEqual(7, len(b._tableau[0]))264        self.assertEqual([board.noCard, board.noCard, board.noCard, board.noCard,], b._foundations)265    def test_move_to_empty_cascade(self):266        b = board.Board(unshuffled)267        #   Put kings on the foundations268        king = 12269        for f, foundation in enumerate(b._foundations): b._foundations[f] = king270        #   Clear the cascades271        for cascade in b._tableau: cascade.clear()272        #   Move a king to an empty cell273        b.moveCard((-1, 0,))274        self.assertEqual(king, b._tableau[0][-1])275        self.assertTrue(b._resort)276        self.assertTrue(b._rehash)277        #   Move a queen to an empty cell278        queen = king - 1279        b._resort = b._rehash = False280        b.moveCard((-1, 1,))281        self.assertEqual(queen, b._tableau[1][-1])282        self.assertTrue(b._resort)283        self.assertTrue(b._rehash)284    def test_enumerate_finish_to_card(self):285        b = board.Board(no_aces)286        start = 1287        actual = b.enumerateFinishCascades(start, b._tableau[start][-1])288        expected = [(start, 7,),]289        self.assertEqual(expected, actual)290    def test_enumerate_finish_to_empty(self):291        setup = board.Board(unshuffled)292        #   Set up an empty cascade293        setup._foundations = [12 - len(setup._tableau) + 1, 12, 12, 12,]294        #   Clubs in the top row, except the first cascade295        for start, cascade in enumerate(setup._tableau):296            cascade.clear()297            if start: cascade.append(start)298        #   Every card can move to the leftmost or the next cascade299        for start, cascade in enumerate(setup._tableau):300            if cascade:301                actual = setup.enumerateFinishCascades(start, cascade[-1])302                expected = []303                if start + 1 < len(setup._tableau):304                    expected.append( (start, start + 1,) )305                self.assertEqual(expected, actual, f"From cascade {start}" )306    def test_enumerate_moves_unshuffled(self):307        setup = board.Board(unshuffled)308        width = len(setup._tableau)309        #   3. Move from cascades to the next open width310        expected = [(start, width,) for start in range(width)]311        #   2. Move from cascades to cascades312        for start in range(width):313            if start != 3:314                expected.append((start, (start + 1) % width,))315        #   1. Move from cells to cascades316        #   ...317        actual = setup.enumerateMoves()318        self.assertEqual(expected, actual)319    def test_enumerate_moves_no_aces(self):320        setup = board.Board(no_aces)321        width = len(setup._tableau)322        #   3. Move from cascades to the next open cell323        expected = [(start, width,) for start in range(width)]324        #   2. Move from cascades to cascades325        expected.append( (1, 7,) )326        #   1. Move from cells to cascades327        actual = setup.enumerateMoves()328        self.assertEqual(expected, actual)329    def test_enumerate_moves_two_aces(self):330        setup = board.Board(two_aces)331        width = len(setup._tableau)332        #   3. Move from cascades to the next open cell333        expected = [(start, width,) for start in range(width)]334        #   2. Move from cascades to cascades335        #   1. Move from cells to cascades336        actual = setup.enumerateMoves()337        self.assertEqual(expected, actual)338    def test_enumerate_moves_two_aces_two(self):339        setup = board.Board(two_aces_two)340        width = len(setup._tableau)341        #   Clear the aces342        setup.moveToFoundations()343        #   First move options - cells only344        expected = [(start, width,) for start in range(width)]345        actual = setup.enumerateMoves()346        self.assertEqual( expected, actual )347        #   Uncover QH348        setup.moveCard( (3, width,) )349        expected = [(start, width + 1,) for start in range(width)]350        expected.append( (3, 6,) )351        actual = setup.enumerateMoves()352        self.assertEqual(expected, actual)353    def test_enumerate_moves_keep_sequences(self):354        setup = board.Board(unshuffled)355        setup._foundations = [8, 6, 8, 7, ]356        cascades = ( "TC 9C", "7H 9S 9D KH QH JH TH", "QC", "QD JD TD", "8C KD", "9H 8H", "KS QS JS", "",)357        for t, s in enumerate( cascades ):358            setup._tableau[ t ] = board.parseDeck( s )359        setup._cells = board.parseDeck( "KC TS JC --" )360        setup._firstFree = 3361        #   Validate stacking362        stacked = (0, 1, 3, 5, 6, )363        for c in stacked: self.assertTrue( board.isStacked( setup._tableau[ c ] ) )364        isolate = (2, 4, )365        for c in isolate: self.assertFalse( board.isStacked( setup._tableau[ c ] ) )366        opens = (7, )367        for c in opens: self.assertFalse( board.isStacked( setup._tableau[ c ] ) )368        expected = []369        #   Move stacked cascades to open cell370        expected.extend( [ (start, 11, ) for start in stacked ] )371        expected.pop()372        #   Move isolate cascades to open cell373        expected.extend( [ (start, 11, ) for start in isolate ] )374       #   Move cell 1 to open cascade375        expected.append( ( 8, 7, ) )376        #   Move cell 2 to stack377        expected.append( ( 9, 6, ) )378        #   Move cell 2 to open cascade379        expected.append( ( 9, 7, ) )380        #   Move cell 3 to stack381        expected.append( ( 10, 2, ) )382        #   Move cell 3 to open cascade383        expected.append( ( 10, 7, ) )384        #   Move stacked cascades to open cascade385        expected.extend( [ (start, 7, ) for start in stacked ] )386        expected.pop()387        #   Move isolate cascades to open cascade388        expected.extend( [ (start, 7, ) for start in isolate ] )389        expected.pop( -2 )390        actual = setup.enumerateMoves()391        self.assertEqual(expected, actual)392    def test_enumerate_moves_cover_aces(self):393        setup = board.Board(unshuffled)394        setup._foundations = [4, -1, 12, 1, ]395        cascades = (396            "QS",397            "KD QD JD TD 9D 8D 7D 6D 5D 4D",398            "JC",399            "JS TS 9S 8S",400            "KC QC JC TC 9C 8C 7C",401            "2D",402            "3S AD KS",403            "7S 6S",)404        for t, s in enumerate( cascades ):405            setup._tableau[ t ] = board.parseDeck( s )406        setup._cells = board.parseDeck( "4S -- 5S 3D" )407        setup._firstFree = 1408        expected = [(3, 9), (7, 9), (0, 9), (2, 9), (5, 9), (6, 9), (10, 7), (11, 1), (0, 6), ]409        actual = setup.enumerateMoves()410        self.assertEqual(expected, actual)411    def test_memento_values(self):412        visited = set()413        for deck in (unshuffled, reversed, no_aces, two_aces, two_aces_two,):414            setup = board.Board(deck)415            self.assertTrue(setup._resort)416            self.assertTrue(setup._rehash)417            memento = setup.memento()418            self.assertFalse(setup._resort)419            self.assertFalse(setup._rehash)420            self.assertEqual(memento, setup._memento)421            self.assertFalse( memento in visited, f"Duplicate memento #{len(visited)}: {memento}" )422            visited.add(memento)423            #   Check that a new copy gets the same hash424            setup = board.Board(deck)425            self.assertEqual(memento, setup.memento())426    def test_not_solved(self):427        setup = board.Board(unshuffled)428        self.assertFalse( setup.solved() )429    def test_solved(self):430        setup = board.Board(reversed)431        setup.moveToFoundations()432        self.assertTrue( setup.solved() )433    def test_backtrack_foundations(self):434        setup = board.Board(reversed)435        expected = str(setup)436        #   Backtracking should undo everything437        moves = setup.moveToFoundations()438        setup.backtrack( moves )439        actual = str(setup)440        self.assertEqual(expected, actual)441    def assert_backtrack(self, deck):442        setup = board.Board(deck)443        setup.moveToFoundations()444        moves = setup.enumerateMoves()445        for move in moves:446            expected = str(setup)447            setup.moveCard(move)448            setup.backtrack( [move,] )449            actual = str(setup)450            self.assertEqual(expected, actual, move)451    def test_backtrack_no_aces(self):452        self.assert_backtrack(no_aces)453    def test_backtrack_two_aces(self):454        self.assert_backtrack(two_aces)455    def test_backtrack_two_aces_two(self):456        self.assert_backtrack(two_aces_two)457    def assert_solve(self, setup, expected, display = False):458        b = board.Board(setup)459        solution = b.solve()460        actual = len(solution)461        self.assertEqual(expected, actual)462        if display:463            forward = []464            while solution:465                moves = solution.pop()466                forward.append(moves.copy())467                b.backtrack(moves)468            forward.reverse()469            for moves in forward:470                print(moves)471                for move in moves:472                    b.moveCard(move, False)473                print(b)474    def test_solve_unshuffled(self):475        self.assert_solve(unshuffled, 958)476    def test_solve_reversed(self):477        self.assert_solve(reversed, 1)478    def test_solve_no_aces(self):479        self.assert_solve(no_aces, 555, False)480    def test_solve_two_aces(self):481        self.assert_solve(two_aces, 86, False)482    def test_solve_two_aces_two(self):483        self.assert_solve(two_aces_two, 72)484if __name__ == '__main__':...borg.py
Source:borg.py  
1import datetime2import dateutil.parser3import json4import os5import subprocess6from prometheus_client import Gauge7class FileBackup():8    def parse(self, dict):9        self._name = dict['name']10        self._time = dateutil.parser.isoparse(dict['time'])11        return self12    def getName(self):13        return self._name14    def inject(self, list):15        list.append(self)16        return self17    def setTimestamp(self, gauge):18        gauge.set(self._time.timestamp())19        return self20    def print(self):21        print('- '+self._name+' made at '+str(self._time) )22class Borg:23    def __init__(self):24        self._port = 2325        self._borgPassword = os.environ.get('BORG_PASSWORD')26        self._encryptionMode = 'repokey-blake2'27    def resort(self, resort):28        self._resort = resort29        return self30    def user(self, user):31        self._user = user32        return self33    def host(self, host):34        self._host = host35        return self36    def port(self, port):37        self._port = port38        return self39    def path(self, path):40        self._path = path41        return self42    def keyFilePath(self, keyFilePath):43        self._keyFilePath = keyFilePath44        return self45    def borgPassword(self, borgPassword):46        self._borgPassword = borgPassword47        return self48    def init(self, copies):49        repositoryNumber = 150        while repositoryNumber <= copies:51            self.__createRepository(repositoryNumber)52            repositoryNumber += 153        print("Created")54        repositoryNumber = 155        while repositoryNumber <= copies:56            self.__initRepository(repositoryNumber)57            repositoryNumber += 158    def __createRepository(self, repositoryNumber):59        try:60            fileRepository = 'repo'+str(repositoryNumber)61            self._resort.adapter('files').createFolder(fileRepository)62            print("Folder for Repository "+str(repositoryNumber)+" created")63        except OSError:64            print("Folder for Repository "+str(repositoryNumber)+" already exists")65    def __initRepository(self, repositoryNumber):66        repo = self.__makeRepo(repositoryNumber)67        print("Initializing Repository "+str(repositoryNumber))68        completedProcess = self.command([69            'init',70            '-e',71            self._encryptionMode,72            repo73            ], repositoryNumber)74        if completedProcess.returncode != 0:75            print("Process did not return success:")76            print("Code: "+ str(completedProcess.returncode))77            print( str(completedProcess.stdout) )78            print( str(completedProcess.stderr) )79            return self80        print("Initialized Repository "+str(repositoryNumber)+" successfully")81        return self82    def remove(self, name, repositoryNumber):83        self.__findBackup(name, repositoryNumber)84        print("Removing backup "+name+" from Repository "+str(repositoryNumber))85        completedProcess = self.command([86            'delete',87            '::'+name,88            ], repositoryNumber)89        if completedProcess.returncode != 0:90            print("Process did not return success:")91            print("Code: "+ str(completedProcess.returncode))92            print( completedProcess.stdout.decode('utf-8') )93            print( completedProcess.stderr.decode('utf-8') )94            return self95        print("Removal of "+name+" from Repository "+str(repositoryNumber)+" finished successfully")96        return self97    def backup(self, name, target, repositoryNumber):98        print("Backing up "+target+" to Repository "+str(repositoryNumber))99        completedProcess = self.command([100            'create',101            '::'+name,102            '.'103            ], repositoryNumber, directory=target, capture_output=False)104        print("Backup of "+target+" to Repository "+str(repositoryNumber)+" finished successfully")105        return self106    def umount(self, target):107        completedProcess = subprocess.run([108            'fusermount',109            '-u',110            target111            ],112            capture_output=True113            )114        if completedProcess.returncode != 0:115            print("Failed to unmount "+target)116            print("Code: "+ str(completedProcess.returncode))117            print( completedProcess.stdout.decode('utf-8') )118            print( completedProcess.stderr.decode('utf-8') )119            return self120    def mount(self, name, target, repositoryNumber):121        print("Mounting backup "+name+" from Repository "+str(repositoryNumber)+' to '+target)122        print("The borg mount is run in foreground to facilitate usage inside Docker")123        print("Please cancel the program with an interrupt (control+c) after you are done.")124        completedProcess = self.command([125            'mount',126            '-f',127            '::'+name,128            target129            ], repositoryNumber, check=False)130        if completedProcess.returncode != 0:131            print("Process did not return success:")132            print("Code: "+ str(completedProcess.returncode))133            print( completedProcess.stdout.decode('utf-8') )134            print( completedProcess.stderr.decode('utf-8') )135            return self136        print("Mounted backup "+name+" from Repository "+str(repositoryNumber)+' to '+target)137        return self138    def list(self, repositoryNumber):139        completedProcess = self.command([140            'list',141            '--json',142            '::'143            ], repositoryNumber, check=False)144        if completedProcess.returncode != 0:145            print("Process did not return success:")146            print("Code: "+ str(completedProcess.returncode))147            print( completedProcess.stdout.decode('utf-8') )148            print( completedProcess.stderr.decode('utf-8') )149            raise ValueError("List process failed")150        output = completedProcess.stdout.decode('utf-8')151        decodedOutput = json.loads(output)152        return self.__archivesToBackups(decodedOutput['archives'])153    def __archivesToBackups(self, list):154        backups = []155        for archive in list:156            backup = FileBackup().parse(archive)157            backup.inject(backups)158        sortedBackups = sorted(backups, key=lambda backup : backup._time)159        return sortedBackups160            161            162    def restore(self, name, target, repositoryNumber):163        backup = self.__findBackup(name, repositoryNumber)164        print("Restoring backup "+backup.getName()+" from Repository "+str(repositoryNumber)+' to '+target)165        completedProcess = self.command([166            'extract',167            '::'+backup.getName()168            ], repositoryNumber, directory=target)169    def prune(self, repositoryNumber):170        repo = self.__makeRepo(repositoryNumber)171        print("Pruning file backups in repository "+repositoryNumber)172        completedProcess = self.command([173            'prune',174            '--keep-daily=14',175            '--keep-weekly=6',176            '--keep-monthly=6',177            '::'178            ], repositoryNumber)179        if completedProcess.returncode != 0:180            print("Process did not return success:")181            print("Code: "+ str(completedProcess.returncode))182            print( completedProcess.stdout.decode('utf-8') )183            print( completedProcess.stderr.decode('utf-8') )184            return self185        return completedProcess.stdout.decode('utf-8')186    def command(self, args, repoNumber, directory=None, check=True,187            capture_output=True):188        if directory is None:189            directory = os.getcwd()190        return subprocess.run(191                ['borgbackup'] + args,192                capture_output=capture_output,193                env={194                'BORG_NEW_PASSPHRASE': self._borgPassword,195                'BORG_PASSPHRASE': self._borgPassword,196                'BORG_REPO': self.__makeRepo(repoNumber),197                'SSH_AUTH_SOCK': os.environ.get('SSH_AUTH_SOCK', ''),198                'BORG_RSH': "ssh -o StrictHostKeyChecking=accept-new -i "+self._keyFilePath199                }, cwd=directory, check=check)200    def __makeRepo(self, number):201        return 'ssh://'+self._user+'@'+self._host+':'+str(self._port)+'/.'+self._path+'/repo'+str(number)202    def __findBackup(self, target, repositoryNumber):203        if target == 'latest':204            return self.list(repositoryNumber)[-1]205            206        return target207    def getRepositories(self):208        repos = []209        for directoryName in self._resort.adapter('files').listFolders():210            start = len('repo')211            end = len(directoryName)212            repos.append( directoryName[start:end] )213        return repos214    215    def scrape(self, gauge):216        for repositoryNumber in self.getRepositories():217            try:218                backup = self.__findBackup('latest', repositoryNumber)219            except IndexError:220                gauge.labels(self._resort._name, 'repository_'+str(repositoryNumber)).set(0)221                continue222            backup.setTimestamp( gauge.labels(self._resort._name, 'repository_'+str(repositoryNumber)) )...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
