Best Python code snippet using slash
sentinel.py
Source:sentinel.py  
1#!/usr/bin/env python32# -*- coding: utf-8 -*-3import sys4if sys.version_info < (3, 8, 1):5    raise RuntimeError('Requires Python version 3.8.1 or higher. This version: ' + str(sys.version_info))6if sys.platform not in ('linux','linux2','darwin','cygwin'):7    raise RuntimeError('Platform not supported. This platform: ' + str(sys.platform))8import sqlite39if sqlite3.sqlite_version_info < (3, 28, 0):10    raise RuntimeError('Requires Python sqlite3 library 3.28.0 or higher. This version: ' + str(sqlite3.sqlite_version))11import os12sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__),'..')))13from tools import *14from store import *15import json16__version__ = tools.__version__17def usage():18    print(sys.argv[0] + ''' [option]19    options:20        list-proms21        nmap-net net22        ping-net ip/net23        net-scan net24        port-scan [ip/net] [level]25        list-nmaps26        nmap ip [level]27        del-nmap ip28        clear-nmaps29        vuln-scan [ip/net]30        list-vulns [id]31        del-vuln id32        clear-vulns33        check-vuln id34        email-vuln id35        arps36        manuf mac37        lsof port38        rdns ip [srv]39        myip40        udp ip port41        udpscan ip port42        tcp ip port43        list-macs44        update-manuf mac45        update-dns mac ip46        listening47        listening-detailed48        listening-details port49        listening-allowed50        listening-alerts51        listening-allow port52        listening-remove port53        established54        established-lsof55        established-rules56        established-rules-filter57        established-rule ALLOW|DENY proto laddr lport faddr fport58        established-alerts59        delete-established-rule rowid60        clear-established-rules61        list-ips62        update-ip ip data63        update-ip-item ip item value64        delete-ip-item ip item value65        del-ip ip66        clear-ips67        
list-jobs68        list-jobs-available69        update-job name data70        delete-job name71        clear-jobs72        list-configs73        update-config name data74        delete-config name75        clear-configs76        list-rules77        update-rule name data78        delete-rule name79        clear-rules80        list-reports81        update-report name data82        delete-report name83        clear-reports84        list-alerts85        delete-alert id86        run-alert name87        update-alert name data88        run-alert name89        clear-alerts90        list-fims91        list-fims-changed92        check-fim [name]93        b2sum-fim [name]94        b2sum /dir/file95        update-fim name data96        delete-fim id97        add-fim name /dir/file98        del-fim name /dir/file99        list-files100        add-file /dir/file101        del-file /dir/file102        fim-restore /dir/file [/dir/file]103        fim-diff104        clear-files105        file-type /dir/file106        av-scan dir|file107        list-avs108        list-proms-db109        update-prom-db name data110        clear-proms-db111        list-b2sums112        clear-b2sums113        # list-sshwatch114        # clear-sshwatch115        list-counts116        clear-counts117        list-model [id|tags tag]118        update-model tag json119        update-model-tag id tag120        delete-model id121        clear-model122        list-training [id|tags tag]123        update-training tag json124        update-training-tag id tag125        delete-training id126        clear-training127        list-occurrence [name|-eq,-gt,-lt,-ne,-le,-ge num]128        delete-occurrence name129        clear-occurrence130        copy-occurrence name131        sample-logstream count132        # ? mark-training tag133        # ? 
mark-training-on name134        list-system-profile135        list-system-profile-full136        gen-system-profile137        get-system-profile-name name138        get-system-profile-rowid rowid139        del-system-profile-name name140        del-system-profile-rowid rowid141        clear-system-profile142        diff-system-profile-rowid rowid rowid143        get-system-profile-data rowid data144        tail file145        logstream146        logstream-json147        logstream-keys148        run-create-db149        run-ps150        sentry151        ---152        config153                logstream:154                    rules155                    sklearn naive_bayes.MultinomialNB156                            naive_bayes.BernoulliNB157                            neural_network.MLPClassifier158                tail:159                    rules160                http_server161                pushgateway162        list-keys163        list-keys-metric164        list-vals165        get-key key166        expire-keys key1 key2 key3...167Version: {} '''.format(__version__))168def printArps():169    arpTbl = tools.getArps()170    for k,v in arpTbl.items():171        if (v == '(incomplete)') or (v == '<incomplete>'):172            continue173        print(v,k)174    return True175def main():176    db_store = 'sentinel.db'177    db_manuf = str(os.path.dirname(__file__)) + '/db/manuf'178    if sys.argv[1:]:179        if sys.argv[1] == '--version':180            print(__version__)181            sys.exit(0)182        if sys.argv[1] == 'manuf':183            mac = sys.argv[2]184            mfname = store.get_manuf(mac, db_manuf)185            print(mfname)186            sys.exit(0)187        if sys.argv[1] == 'arps':188            printArps()189            sys.exit(0)190        if sys.argv[1] == 'list-macs':191            store.print_all(db_store)192            sys.exit(0)193        if sys.argv[1] == 'update-manuf':194            mac = sys.argv[2]195            mfname = 
store.get_manuf(mac, db_manuf)196            update = store.update_data_manuf(mac, mfname, db_store)197            print(update)198            sys.exit(0)199        if sys.argv[1] == 'rdns':200            ip = sys.argv[2]201            try: srv = sys.argv[3]202            except IndexError: srv = None203            dnsname = tools.getNSlookup(ip, srv)204            print(dnsname)205            sys.exit(0)206        if sys.argv[1] == 'update-dns':207            mac = sys.argv[2]208            ip = sys.argv[3]209            #dnsname = tools.getDNSName(ip)210            #update = store.update_data_dns(mac, dnsname, db_store)211            import threading212            dns = store.DNSUpDateTask()213            t = threading.Thread(target=dns.run, args=(mac,ip,db_store,))214            t.start()215            #print(t)216            sys.exit(0)217        if sys.argv[1] == 'ping-net':218            ip = sys.argv[2]219            pn = tools.pingNet(ip)220            print(pn)221            sys.exit(0)222        if sys.argv[1] == 'nmap-net':223            ip = sys.argv[2]224            pn = tools.nmapNet(ip)225            print(pn)226            sys.exit(0)227        if sys.argv[1] == 'net-scan':228            net = sys.argv[2]229            scan = tools.netScan(net, db_store, {}, 'net-scan')230            print(scan)231            sys.exit(0)232        if sys.argv[1] == 'listening':233            p = tools.printListenPorts()234            sys.exit(0)235        if sys.argv[1] == 'listening-detailed':236            p = tools.printListenPortsDetailed()237            sys.exit(0)238        if sys.argv[1] == 'listening-details':239            port = sys.argv[2]240            p = tools.printLsOfPort(port)241            sys.exit(0)242        if sys.argv[1] == 'listening-allowed':243            p = store.printListeningAllowed(db_store)244            sys.exit(0)245        if sys.argv[1] == 'listening-allow':246            port = sys.argv[2]247            insert = 
store.insertAllowedPort(port, db_store)248            print(insert)249            sys.exit(0)250        if sys.argv[1] == 'listening-remove':251            port = sys.argv[2]252            remove = store.deleteAllowedPort(port, db_store)253            print(remove)254            sys.exit(0)255        if sys.argv[1] == 'listening-alerts':256            alerts = store.printListeningAlerts(db_store)257            sys.exit(0)258        if sys.argv[1] == 'established':259            established = tools.printEstablished()260            sys.exit(0)261        if sys.argv[1] == 'established-lsof':262            established = tools.printEstablishedLsOf()263            sys.exit(0)264        if sys.argv[1] == 'established-rules':265            established_rules = tools.printEstablishedRules(db_store)266            sys.exit(0)267        if sys.argv[1] == 'established-rule':268            #established-rule proto laddr lport faddr fport269            rule  = sys.argv[2]270            proto = sys.argv[3]271            laddr = sys.argv[4]272            lport = sys.argv[5]273            faddr = sys.argv[6]274            fport = sys.argv[7]275            insert_rule = store.insertEstablishedRules(rule, proto, laddr, lport, faddr, fport, db_store)276            sys.exit(0)277        if sys.argv[1] == 'established-rules-filter':278            print_alerts = tools.printEstablishedRulesMatch(db_store)279            sys.exit(0)280        if sys.argv[1] == 'established-alerts':281            print_alerts = tools.printEstablishedAlerts(db_store)282            sys.exit(0)283        if sys.argv[1] == 'delete-established-rule':284            rowid = sys.argv[2]285            delete = store.deleteFromRowid('established', rowid, db_store)286            print(delete)287            sys.exit(0)288        if sys.argv[1] == 'clear-established-rules':289            clear = store.clearAll('established', db_store)290            print(clear)291            sys.exit(0)292        if sys.argv[1] == 
'clear-configs':293            clear = store.clearAll('configs', db_store)294            print(clear)295            sys.exit(0)296        if sys.argv[1] == 'lsof':297            port = sys.argv[2]298            lsof = tools.printLsOfPort(port)299            sys.exit(0)300        if sys.argv[1] == 'nmap':301            ip = sys.argv[2]302            try: level = sys.argv[3]303            except IndexError: level = 1304            scan = tools.nmapScan(ip, level)305            update = store.replaceNmaps(ip, scan, db_store)306            print(str(update) + ' ' + str(scan))307            sys.exit(0)308        if sys.argv[1] == 'list-ips':309            rows = store.selectAll('ips', db_store)310            for row in rows:311                print(row)312            sys.exit(0)313        if sys.argv[1] == 'add-ip':314            ip = sys.argv[2]315            insert = store.insertIPs(ip, db_store)316            print(insert)317            sys.exit(0)318        if sys.argv[1] == 'del-ip':319            ip = sys.argv[2]320            _del = store.deleteIPs(ip, db_store)321            print(_del)322            sys.exit(0)323        if sys.argv[1] == 'update-ip':324            ip = sys.argv[2]325            data = sys.argv[3]326            try: valid_json = json.loads(data)327            except json.decoder.JSONDecodeError:328                print('invalid json')329                sys.exit(1)330            replace = store.replaceINTO('ips', ip, data, db_store)331            print(replace)332            sys.exit(0)333        if sys.argv[1] == 'update-ip-item':334            name = sys.argv[2]335            item = sys.argv[3]336            val  = sys.argv[4]337            update = store.updateDataItem(item, val, 'ips', name, db_store)338            print(update)339            sys.exit(0)340        if sys.argv[1] == 'delete-ip-item':341            name = sys.argv[2]342            item = sys.argv[3]343            delete = store.deleteDataItem(item, 'ips', name, db_store)344    
        print(delete)345            sys.exit(0)346        if sys.argv[1] == 'clear-ips':347            clear = store.clearAll('ips', db_store)348            print(clear)349            sys.exit(0)350        if sys.argv[1] == 'discover-net':351            ipnet = None352            level = None353            try:354                ipnet = sys.argv[2]355                level = sys.argv[3]356            except IndexError: pass357            if ipnet is None:358                ipnet = tools.getIfconfigIPv4()359            else:360                i = ipnet.split('.')361                if len(i) == 1:362                    level = sys.argv[2]363                    ipnet = tools.getIfconfigIPv4()364            if level is None:365                level = 1366            run_discovery = tools.runDiscoverNet(ipnet, level, db_store)367            print(run_discovery)368            sys.exit(0)369        if sys.argv[1] == 'list-nmaps':370            scans = store.getNmaps(db_store)371            for row in scans:372                print(row)373            sys.exit(0)374        if sys.argv[1] == 'del-nmap':375            ip = sys.argv[2]376            del_ = store.deleteNmaps(ip, db_store)377            print(del_)378            sys.exit(0)379        if sys.argv[1] == 'clear-nmaps':380            clear = store.clearAllNmaps(db_store)381            print(clear)382            sys.exit(0)383        if sys.argv[1] == 'list-vulns':384            try: vid = sys.argv[2]385            except IndexError: vid = None386            run = tools.printVulnScan(db_store, vid)387            sys.exit(0)388        if sys.argv[1] == 'del-vuln':389            ip = sys.argv[2]390            del_ = store.deleteVulns(ip, db_store)391            print(del_)392            sys.exit(0)393        if sys.argv[1] == 'clear-vulns':394            clear = store.clearAllVulns(db_store)395            print(clear)396            sys.exit(0)397        if sys.argv[1] == 'check-vuln':398            vid = sys.argv[2]399  
          data = store.getVulnData(vid, db_store)400            run = tools.processVulnData(data)401            print(run)402            sys.exit(0)403        if sys.argv[1] == 'email-vuln':404            vid = sys.argv[2]405            data = store.getVulnData(vid, db_store)406            subject = 'sentinel vuln-scan'407            email = tools.sendEmail(subject, data, db_store)408            print(email)409            sys.exit(0)410        if sys.argv[1] == 'myip':411            myip = tools.getIfconfigIPv4()412            print(myip)413            sys.exit(0)414        if sys.argv[1] == 'udp':415            ip = sys.argv[2]416            port = sys.argv[3]417            run = tools.nmapUDP(ip, port)418            print(run)419            sys.exit(0)420        if sys.argv[1] == 'udpscan':421            ip = port = None422            try:423                ip = sys.argv[2]424                port = sys.argv[3]425            except IndexError: pass426            run = tools.nmapUDPscan(ip, port)427            print(run)428            sys.exit(0)429        if sys.argv[1] == 'tcp':430            ip = sys.argv[2]431            port = sys.argv[3]432            run = tools.nmapTCP(ip, port)433            print(run)434            sys.exit(0)435        if sys.argv[1] == 'list-detects':436            try: id_ = sys.argv[2]437            except IndexError: id_ = None438            run = tools.printDetectScan(db_store, id_)439            sys.exit(0)440        if sys.argv[1] == 'detect-scan':441            ip = sys.argv[2]442            scan = tools.nmapDetectScanStore(ip, db_store)443            print(str(scan))444            sys.exit(0)445        if sys.argv[1] == 'del-detect':446            id_ = sys.argv[2]447            del_ = store.deleteDetect(id_, db_store)448            print(del_)449            sys.exit(0)450        if sys.argv[1] == 'clear-detects':451            clear = store.clearAllDetects(db_store)452            print(clear)453            sys.exit(0)454        
if sys.argv[1] == 'port-scan':455            ipnet = None456            level = 1457            try:458                ipnet = sys.argv[2]459                level = sys.argv[3]460            except IndexError: pass461            if ipnet is None:462                myip = tools.getIfconfigIPv4()463                ipn = tools.getIpNet(myip)464                print('discover net: ' + str(ipn))465                ipnet = tools.nmapNet(ipn)466            else:467                i = ipnet.split('.')468                if len(i) == 1:469                    level = sys.argv[2]470                    myip = tools.getIfconfigIPv4()471                    ipn = tools.getIpNet(myip)472                    print('discover net: ' + str(ipn))473                    ipnet = tools.nmapNet(ipn)474                else:475                    if tools.isNet(ipnet):476                        print('discover net: ' + str(ipnet))477                        ipnet = tools.nmapNet(ipnet)478            479            if type(ipnet) == str:480                ipnet = ipnet.split()481            scan = tools.runNmapScanMultiProcess(ipnet, level, db_store)482            print(scan)483            sys.exit(0)484        if sys.argv[1] == 'vuln-scan':485            try: ipnet = sys.argv[2]486            except IndexError: ipnet = None487            if ipnet is None:488                myip = tools.getIfconfigIPv4()489                ipn = tools.getIpNet(myip)490                print('discover net: ' + str(ipn))491                ipnet = tools.nmapNet(ipn)492            else:493                if tools.isNet(ipnet):494                    print('discover net: ' + str(ipnet))495                    ipnet = tools.nmapNet(ipnet)496            if type(ipnet) == str:497                ipnet = ipnet.split()498            scan = tools.runNmapVulnMultiProcess(ipnet, db_store)499            print(scan)500            sys.exit(0)501        if sys.argv[1] == 'detect-scan-net':502            ipnet = None503            try: 
ipnet = sys.argv[2]504            except IndexError: pass505            if ipnet is None:506                ipnet = tools.getIfconfigIPv4()507            ipn = tools.getIpNet(ipnet)508            print('ipnet: ' + ipn)509            hostLst = tools.nmapNet(ipn)510            scan = tools.runNmapDetectMultiProcess(hostLst, db_store)511            print(scan)512            sys.exit(0)513        if sys.argv[1] == 'list-configs':514            run = tools.printConfigs(db_store)515            print(run)516            sys.exit(0)517        if sys.argv[1] == 'update-config':518            name = sys.argv[2]519            data = sys.argv[3]520            try: valid_json = json.loads(data)521            except json.decoder.JSONDecodeError: 522                print('invalid json') 523                sys.exit(1)524            run = store.replaceINTO('configs', name, data, db_store)525            print(run)526            sys.exit(0)527        if sys.argv[1] == 'delete-config':528            rowid = sys.argv[2]529            run = store.deleteFrom('configs', rowid, db_store)530            print(run)531            sys.exit(0)532        if sys.argv[1] == 'list-rules':533            rows = store.selectAll('rules', db_store)534            for row in rows:535                print(row)536            sys.exit(0)537        if sys.argv[1] == 'update-rule':538            name = sys.argv[2]539            data = sys.argv[3]540            try: valid_json = json.loads(data)541            except json.decoder.JSONDecodeError:542                print('invalid json')543                sys.exit(1)544            run = store.replaceINTO('rules', name, data, db_store)545            print(run)546            sys.exit(0)547        if sys.argv[1] == 'delete-rule':548            name = sys.argv[2]549            run = store.deleteFrom('rules', name, db_store)550            print(run)551            sys.exit(0)552        if sys.argv[1] == 'clear-rules':553            clear = store.clearAll('rules', 
db_store)554            print(clear)555            sys.exit(0)556        if sys.argv[1] == 'list-jobs':557            rows = store.selectAll('jobs', db_store)558            for row in rows:559                print(row)560            sys.exit(0)561        if sys.argv[1] == 'update-job':562            name = sys.argv[2]563            data = sys.argv[3]564            try: valid_json = json.loads(data)565            except json.decoder.JSONDecodeError: 566                print('invalid json') 567                sys.exit(1)568            run = store.replaceINTO('jobs', name, data, db_store)569            print(run)570            sys.exit(0)571        if sys.argv[1] == 'delete-job':572            name = sys.argv[2]573            run = store.deleteJob(name, db_store)574            print(run)575            sys.exit(0)576        if sys.argv[1] == 'clear-jobs':577            clear = store.clearAllJobs(db_store)578            print(clear)579            sys.exit(0)580#581# run-job name582#        if sys.argv[1] == 'run-job':583#            name = sys.argv[2]584#            run = tools.runJob(name, db_store)585#            print(str(run))586#            sys.exit(0)587#def runJob(name, db_store, gDict):588        if sys.argv[1] == 'sentry':589            try: v = sys.argv[2]590            except IndexError: v = False591            run = tools.sentryMode(db_store, verbose=v)592            print(str(run))593            sys.exit(0)594#595# list-jobs-running596        #if sys.argv[1] == 'list-jobs-running':597        #    run = tools.listRunning(db_store)598        #    sys.exit(0)599#def listRunning(db_store):600#    rows = store.getAllCounts(db_store)601#    for row in rows:602#        print(row)603#    return True604#def getAllCounts(db_file):605#    con = sql_connection(db_file)606#    cur = con.cursor()607#    cur.execute("SELECT * FROM counts;")608#    rows = cur.fetchall()609#    return rows610        if sys.argv[1] == 'b2sum':611            _file = sys.argv[2]612            
b2sum = tools.b2sum(_file)613            print(_file + ' ' + b2sum)614            sys.exit(0)615        if sys.argv[1] == 'b2sum-fim':616            try: name = sys.argv[2]617            except IndexError: name = None618            if name is None:619                fims = store.selectAll('fims', db_store)620                for i in fims:621                    name = i[0]622                    run = tools.b2sumFim(name, db_store)623                    print(str(name) + ' ' + str(run))624            else:625                run = tools.b2sumFim(name, db_store)626                print(str(run))627            sys.exit(0)628        if sys.argv[1] == 'check-fim':629            try: name = sys.argv[2]630            except IndexError: name = None631            if name is None:632                fims = store.selectAll('fims', db_store)633                for i in fims:634                    name = i[0]635                    run = tools.printFim(name, db_store)636                    print(str(name) + ' ' + str(run))637            else:638                run = tools.printFim(name, db_store)639                print(str(run))640            sys.exit(0)641        if sys.argv[1] == 'list-fims':642            run = tools.printAllFims(db_store)643            print(run)644            sys.exit(0)645        if sys.argv[1] == 'list-fims-changed':646            run = tools.printAllFimsChanged(db_store)647            print(run)648            sys.exit(0)649        if sys.argv[1] == 'add-fim':650            name = sys.argv[2]651            _file = sys.argv[3]652            add = tools.addFimFile(name, _file, db_store)653            print(str(add))654            sys.exit(0)655        if sys.argv[1] == 'del-fim':656            name = sys.argv[2]657            _file = sys.argv[3]658            add = tools.delFimFile(name, _file, db_store)659            print(str(add))660            sys.exit(0)661        if sys.argv[1] == 'update-fim':662            name = sys.argv[2]663            data = 
sys.argv[3]664            try: valid_json = json.loads(data)665            except json.decoder.JSONDecodeError:666                print('invalid json')667                sys.exit(1)668            run = store.replaceINTO('fims', name, data, db_store)669            print(run)670            sys.exit(0)671        if sys.argv[1] == 'list-reports':672            reports = store.selectAll('reports', db_store)673            for row in reports:674                print(row)675            sys.exit(0)676        if sys.argv[1] == 'delete-report':677            name = sys.argv[2]678            delete = store.deleteFrom('reports', name, db_store)679            print(delete)680            sys.exit(0)681        if sys.argv[1] == 'clear-reports':682            clear = store.clearAll('reports', db_store)683            print(clear)684            sys.exit(0)685        if sys.argv[1] == 'update-report':686            name = sys.argv[2]687            data = sys.argv[3]688            try: valid_json = json.loads(data)689            except json.decoder.JSONDecodeError:690                print('invalid json')691                sys.exit(1)692            run = store.replaceINTO('reports', name, data, db_store)693            print(run)694            sys.exit(0)695        if sys.argv[1] == 'list-alerts':696            alerts = store.selectAll('alerts', db_store)697            for row in alerts:698                print(row)699            sys.exit(0)700        if sys.argv[1] == 'delete-alert':701            name = sys.argv[2]702            delete = store.deleteFrom('alerts', name, db_store)703            print(delete)704            sys.exit(0)705        if sys.argv[1] == 'clear-alerts':706            clear = store.clearAll('alerts', db_store)707            print(clear)708            sys.exit(0)709        if sys.argv[1] == 'update-alert':710            name = sys.argv[2]711            data = sys.argv[3]712            try: valid_json = json.loads(data)713            except 
json.decoder.JSONDecodeError:714                print('invalid json')715                sys.exit(1)716            run = store.replaceINTO('alerts', name, data, db_store)717            print(run)718            sys.exit(0)719        if sys.argv[1] == 'run-alert':720            name = sys.argv[2]721            run = tools.runAlert(name, db_store)722            print(str(run))723            sys.exit(0)724        if sys.argv[1] == 'run-create-db':725            run = store.createDB(db_store)726            print(str(run))727            sys.exit(0)728        if sys.argv[1] == 'run-ps':729            #import modules.ps.ps730            #run = modules.ps.ps.get_ps()731            from .modules.ps import ps732            run = ps.get_ps()733            print(run)734            sys.exit(0)735        if sys.argv[1] == 'list-jobs-available':736            for k,v in tools.options.items():737                print(k)738            sys.exit(0)739        if sys.argv[1] == 'list-counts':740            reports = store.selectAll('counts', db_store)741            for row in reports:742                print(row)743            sys.exit(0)744        if sys.argv[1] == 'clear-counts':745            clear = store.clearAll('counts', db_store)746            print(clear)747            sys.exit(0)748        if sys.argv[1] == 'list-proms':749            _prom = str(db_store) + '.prom'750            with open(_prom, 'r') as _file:751                lines = _file.readlines()752                for line in lines:753                    print(line.strip('\n'))754            sys.exit(0)755        if sys.argv[1] == 'list-proms-db':756            proms = store.selectAll('proms', db_store)757            for row in proms:758                print(row)759            sys.exit(0)760        if sys.argv[1] == 'clear-proms-db':761            clear = store.clearAll('proms', db_store)762            print(clear)763            sys.exit(0)764        if sys.argv[1] == 'update-prom-db':765            name = 
sys.argv[2]766            data = sys.argv[3]767            run = store.replaceINTOproms(name, data, db_store)768            print(run)769            sys.exit(0)770        if sys.argv[1] == 'file-type':771            #import modules.gitegridy.gitegridy as git772            from .modules.gitegridy import gitegridy as git773            _file = sys.argv[2]774            file_type = git.fileType(_file)775            print(file_type)776            sys.exit(0)777        if sys.argv[1] == 'add-file':778            _file = sys.argv[2]779            store_file = store.storeFile(_file, db_store)780            print(store_file)781            sys.exit(0)782        if sys.argv[1] == 'del-file':783            _file = sys.argv[2]784            unstore_file = store.unstoreFile(_file, db_store)785            print(unstore_file)786            sys.exit(0)787        if sys.argv[1] == 'list-files':788            list_files = store.selectAll('files', db_store)789            for row in list_files:790                print(row[0], row[1])791            sys.exit(0)792        if sys.argv[1] == 'clear-files':793            clear = store.clearAll('files', db_store)794            print(clear)795            sys.exit(0)796        if sys.argv[1] == 'fim-diff':797            _file = sys.argv[2]798            fim_diff = tools.fimDiff(_file, db_store)799            print(fim_diff)800            sys.exit(0)801        if sys.argv[1] == 'fim-restore':802            _file = sys.argv[2]803            try: _dest = sys.argv[3]804            except IndexError: _dest = None805            store_file_ = store.getData('files', _file, db_store)806            store_file_blob = store_file_[0]807            808            if _dest:809                dest = _dest810            else:811                dest = _file812            with open(dest, 'wb+') as outfile:813                outfile.write(store_file_blob)814            print('fim-restore ' + dest)815            sys.exit(0)816        if sys.argv[1] == 'av-scan':817 
           filedir = sys.argv[2]818            av_scan = tools.avScan(filedir, db_store)819            print(av_scan)820            sys.exit(0)821        if sys.argv[1] == 'tail':822            _file = sys.argv[2]823            for line in tools.tail(_file):824                print(line)825            sys.exit(0)826        if sys.argv[1] == 'logstream':827            for line in tools.logstream():828                print(line)829            sys.exit(0)830        if sys.argv[1] == 'logstream-json':831            for line in tools.logstream():832                print(line.decode('utf-8'))833            sys.exit(0)834        if sys.argv[1] == 'logstream-keys':835            for line in tools.logstream():836                jline = json.loads(line.decode('utf-8'))837                n = len(jline.keys())838                print(n, ' ' , jline.keys())839            sys.exit(0)840        if sys.argv[1] == 'list-b2sums':841            rows = store.selectAll('b2sum', db_store)842            for row in rows:843                print(row)844            sys.exit(0)845        if sys.argv[1] == 'clear-b2sums':846            clear = store.clearAll('b2sum', db_store)847            print(clear)848            sys.exit(0)849        #if sys.argv[1] == 'list-sshwatch':850        #    rows = store.selectAll('sshwatch', db_store)851        #    for row in rows:852        #        print(row)853        #    sys.exit(0)854        #if sys.argv[1] == 'clear-sshwatch':855        #    clear = store.clearAll('sshwatch', db_store)856        #    print(clear)857        #    sys.exit(0)858        if sys.argv[1] == 'clear-training':859            clear = store.clearAll('training', db_store)860            print(clear)861            sys.exit(0)862        if sys.argv[1] == 'clear-model':863            clear = store.clearAll('model', db_store)864            print(clear)865            sys.exit(0)866        if sys.argv[1] == 'list-training':867            try: _id = sys.argv[2]868            except 
IndexError: _id = None869            if _id:870                if _id  == 'tags':871                    _tag = sys.argv[3]872                    rows = store.getAllTrainingTags(_tag, db_store)873                    for row in rows:874                        print(row)875                else:876                    row = store.getByID('training', _id, db_store)877                    print(row)878            else:879                rows = store.getAll('training', db_store)880                for row in rows:881                    print(row)882            sys.exit(0)883        if sys.argv[1] == 'list-model':884            try: _id = sys.argv[2]885            except IndexError: _id = None886            if _id:887                if _id  == 'tags':888                    _tag = sys.argv[3]889                    #rows = store.getAllTrainingTags(_tag, db_store)890                    rows = store.getAllTableTags(_tag, 'model', db_store)891                    for row in rows:892                        print(row)893                else:894                    row = store.getByID('model', _id, db_store)895                    print(row)896            else:897                rows = store.getAll('model', db_store)898                for row in rows:899                    print(row)900            sys.exit(0)901        if sys.argv[1] == 'update-model':902            tag = sys.argv[2]903            data = sys.argv[3]904            try: valid_json = json.loads(data)905            except json.decoder.JSONDecodeError:906                print('invalid json')907                sys.exit(1)908            #run = store.updateTraining(tag, data, db_store)909            run = store.updateTable(tag, data, 'model', db_store)910            print(run)911            sys.exit(0)912        if sys.argv[1] == 'update-model-tag':913            _id = sys.argv[2]914            tag = sys.argv[3]915            #run = store.updateTrainingTag(_id, tag, db_store)916            run = store.updateTableTag(_id, tag, 
'model', db_store)917            print(run)918            sys.exit(0)919        if sys.argv[1] == 'update-training':920            tag = sys.argv[2]921            data = sys.argv[3]922            try: valid_json = json.loads(data)923            except json.decoder.JSONDecodeError:924                print('invalid json')925                sys.exit(1)926            run = store.updateTraining(tag, data, db_store)927            print(run)928            sys.exit(0)929        if sys.argv[1] == 'update-training-tag':930            _id = sys.argv[2]931            tag = sys.argv[3]932            run = store.updateTrainingTag(_id, tag, db_store)933            print(run)934            sys.exit(0)935        if sys.argv[1] == 'delete-training':936            rowid = sys.argv[2]937            delete = store.deleteFromRowid('training', rowid, db_store)938            print(delete)939            sys.exit(0)940        if sys.argv[1] == 'delete-model':941            rowid = sys.argv[2]942            delete = store.deleteFromRowid('model', rowid, db_store)943            print(delete)944            sys.exit(0)945        if sys.argv[1] == 'sample-logstream':946            count = sys.argv[2]947            run = tools.sampleLogStream(count, db_store)948            sys.exit(0)949        if sys.argv[1] == 'mark-training':950            tag = sys.argv[2]951            run = store.markAllTraining(tag, db_store)952            print(run)953            sys.exit(0)954        if sys.argv[1] == 'mark-training-on':955            name = sys.argv[2]956            run = tools.markTrainingRe(name, db_store)957            sys.exit(0)958        #list-occurrence [name|-gt,-lt,-eq num]959        if sys.argv[1] == 'list-occurrence':960            try: opn = sys.argv[2]961            except IndexError: opn=None962            try: val = sys.argv[3]963            except IndexError: val=None964            if val:965                rows = store.getByOp('occurrence', opn, val, db_store)966                for row 
in rows:967                    print(row)968            elif opn:969                row = store.getByName('occurrence', opn, db_store)970                print(row)971            else:972                rows = store.selectAll('occurrence', db_store)973                for row in rows:974                    print(row)975            sys.exit(0)976        if sys.argv[1] == 'clear-occurrence':977            clear = store.clearAll('occurrence', db_store)978            print(clear)979            sys.exit(0)980        if sys.argv[1] == 'delete-occurrence':981            name = sys.argv[2]982            delete = store.deleteFrom('occurrence', name, db_store)983            print(delete)984            sys.exit(0)985        #if sys.argv[1] == 'copy-occurrence':986        #    name = sys.argv[2]987        #    _copy = store.copyOccurrenceToTraining(name, db_store)988        #    print(_copy)989        #    sys.exit(0)990        if sys.argv[1] == 'copy-occurrence':991            key = sys.argv[2]992            d={}993            from multiprocessing import shared_memory994            l = shared_memory.ShareableList(name='sentinel-shm')995            for i in range(0,len(l),2):996                _key = l[i]997                _val = l[i+1]998                if _key == key:999                    d[_key]=_val1000                    break1001            l.shm.close()1002            l.shm.unlink()1003            if key not in d.keys():1004                print('Not Found: ' + key)1005                sys.exit(0)1006            #_data = d[key]1007            #print(_data)1008            data = tools.promDataParser('data', d[key])1009            #print(data)1010            #print('WORKING.ON')1011            if str(sys.platform).startswith('linux'):1012                j = { 'MESSAGE' : data }1013            elif sys.platform == 'darwin':1014                j = { 'eventMessage' : data }1015            else:1016                j = { 'data' : data }1017            tag=01018            _copy 
= store.updateTable(tag, json.dumps(j), 'model', db_store)1019            print(_copy)1020            sys.exit(0)1021        if sys.argv[1] == 'list-system-profile-full':1022            rows = store.getAll('system_profile', db_store)1023            for row in rows:1024                print(row)1025            sys.exit(0)1026        if sys.argv[1] == 'list-system-profile':1027            #name = sys.argv[2]1028            #rows = store.selectAll('system_profile', db_store)1029            rows = store.getAll('system_profile', db_store)1030            for row in rows:1031                print(row[0],row[1],row[2])1032            sys.exit(0)1033        if sys.argv[1] == 'gen-system-profile':1034            run = tools.genSystemProfile(db_store)1035            print(run)1036            sys.exit(0)1037        if sys.argv[1] == 'del-system-profile-name':1038            name = sys.argv[2]1039            delete = store.deleteFrom('system_profile', name, db_store)1040            print(delete)1041            sys.exit(0)1042        if sys.argv[1] == 'del-system-profile-rowid':1043            rowid = sys.argv[2]1044            delete = store.deleteFromRowid('system_profile', rowid, db_store)1045            print(delete)1046            sys.exit(0)1047        if sys.argv[1] == 'clear-system-profile':1048            clear = store.clearAll('system_profile', db_store)1049            print(clear)1050            sys.exit(0)1051        if sys.argv[1] == 'get-system-profile-name':1052            name = sys.argv[2]1053            get = store.getByName('system_profile', name, db_store)1054            print(get)1055            sys.exit(0)1056        if sys.argv[1] == 'get-system-profile-rowid':1057            rowid = sys.argv[2]1058            get = store.getByID('system_profile', rowid, db_store)1059            print(get)1060            sys.exit(0)1061        if sys.argv[1] == 'diff-system-profile-rowid':1062            rowid1 = sys.argv[2]1063            rowid2 = sys.argv[3]1064          
  diff = tools.diffSystemProfileIDs(rowid1, rowid2, db_store)1065            print(diff)1066            sys.exit(0)1067        if sys.argv[1] == 'get-system-profile-data':1068            rowid = sys.argv[2]1069            data = sys.argv[3]1070            get = tools.getSystemProfileData(rowid, data, db_store)1071            print(get)1072            sys.exit(0)1073        #if sys.argv[1] == 'expire-key':1074        #    _key = sys.argv[2]1075        #    expire = tools.setExpiregDictKeyFile(_key, db_store)1076        #    print(expire)1077        #    sys.exit(0)1078        #if sys.argv[1] == 'expire-key':1079        #    _key = sys.argv[2:]1080        #    from multiprocessing import shared_memory1081        #    l = shared_memory.ShareableList([_key], name='sentinel')1082        #    import time1083        #    time.sleep(5)1084        #    l.shm.close()1085        #    l.shm.unlink()1086        #    sys.exit(0)1087        if sys.argv[1] == 'expire-keys':1088            Keys = sys.argv[2:]1089            #for _key in Keys:1090            #    print(_key)1091            from multiprocessing import shared_memory1092            l = shared_memory.ShareableList(Keys, name='sentinel-update')1093            import time1094            time.sleep(5)1095            l.shm.close()1096            l.shm.unlink()1097            sys.exit(0)1098        if sys.argv[1] == 'get-key':1099            key = sys.argv[2]1100            from multiprocessing import shared_memory1101            l = shared_memory.ShareableList(name='sentinel-shm')1102            for i in range(0,len(l),2):1103                _key = l[i]1104                _val = l[i+1]1105                if _key == key:1106                    print(_val.rstrip())1107            l.shm.close()1108            l.shm.unlink()1109            sys.exit(0)1110        if sys.argv[1] == 'list-keys':1111            from multiprocessing import shared_memory1112            l = shared_memory.ShareableList(name='sentinel-shm')1113          
  #print(l)1114            il = iter(l)1115            for item in il:1116                #print(item, next(il))1117                print(item)1118                next(il)1119            l.shm.close()1120            l.shm.unlink()1121            sys.exit(0)1122        if sys.argv[1] == 'list-vals':1123            from multiprocessing import shared_memory1124            l = shared_memory.ShareableList(name='sentinel-shm')1125            #print(l)1126            #il = iter(l)1127            #for item in il:1128            #    #print(value)1129            #    print(next(il))1130            for i in range(0,len(l),2):1131                key = l[i]1132                val = l[i+1]1133                print(val)1134            l.shm.close()1135            l.shm.unlink()1136            sys.exit(0)1137        if sys.argv[1] == 'list-keys-metric':1138            from multiprocessing import shared_memory1139            l = shared_memory.ShareableList(name='sentinel-shm')1140            #print(l)1141            #il = iter(l)1142            #for item in il:1143            #    #print(item, next(il))1144            #    print(item)1145            #    next(il)1146            #for i in range(0,len(l),2):1147            #    key = l[i]1148            #    val = l[i+1]1149            #    #print(key)1150            #    #print(val.split()[-1])1151            #    print(key, val.split()[-1])1152            #    #print(val.split()[-1])1153            #    #print(val)1154            il = iter(l)1155            for item in il:1156                #print(item, next(il))1157                n = next(il)1158                print(item, n.split()[-1])1159            l.shm.close()1160            l.shm.unlink()1161            sys.exit(0)1162        #if sys.argv[1] == 'get-shm':1163        #    from multiprocessing import shared_memory1164        #    l = shared_memory.ShareableList(name='sentinel-update')1165        #    #print(l)1166        #    for item in l:1167        #        
print(item)1168        #    l.shm.close()1169        #    l.shm.unlink()1170        #    sys.exit(0)1171        else:1172            usage()1173            sys.exit(0)1174    else:1175        arpTbl = tools.getArps()1176        update = store.update_arp_data(db_store, arpTbl, db_manuf)1177        print(update)1178        sys.exit(0)1179if __name__ == '__main__':...test_object_store.py
Source:test_object_store.py  
# test_object_store.py -- tests for object_store.py
# Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# or (at your option) any later version of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA  02110-1301, USA.

"""Tests for the object store interface."""

from contextlib import closing
from io import BytesIO
import os
import shutil
import tempfile

from dulwich.index import (
    commit_tree,
    )
from dulwich.errors import (
    NotTreeError,
    )
from dulwich.objects import (
    sha_to_hex,
    Blob,
    Tree,
    TreeEntry,
    )
from dulwich.object_store import (
    DiskObjectStore,
    MemoryObjectStore,
    ObjectStoreGraphWalker,
    tree_lookup_path,
    )
from dulwich.pack import (
    REF_DELTA,
    write_pack_objects,
    )
from dulwich.tests import (
    TestCase,
    )
from dulwich.tests.utils import (
    make_object,
    make_tag,
    build_pack,
    )


# Blob shared by the tests that only need "some object" in the store.
testobject = make_object(Blob, data=b"yummy data")


class ObjectStoreTests(object):
    """Tests shared by the object store implementations below.

    This is a mixin: concrete subclasses also inherit from ``TestCase`` and
    must assign the store under test to ``self.store`` in ``setUp``.
    """

    def test_determine_wants_all(self):
        self.assertEqual(
            [b"1" * 40],
            self.store.determine_wants_all({b"refs/heads/foo": b"1" * 40}))

    def test_determine_wants_all_zero(self):
        self.assertEqual(
            [],
            self.store.determine_wants_all({b"refs/heads/foo": b"0" * 40}))

    def test_iter(self):
        self.assertEqual([], list(self.store))

    def test_get_nonexistant(self):
        self.assertRaises(KeyError, lambda: self.store[b"a" * 40])

    def test_contains_nonexistant(self):
        self.assertNotIn(b"a" * 40, self.store)

    def test_add_objects_empty(self):
        self.store.add_objects([])

    def test_add_commit(self):
        # TODO: Argh, no way to construct Git commit objects without
        # access to a serialized form.
        self.store.add_objects([])

    def test_add_object(self):
        self.store.add_object(testobject)
        self.assertEqual({testobject.id}, set(self.store))
        self.assertIn(testobject.id, self.store)
        r = self.store[testobject.id]
        self.assertEqual(r, testobject)

    def test_add_objects(self):
        data = [(testobject, "mypath")]
        self.store.add_objects(data)
        self.assertEqual({testobject.id}, set(self.store))
        self.assertIn(testobject.id, self.store)
        r = self.store[testobject.id]
        self.assertEqual(r, testobject)

    def test_tree_changes(self):
        blob_a1 = make_object(Blob, data=b'a1')
        blob_a2 = make_object(Blob, data=b'a2')
        blob_b = make_object(Blob, data=b'b')
        for blob in [blob_a1, blob_a2, blob_b]:
            self.store.add_object(blob)

        blobs_1 = [(b'a', blob_a1.id, 0o100644), (b'b', blob_b.id, 0o100644)]
        tree1_id = commit_tree(self.store, blobs_1)
        blobs_2 = [(b'a', blob_a2.id, 0o100644), (b'b', blob_b.id, 0o100644)]
        tree2_id = commit_tree(self.store, blobs_2)

        change_a = ((b'a', b'a'), (0o100644, 0o100644),
                    (blob_a1.id, blob_a2.id))
        self.assertEqual(
            [change_a],
            list(self.store.tree_changes(tree1_id, tree2_id)))
        self.assertEqual(
            [change_a,
             ((b'b', b'b'), (0o100644, 0o100644), (blob_b.id, blob_b.id))],
            list(self.store.tree_changes(tree1_id, tree2_id,
                                         want_unchanged=True)))

    def test_iter_tree_contents(self):
        blob_a = make_object(Blob, data=b'a')
        blob_b = make_object(Blob, data=b'b')
        blob_c = make_object(Blob, data=b'c')
        for blob in [blob_a, blob_b, blob_c]:
            self.store.add_object(blob)

        blobs = [
            (b'a', blob_a.id, 0o100644),
            (b'ad/b', blob_b.id, 0o100644),
            (b'ad/bd/c', blob_c.id, 0o100755),
            (b'ad/c', blob_c.id, 0o100644),
            (b'c', blob_c.id, 0o100644),
        ]
        tree_id = commit_tree(self.store, blobs)
        # ``blobs`` is (path, sha, mode); TreeEntry takes (path, mode, sha).
        self.assertEqual(
            [TreeEntry(p, m, h) for (p, h, m) in blobs],
            list(self.store.iter_tree_contents(tree_id)))

    def test_iter_tree_contents_include_trees(self):
        blob_a = make_object(Blob, data=b'a')
        blob_b = make_object(Blob, data=b'b')
        blob_c = make_object(Blob, data=b'c')
        for blob in [blob_a, blob_b, blob_c]:
            self.store.add_object(blob)

        blobs = [
            (b'a', blob_a.id, 0o100644),
            (b'ad/b', blob_b.id, 0o100644),
            (b'ad/bd/c', blob_c.id, 0o100755),
        ]
        tree_id = commit_tree(self.store, blobs)
        tree = self.store[tree_id]
        tree_ad = self.store[tree[b'ad'][1]]
        tree_bd = self.store[tree_ad[b'bd'][1]]

        expected = [
            TreeEntry(b'', 0o040000, tree_id),
            TreeEntry(b'a', 0o100644, blob_a.id),
            TreeEntry(b'ad', 0o040000, tree_ad.id),
            TreeEntry(b'ad/b', 0o100644, blob_b.id),
            TreeEntry(b'ad/bd', 0o040000, tree_bd.id),
            TreeEntry(b'ad/bd/c', 0o100755, blob_c.id),
        ]
        actual = self.store.iter_tree_contents(tree_id, include_trees=True)
        self.assertEqual(expected, list(actual))

    def make_tag(self, name, obj):
        """Create a tag called *name* for *obj*, add it to the store, return it."""
        tag = make_tag(obj, name=name)
        self.store.add_object(tag)
        return tag

    def test_peel_sha(self):
        self.store.add_object(testobject)
        tag1 = self.make_tag(b'1', testobject)
        tag2 = self.make_tag(b'2', testobject)
        tag3 = self.make_tag(b'3', testobject)
        for obj in [testobject, tag1, tag2, tag3]:
            self.assertEqual(testobject, self.store.peel_sha(obj.id))

    def test_get_raw(self):
        self.store.add_object(testobject)
        self.assertEqual((Blob.type_num, b'yummy data'),
                         self.store.get_raw(testobject.id))

    def test_close(self):
        # For now, just check that close doesn't barf.
        self.store.add_object(testobject)
        self.store.close()


class MemoryObjectStoreTests(ObjectStoreTests, TestCase):
    """Run the shared tests, plus pack tests, against MemoryObjectStore."""

    def setUp(self):
        TestCase.setUp(self)
        self.store = MemoryObjectStore()

    def test_add_pack(self):
        o = MemoryObjectStore()
        f, commit, abort = o.add_pack()
        try:
            b = make_object(Blob, data=b"more yummy data")
            write_pack_objects(f, [(b, None)])
        except BaseException:
            # Abort on any failure (including interrupts), then re-raise.
            abort()
            raise
        else:
            commit()

    def test_add_pack_emtpy(self):
        o = MemoryObjectStore()
        f, commit, abort = o.add_pack()
        commit()

    def test_add_thin_pack(self):
        o = MemoryObjectStore()
        blob = make_object(Blob, data=b'yummy data')
        o.add_object(blob)

        f = BytesIO()
        entries = build_pack(f, [
            (REF_DELTA, (blob.id, b'more yummy data')),
            ], store=o)
        o.add_thin_pack(f.read, None)
        packed_blob_sha = sha_to_hex(entries[0][3])
        self.assertEqual((Blob.type_num, b'more yummy data'),
                         o.get_raw(packed_blob_sha))

    def test_add_thin_pack_empty(self):
        o = MemoryObjectStore()

        f = BytesIO()
        entries = build_pack(f, [], store=o)
        self.assertEqual([], entries)
        o.add_thin_pack(f.read, None)


class PackBasedObjectStoreTests(ObjectStoreTests):
    """Shared tests extended for stores that expose ``packs``."""

    def tearDown(self):
        for pack in self.store.packs:
            pack.close()

    def test_empty_packs(self):
        self.assertEqual([], list(self.store.packs))

    def test_pack_loose_objects(self):
        b1 = make_object(Blob, data=b"yummy data")
        self.store.add_object(b1)
        b2 = make_object(Blob, data=b"more yummy data")
        self.store.add_object(b2)
        self.assertEqual([], list(self.store.packs))
        self.assertEqual(2, self.store.pack_loose_objects())
        self.assertNotEqual([], list(self.store.packs))
        self.assertEqual(0, self.store.pack_loose_objects())


class DiskObjectStoreTests(PackBasedObjectStoreTests, TestCase):
    """Run the pack-based tests against a DiskObjectStore in a temp dir."""

    def setUp(self):
        TestCase.setUp(self)
        self.store_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.store_dir)
        self.store = DiskObjectStore.init(self.store_dir)

    def tearDown(self):
        TestCase.tearDown(self)
        PackBasedObjectStoreTests.tearDown(self)

    def test_alternates(self):
        alternate_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, alternate_dir)
        alternate_store = DiskObjectStore(alternate_dir)
        b2 = make_object(Blob, data=b"yummy data")
        alternate_store.add_object(b2)
        store = DiskObjectStore(self.store_dir)
        self.assertRaises(KeyError, store.__getitem__, b2.id)
        store.add_alternate_path(alternate_dir)
        self.assertIn(b2.id, store)
        self.assertEqual(b2, store[b2.id])

    def test_add_alternate_path(self):
        store = DiskObjectStore(self.store_dir)
        self.assertEqual([], list(store._read_alternate_paths()))
        store.add_alternate_path("/foo/path")
        self.assertEqual(["/foo/path"], list(store._read_alternate_paths()))
        store.add_alternate_path("/bar/path")
        self.assertEqual(
            ["/foo/path", "/bar/path"],
            list(store._read_alternate_paths()))

    def test_rel_alternative_path(self):
        alternate_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, alternate_dir)
        alternate_store = DiskObjectStore(alternate_dir)
        b2 = make_object(Blob, data=b"yummy data")
        alternate_store.add_object(b2)
        store = DiskObjectStore(self.store_dir)
        self.assertRaises(KeyError, store.__getitem__, b2.id)
        store.add_alternate_path(
            os.path.relpath(alternate_dir, self.store_dir))
        self.assertEqual(list(alternate_store), list(store.alternates[0]))
        self.assertIn(b2.id, store)
        self.assertEqual(b2, store[b2.id])

    def test_pack_dir(self):
        o = DiskObjectStore(self.store_dir)
        self.assertEqual(os.path.join(self.store_dir, "pack"), o.pack_dir)

    def test_add_pack(self):
        o = DiskObjectStore(self.store_dir)
        f, commit, abort = o.add_pack()
        try:
            b = make_object(Blob, data=b"more yummy data")
            write_pack_objects(f, [(b, None)])
        except BaseException:
            # Abort on any failure (including interrupts), then re-raise.
            abort()
            raise
        else:
            commit()

    def test_add_thin_pack(self):
        o = DiskObjectStore(self.store_dir)
        try:
            blob = make_object(Blob, data=b'yummy data')
            o.add_object(blob)

            f = BytesIO()
            entries = build_pack(f, [
                (REF_DELTA, (blob.id, b'more yummy data')),
                ], store=o)

            with o.add_thin_pack(f.read, None) as pack:
                packed_blob_sha = sha_to_hex(entries[0][3])
                pack.check_length_and_checksum()
                self.assertEqual(sorted([blob.id, packed_blob_sha]),
                                 list(pack))
                self.assertTrue(o.contains_packed(packed_blob_sha))
                self.assertTrue(o.contains_packed(blob.id))
                self.assertEqual((Blob.type_num, b'more yummy data'),
                                 o.get_raw(packed_blob_sha))
        finally:
            o.close()

    def test_add_thin_pack_empty(self):
        with closing(DiskObjectStore(self.store_dir)) as o:
            f = BytesIO()
            entries = build_pack(f, [], store=o)
            self.assertEqual([], entries)
            o.add_thin_pack(f.read, None)


class TreeLookupPathTests(TestCase):
    """Tests for ``tree_lookup_path`` against a small in-memory tree."""

    def setUp(self):
        TestCase.setUp(self)
        self.store = MemoryObjectStore()
        blob_a = make_object(Blob, data=b'a')
        blob_b = make_object(Blob, data=b'b')
        blob_c = make_object(Blob, data=b'c')
        for blob in [blob_a, blob_b, blob_c]:
            self.store.add_object(blob)

        blobs = [
            (b'a', blob_a.id, 0o100644),
            (b'ad/b', blob_b.id, 0o100644),
            (b'ad/bd/c', blob_c.id, 0o100755),
            (b'ad/c', blob_c.id, 0o100644),
            (b'c', blob_c.id, 0o100644),
        ]
        self.tree_id = commit_tree(self.store, blobs)

    def get_object(self, sha):
        """Lookup callback handed to ``tree_lookup_path``."""
        return self.store[sha]

    def test_lookup_blob(self):
        o_id = tree_lookup_path(self.get_object, self.tree_id, b'a')[1]
        self.assertIsInstance(self.store[o_id], Blob)

    def test_lookup_tree(self):
        o_id = tree_lookup_path(self.get_object, self.tree_id, b'ad')[1]
        self.assertIsInstance(self.store[o_id], Tree)
        o_id = tree_lookup_path(self.get_object, self.tree_id, b'ad/bd')[1]
        self.assertIsInstance(self.store[o_id], Tree)
        o_id = tree_lookup_path(self.get_object, self.tree_id, b'ad/bd/')[1]
        self.assertIsInstance(self.store[o_id], Tree)

    def test_lookup_nonexistent(self):
        self.assertRaises(
            KeyError, tree_lookup_path, self.get_object, self.tree_id, b'j')

    def test_lookup_not_tree(self):
        self.assertRaises(
            NotTreeError, tree_lookup_path, self.get_object, self.tree_id,
            b'ad/b/j')


class ObjectStoreGraphWalkerTests(TestCase):
    """Tests for ``ObjectStoreGraphWalker`` over hand-written parent maps."""

    def get_walker(self, heads, parent_map):
        """Build a walker from one-byte ids, expanding each to 40 bytes."""
        new_parent_map = {
            k * 40: [(p * 40) for p in ps]
            for (k, ps) in parent_map.items()}
        return ObjectStoreGraphWalker(
            [x * 40 for x in heads], new_parent_map.__getitem__)

    def test_ack_invalid_value(self):
        gw = self.get_walker([], {})
        self.assertRaises(ValueError, gw.ack, "tooshort")

    def test_empty(self):
        gw = self.get_walker([], {})
        self.assertIs(None, next(gw))
        gw.ack(b"a" * 40)
        self.assertIs(None, next(gw))

    def test_descends(self):
        gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []})
        self.assertEqual(b"a" * 40, next(gw))
        self.assertEqual(b"b" * 40, next(gw))

    def test_present(self):
        gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []})
        gw.ack(b"a" * 40)
        self.assertIs(None, next(gw))

    def test_parent_present(self):
        gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []})
        self.assertEqual(b"a" * 40, next(gw))
        gw.ack(b"a" * 40)
        self.assertIs(None, next(gw))

    def test_child_ack_later(self):
        gw = self.get_walker(
            [b"a"], {b"a": [b"b"], b"b": [b"c"], b"c": []})
        self.assertEqual(b"a" * 40, next(gw))
        self.assertEqual(b"b" * 40, next(gw))
        gw.ack(b"a" * 40)
        self.assertIs(None, next(gw))

    def test_only_once(self):
        # a  b
        # |  |
        # c  d
        # \ /
        #  e
        gw = self.get_walker([b"a", b"b"], {
                b"a": [b"c"],
                b"b": [b"d"],
                b"c": [b"e"],
                b"d": [b"e"],
                b"e": [],
                })
        walk = []
        acked = False
        walk.append(next(gw))
        walk.append(next(gw))
        # A branch (a, c) or (b, d) may be done after 2 steps or 3 depending on
        # the order walked: 3-step walks include (a, b, c) and (b, a, d), etc.
        if walk == [b"a" * 40, b"c" * 40] or walk == [b"b" * 40, b"d" * 40]:
            gw.ack(walk[0])
            acked = True

        walk.append(next(gw))
        if not acked and walk[2] == b"c" * 40:
            gw.ack(b"a" * 40)
        elif not acked and walk[2] == b"d" * 40:
            gw.ack(b"b" * 40)
        walk.append(next(gw))
        self.assertIs(None, next(gw))

        self.assertEqual(
            [b"a" * 40, b"b" * 40, b"c" * 40, b"d" * 40], sorted(walk))
        self.assertLess(walk.index(b"a" * 40), walk.index(b"c" * 40))
Source:gitegridy.py  
#!/usr/bin/env python3
"""Helpers that mirror files into a git repository (the "git store").

Files are hard-linked into the store at ``git_store + <absolute path>`` and
git is driven through its command line. Most helpers ``os.chdir`` into the
store before running git, so they change the process working directory.
"""
import io
import os
from subprocess import Popen, PIPE
import sys


def _runGit(args):
    """Run ``git`` with *args* in the current directory and wait for it.

    The command is passed as an argument list (no shell, no whitespace
    splitting), so paths containing spaces are handled correctly.

    Returns a ``(stdout, stderr, exit_code)`` tuple; the streams are bytes.
    """
    proc = Popen(['git'] + list(args), stdout=PIPE, stderr=PIPE)
    stdout, stderr = proc.communicate()
    return stdout, stderr, proc.returncode


def _printResult(result):
    """Print the stdout, stderr and exit code of a ``_runGit`` result."""
    stdout, stderr, exit_code = result
    print(stdout.decode('utf-8'))
    print(stderr.decode('utf-8'))
    print(str(exit_code))


def _printLines(stdout):
    """Print captured stdout bytes line by line without trailing newlines."""
    for line in io.BytesIO(stdout).readlines():
        print(line.decode('utf-8').strip('\n'))


def _enterStore(git_store, verbose=False):
    """chdir into *git_store*.

    Returns None on success, or a 'FileNotFoundError: ...' message string
    when the directory does not exist.
    """
    try:
        os.chdir(git_store)
    except FileNotFoundError as e:
        if verbose: print('FileNotFoundError: ' + str(e))
        return 'FileNotFoundError: ' + str(e)
    return None


def gitStoreLink(git_store, List, verbose=False):
    """Hard-link every path in *List* into the store.

    Each file ``f`` is linked at ``git_store + f``. Missing parent
    directories are created at any depth (the previous ``os.mkdir`` call
    failed as soon as more than one level was missing). Existing links are
    left untouched.

    Returns True. Errors from ``os.makedirs`` / ``os.link`` propagate.
    """
    if not os.path.isdir(git_store):
        if verbose: print('mkdir ' + str(git_store))
        os.makedirs(git_store, 0o755)
    for f in List:
        gfile = git_store + f
        parent = os.path.dirname(gfile)
        if not os.path.isdir(parent):
            if verbose: print('mkdir ' + str(parent))
            os.makedirs(parent, 0o755)
        if not os.path.isfile(gfile):
            if verbose: print('link ' + str(gfile))
            os.link(f, gfile)
    return True


def gitStoreInit(git_store, verbose=False):
    """Create *git_store* if needed and run ``git init`` in it once.

    The git process is waited for, so the repository exists when this
    function returns. Returns True.
    """
    if not os.path.isdir(git_store):
        if verbose: print('mkdir ' + str(git_store))
        os.makedirs(git_store, 0o755)
    if not os.path.isdir(git_store + '/.git'):
        if verbose: print('git init ' + str(git_store))
        stdout, _stderr, _code = _runGit(['init', str(git_store)])
        if verbose:
            _printLines(stdout)
    #os.chdir(git_store)
    return True


def gitStoreAdd(git_store, f, verbose=False):
    """Run ``git add`` for the store copy of *f*.

    Returns ``(stdout, stderr, exit_code)`` from git, or a message string
    when the store directory is missing or *f* is absent / unreadable.
    """
    error = _enterStore(git_store, verbose)
    if error is not None:
        return error
    if not os.access(f, os.F_OK):
        if verbose: print('Not Found: ' + str(f))
        return 'Not Found: ' + str(f)
    elif not os.access(f, os.R_OK):
        if verbose: print('No Access: ' + str(f))
        return 'No Access: ' + str(f)
    if verbose: print('git add ' + git_store + f)
    result = _runGit(['add', git_store + f])
    if verbose:
        _printResult(result)
    return result


def gitStoreDel(git_store, f, verbose=False):
    """Remove the store copy of *f* from git and disk, then commit.

    ``git rm`` is waited for before the file is checked on disk, and the
    caller's *verbose* setting is passed on to the commit. Returns True.
    """
    os.chdir(git_store)
    target = git_store + f
    if verbose: print('git rm ' + target)
    stdout, _stderr, _code = _runGit(['rm', '-f', target])
    if verbose:
        _printLines(stdout)
    if os.path.exists(target):
        if verbose: print('remove ' + target)
        os.remove(target)
    gitStoreCommit(git_store, f, verbose=verbose)
    return True


def gitStoreCommit(git_store, f, verbose=False):
    """Commit the store copy of *f* with the message ``sentinel <f>``.

    The message is passed as its own argument, so it no longer carries
    literal quote characters. Returns ``(stdout, stderr, exit_code)``.
    """
    os.chdir(git_store)
    if verbose: print('git commit me ' + git_store + f)
    result = _runGit(['commit', '-m', 'sentinel ' + str(f), git_store + f])
    if verbose:
        _printResult(result)
    return result


def gitStoreStatus(git_store, verbose=False):
    """Run ``git status`` in the store.

    Returns ``(stdout, stderr, exit_code)``, or a 'FileNotFoundError: ...'
    string when the store directory does not exist.
    """
    error = _enterStore(git_store, verbose)
    if error is not None:
        return error
    result = _runGit(['status'])
    if verbose:
        _printResult(result)
    return result


def gitStoreLsFiles(git_store, verbose=False):
    """Run ``git ls-files`` in the store.

    Returns ``(stdout, stderr, exit_code)``, or a 'FileNotFoundError: ...'
    string when the store directory does not exist.
    """
    error = _enterStore(git_store, verbose)
    if error is not None:
        return error
    result = _runGit(['ls-files'])
    if verbose:
        _printResult(result)
    return result


def gitStoreLog(git_store, verbose=False):
    """Run ``git log`` in the store and return its output lines (bytes).

    The output is read once, so the same lines are returned whether or not
    *verbose* is set (previously the verbose path drained the pipe and an
    empty list was returned). Returns a 'FileNotFoundError: ...' string
    when the store directory does not exist.
    """
    error = _enterStore(git_store, verbose)
    if error is not None:
        return error
    stdout, _stderr, _code = _runGit(['log'])
    lines = io.BytesIO(stdout).readlines()
    if verbose:
        for line in lines:
            print(line.decode('utf-8').strip('\n'))
    return lines


def gitStoreClearHistory(git_store, verbose=False):
    """Replace the history of ``master`` with a single fresh commit.

    Runs, in order and each to completion: checkout of an orphan branch,
    ``git add -A``, a commit, deletion of ``master`` and a rename of the
    orphan branch to ``master``. As before, a failing step does not stop
    the sequence. Returns True.
    """
    os.chdir(git_store)
    steps = (
        ['checkout', '--orphan', 'temp_branch'],
        ['add', '-A'],
        ['commit', '-a', '-m', 'sentinel re-commit'],
        ['branch', '-D', 'master'],
        ['branch', '-m', 'master'],
    )
    for args in steps:
        # Each command must finish before the next one starts; git
        # serialises access to the index with a lock file.
        stdout, _stderr, _code = _runGit(args)
        if verbose:
            _printLines(stdout)
    #cmd = 'git push -f origin master'
    return True


#import mimetypes
#mime = mimetypes.guess_type(file)
def fileType(_file):
    """Return 'text' if the start of *_file* decodes as UTF-8, else 'binary'."""
    try:
        with open(_file, 'r', encoding='utf-8') as f:
            f.read(4)
            return 'text'
    except UnicodeDecodeError:
        return 'binary'


def gitStoreDiff(git_store, f=None, verbose=False):
    """Run ``git diff`` in the store, optionally limited to path *f*.

    Returns ``(stdout, stderr, exit_code)``, or a 'FileNotFoundError: ...'
    string when the store directory does not exist.
    """
    error = _enterStore(git_store, verbose)
    if error is not None:
        return error
    args = ['diff']
    if f:
        args.append(f)
    result = _runGit(args)
    if verbose:
        _printResult(result)
    return result


if __name__ == '__main__':

    git_store = '/opt/sentinel/db/git/dir2'
    L = [ '/etc/hosts', '/etc/ssh/sshd_config' ]

    git_init = gitStoreInit(git_store)
    git_link = gitStoreLink(git_store, L)

    #for f in L:
    #    git_add  = gitStoreAdd(git_store, f)
    #    git_commit = gitStoreCommit(git_store, f)

    if sys.argv[1:]:
        if sys.argv[1] == 'git-status':
            git_status = gitStoreStatus(git_store, verbose=True)
        if sys.argv[1] == 'git-files':
            git_files = gitStoreLsFiles(git_store, verbose=True)
        if sys.argv[1] == 'git-log':
            git_log = gitStoreLog(git_store, verbose=True)
        if sys.argv[1] == 'git-add':
            _file = sys.argv[2]
            if not os.access(_file, os.F_OK):
                print('Not Found: ' + str(_file))
                sys.exit(1)
            elif not os.access(_file, os.R_OK):
                print('No Access: ' + str(_file))
                sys.exit(1)
            git_link = gitStoreLink(git_store, [_file], verbose=True)
            git_add  = gitStoreAdd(git_store, _file, verbose=True)
            git_commit = gitStoreCommit(git_store, _file, verbose=True)
        if sys.argv[1] == 'git-del':
            _file = sys.argv[2]
            git_del = gitStoreDel(git_store, _file, verbose=True)
        if sys.argv[1] == 'git-commit':
            _file = sys.argv[2]
            if not os.access(_file, os.F_OK):
                print('Not Found: ' + str(_file))
                sys.exit(1)
            elif not os.access(_file, os.R_OK):
                print('No Access: ' + str(_file))
                sys.exit(1)
            git_commit = gitStoreCommit(git_store, _file, verbose=True)
        if sys.argv[1] == 'git-clear-history':
            git_clear_hist = gitStoreClearHistory(git_store, verbose=True)
        if sys.argv[1] == 'git-diff':
            try: _file = sys.argv[2]
            except IndexError: _file = None
            git_diff = gitStoreDiff(git_store, _file, verbose=True)
        if sys.argv[1] == 'git-init':
            git_init = gitStoreInit(git_store)
        if sys.argv[1] == 'file-type':
            _file = sys.argv[2]
            file_type = fileType(_file)
            print(file_type)
Source:test_modulestore_settings.py  
...149        new_mixed_setting, new_default_store_setting = self.assertMigrated(old_setting)150        self.assertStoreValuesEqual(new_default_store_setting, old_setting["default"])151        self.assertEqual(new_default_store_setting["ENGINE"], old_setting["default"]["ENGINE"])152        self.assertFalse(self.is_split_configured(new_mixed_setting))153    def test_convert_from_old_mongo_to_draft_store(self):154        old_setting = self.OLD_CONFIG_WITH_DIRECT_MONGO155        new_mixed_setting, new_default_store_setting = self.assertMigrated(old_setting)156        self.assertStoreValuesEqual(new_default_store_setting, old_setting["default"])157        self.assertEqual(new_default_store_setting["ENGINE"], "xmodule.modulestore.mongo.draft.DraftModuleStore")158        self.assertTrue(self.is_split_configured(new_mixed_setting))159    def test_convert_from_dict_to_list(self):160        old_mixed_setting = self.OLD_MIXED_CONFIG_WITH_DICT161        new_mixed_setting, new_default_store_setting = self.assertMigrated(old_mixed_setting)162        self.assertEqual(new_default_store_setting["ENGINE"], "the_default_store")163        self.assertTrue(self.is_split_configured(new_mixed_setting))164        # exclude split when comparing old and new, since split was added as part of the migration165        new_stores = [store for store in get_mixed_stores(new_mixed_setting) if store['NAME'] != 'split']166        old_stores = get_mixed_stores(self.OLD_MIXED_CONFIG_WITH_DICT)167        # compare each store configured in mixed...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
