Best Python code snippet using autotest_python
cpuset.py
Source:cpuset.py  
...
    # current version assumes shared cgroup hierarchy
    return os.path.join(super_root_path, container_name, 'io.'+attr)

def tasks_path(container_name):
    return os.path.join(full_path(container_name), 'tasks')

def mems_path(container_name):
    return cpuset_attr(container_name, 'mems')

def memory_path(container_name):
    return os.path.join(super_root_path, container_name, 'memory')

def cpus_path(container_name):
    return cpuset_attr(container_name, 'cpus')

def container_exists(name):
    return name is not None and os.path.exists(tasks_path(name))

def move_tasks_into_container(name, tasks):
    task_file = tasks_path(name)
    for task in tasks:
        try:
            logging.debug('moving task %s into container "%s"', task, name)
            utils.write_one_line(task_file, task)
        except Exception:
            if utils.pid_is_alive(task):
                raise   # task exists but couldn't move it
            # task is gone or zombie so ignore this exception

def move_self_into_container(name):
    me = str(os.getpid())
    move_tasks_into_container(name, [me])
    logging.debug('running self (pid %s) in container "%s"', me, name)

def _avail_mbytes_via_nodes(parent):
    # total mbytes of mem nodes available for new containers in parent
    free_nodes = available_exclusive_mem_nodes(parent)
    mbytes = nodes_avail_mbytes(free_nodes)
    # don't have exact model for how container mgr measures mem space
    # better here to underestimate than overestimate
    mbytes = max(mbytes - node_mbytes//2, 0)
    return mbytes

def _avail_bytes_via_pages(parent):
    # Get memory bytes available to parent container which could
    #  be allocated exclusively to new child containers.
    # This excludes mem previously allocated to existing children.
    available = container_bytes(parent)
    mem_files_pattern = os.path.join(full_path(parent),
                                     '*', 'memory.limit_in_bytes')
    for mem_file in glob.glob(mem_files_pattern):
        child_container = unpath(os.path.dirname(mem_file))
        available -= container_bytes(child_container)
    return available

def avail_mbytes(parent=SUPER_ROOT):
    # total mbytes available in parent, for exclusive use in new containers
    if fake_numa_containers:
        return _avail_mbytes_via_nodes(parent)
    else:
        return _avail_bytes_via_pages(parent) >> 20

def delete_leftover_test_containers():
    # recover mems and cores tied up by containers of prior failed tests:
    for child in inner_containers_of(SUPER_ROOT):
        _release_container_nest(child)

def my_lock(lockname):
    # lockname is 'inner'
    lockdir = os.environ['AUTODIR']
    lockname = os.path.join(lockdir, '.cpuset.lock.'+lockname)
    lockfile = open(lockname, 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    return lockfile

def my_unlock(lockfile):
    fcntl.flock(lockfile, fcntl.LOCK_UN)
    lockfile.close()

# Convert '1-3,7,9-12' to set(1,2,3,7,9,10,11,12)
def rangelist_to_set(rangelist):
    result = set()
    if not rangelist:
        return result
    for x in rangelist.split(','):
        if re.match(r'^(\d+)$', x):
            result.add(int(x))
            continue
        m = re.match(r'^(\d+)-(\d+)$', x)
        if m:
            start = int(m.group(1))
            end = int(m.group(2))
            result.update(set(range(start, end+1)))
            continue
        msg = 'Cannot understand data input: %s %s' % (x, rangelist)
        raise ValueError(msg)
    return result

def my_container_name():
    # Get current process's inherited or self-built container name
    #   within /dev/cpuset or /dev/cgroup.  Is '' for root container.
    name = utils.read_one_line('/proc/%i/cpuset' % os.getpid())
    return name[1:]   # strip leading /

def get_mem_nodes(container_name):
    # all mem nodes now available to a container, both exclusive & shared
    file_name = mems_path(container_name)
    if os.path.exists(file_name):
        return rangelist_to_set(utils.read_one_line(file_name))
    else:
        return set()

def _busy_mem_nodes(parent_container):
    # Get set of numa memory nodes now used (exclusively or shared)
    #   by existing children of parent container
    busy = set()
    mem_files_pattern = os.path.join(full_path(parent_container),
                                     '*', cpuset_prefix+'mems')
    for mem_file in glob.glob(mem_files_pattern):
        child_container = os.path.dirname(mem_file)
        busy |= get_mem_nodes(child_container)
    return busy

def available_exclusive_mem_nodes(parent_container):
    # Get subset of numa memory nodes of parent container which could
    #  be allocated exclusively to new child containers.
    # This excludes nodes now allocated to existing children.
    need_fake_numa()
    available = get_mem_nodes(parent_container)
    available -= _busy_mem_nodes(parent_container)
    return available

def my_mem_nodes():
    # Get set of numa memory nodes owned by current process's container.
    discover_container_style()
    if not mem_isolation_on:
        return set()    # as expected by vmstress
    return get_mem_nodes(my_container_name())

def my_available_exclusive_mem_nodes():
    # Get subset of numa memory nodes owned by current process's
    # container, which could be allocated exclusively to new child
    # containers.  This excludes any nodes now allocated
    # to existing children.
    return available_exclusive_mem_nodes(my_container_name())

def node_avail_kbytes(node):
    return node_mbytes << 10  # crude; fixed numa node size

def nodes_avail_mbytes(nodes):
    # nodes' combined user+avail size, in Mbytes
    return sum(node_avail_kbytes(n) for n in nodes) // 1024

def container_bytes(name):
    if fake_numa_containers:
        return nodes_avail_mbytes(get_mem_nodes(name)) << 20
    else:
        while True:
            file = memory_path(name) + '.limit_in_bytes'
            limit = int(utils.read_one_line(file))
            if limit < NO_LIMIT:
                return limit
            if name == SUPER_ROOT:
                return root_container_bytes
            name = os.path.dirname(name)

def container_mbytes(name):
    return container_bytes(name) >> 20

def mbytes_per_mem_node():
    # Get mbyte size of standard numa mem node, as float
    #  (some nodes are bigger than this)
    # Replaces utils.node_size().
    numa = get_boot_numa()
    if numa.endswith('M'):
        return float(numa[:-1])  # mbyte size of fake nodes
    elif numa:
        nodecnt = int(numa)  # fake numa mem nodes for container isolation
    else:
        nodecnt = len(utils.numa_nodes())  # phys mem-controller nodes
    # Use guessed total physical mem size, not kernel's
    #   lesser 'available memory' after various system tables.
    return utils.rounded_memtotal() / (nodecnt * 1024.0)

def get_cpus(container_name):
    file_name = cpus_path(container_name)
    if os.path.exists(file_name):
        return rangelist_to_set(utils.read_one_line(file_name))
    else:
        return set()

def get_tasks(container_name):
    file_name = tasks_path(container_name)
    try:
        tasks = [x.rstrip() for x in open(file_name).readlines()]
    except IOError:
        if os.path.exists(file_name):
            raise
        tasks = []   # container doesn't exist anymore
    return tasks

def inner_containers_of(parent):
    pattern = os.path.join(full_path(parent), '*/tasks')
    return [unpath(os.path.dirname(task_file))
            for task_file in glob.glob(pattern)]

def _release_container_nest(nest):
    # Destroy a container, and any nested sub-containers
    nest_path = full_path(nest)
    if os.path.exists(nest_path):
        # bottom-up walk of tree, releasing all nested sub-containers
        for child in inner_containers_of(nest):
            _release_container_nest(child)
        logging.debug("releasing container %s", nest)
        # Transfer any survivor tasks (e.g. self) to parent container
        parent = os.path.dirname(nest)
        move_tasks_into_container(parent, get_tasks(nest))
        # remove the now-empty outermost container of this nest
        if os.path.exists(nest_path):
            os.rmdir(nest_path)  # nested, or dead manager

def release_container(container_name=None):
    # Destroy a container
    my_container = my_container_name()
    if container_name is None:
        container_name = my_container
    _release_container_nest(container_name)
    displaced = my_container_name()
    if displaced != my_container:
        logging.debug('now running self (pid %d) in container "%s"',
                      os.getpid(), displaced)

def remove_empty_prio_classes(prios):
    # remove prio classes whose set of allowed priorities is empty
    #    e.g  'no:3;rt:;be:3;id:'  -->  'no:3;be:3'
    return ';'.join(p for p in prios.split(';') if p.split(':')[1])

def all_drive_names():
    # list of all disk drives sda,sdb,...
    paths = glob.glob('/sys/block/sd*')
    if not paths:
        paths = glob.glob('/sys/block/hd*')
    return [os.path.basename(path) for path in paths]

def set_io_controls(container_name, disks=[], ioprio_classes=[PROPIO_NORMAL],
                    io_shares=[95], io_limits=[0]):
    # set the propio controls for one container, for selected disks
    # writing directly to /dev/cgroup/container_name/io.io_service_level
    #    without using containerd or container.py
    # See wiki ProportionalIOScheduler for definitions
    # ioprio_classes: list of service classes, one per disk
    #    using numeric propio service classes as used by kernel API, namely
    #       1: RT, Real Time, aka PROPIO_PRIO
    #       2: BE, Best Effort, aka PROPIO_NORMAL
    #       3: PROPIO_IDLE
    # io_shares: list of disk-time-fractions, one per disk,
    #       as percentage integer 0..100
    # io_limits: list of limit on/off, one per disk
    #       0: no limit, shares use of other containers' unused disk time
    #       1: limited, container's use of disk time is capped to given DTF
    # ioprio_classes defaults to best-effort
    # io_limit defaults to no limit, use slack time
    if not disks:  # defaults to all drives
        disks = all_drive_names()
        io_shares      = [io_shares     [0]] * len(disks)
        ioprio_classes = [ioprio_classes[0]] * len(disks)
        io_limits      = [io_limits     [0]] * len(disks)
    if not (len(disks) == len(ioprio_classes) and len(disks) == len(io_shares)
                                              and len(disks) == len(io_limits)):
        raise error.AutotestError('Unequal number of values for io controls')
    service_level = io_attr(container_name, 'io_service_level')
    if not os.path.exists(service_level):
        return  # kernel predates propio features
                # or io cgroup is mounted separately from cpusets
    disk_infos = []
    for disk, ioclass, limit, share in zip(disks, ioprio_classes,
                                           io_limits, io_shares):
        parts = (disk, str(ioclass), str(limit), str(share))
        disk_info = ' '.join(parts)
        utils.write_one_line(service_level, disk_info)
        disk_infos.append(disk_info)
    logging.debug('set_io_controls of %s to %s',
                  container_name, ', '.join(disk_infos))

def abbrev_list(vals):
    """Condense unsigned (0,4,5,6,7,10) to '0,4-7,10'."""
    ranges = []
    lower = 0
    upper = -2
    for val in sorted(vals)+[-1]:
        if val != upper+1:
            if lower == upper:
                ranges.append(str(lower))
            elif lower <= upper:
                ranges.append('%d-%d' % (lower, upper))
            lower = val
        upper = val
    return ','.join(ranges)

def create_container_with_specific_mems_cpus(name, mems, cpus):
    need_fake_numa()
    os.mkdir(full_path(name))
    utils.write_one_line(cpuset_attr(name, 'mem_hardwall'), '1')
    utils.write_one_line(mems_path(name), ','.join(map(str, mems)))
    utils.write_one_line(cpus_path(name), ','.join(map(str, cpus)))
    logging.debug('container %s has %d cpus and %d nodes totalling %s bytes',
                  name, len(cpus), len(get_mem_nodes(name)),
                  utils.human_format(container_bytes(name)))

def create_container_via_memcg(name, parent, bytes, cpus):
    # create container via direct memcg cgroup writes
    os.mkdir(full_path(name))
    nodes = utils.read_one_line(mems_path(parent))
    utils.write_one_line(mems_path(name), nodes)  # inherit parent's nodes
    utils.write_one_line(memory_path(name)+'.limit_in_bytes', str(bytes))
    utils.write_one_line(cpus_path(name), ','.join(map(str, cpus)))
    logging.debug('Created container %s directly via memcg,'
                  ' has %d cpus and %s bytes',
                  name, len(cpus), utils.human_format(container_bytes(name)))

def _create_fake_numa_container_directly(name, parent, mbytes, cpus):
    need_fake_numa()
    lockfile = my_lock('inner')   # serialize race between parallel tests
    try:
        # Pick specific mem nodes for new cpuset's exclusive use
        # For now, arbitrarily pick highest available node numbers
        needed_kbytes = mbytes * 1024
        nodes = sorted(list(available_exclusive_mem_nodes(parent)))
        kbytes = 0
...
mem_wrapper.py
Source:mem_wrapper.py
import os
import subprocess
from sys import stdout, exit
from collections import defaultdict
from collections import namedtuple

mem = namedtuple('Mem', ['x', 'y', 'c', 'd', 'val', 'j', "exon_part_id"])
globals()[mem.__name__] = mem  # Global needed for multiprocessing

def find_mems_mummer(mummer_path, outfolder, read_path, refs_path, mummer_out_path, min_mem):
    with open(mummer_out_path, "w") as output_file:
        # print('Running spoa...', end=' ')
        stdout.flush()
        null = open(os.path.join(outfolder, "mummer_errors.1"), "w")
        subprocess.check_call([mummer_path, '-maxmatch', '-l', str(min_mem), refs_path, read_path],
                              stdout=output_file, stderr=null)
        # print('Done.')
        stdout.flush()

def find_mems_slamem(slamem_path, mummer_path, outfolder, read_path, refs_path, out_path, min_mem):
    # time slaMEM -l 14 /Users/kxs624/tmp/ULTRA/human_test/refs_sequences.fa /Users/kxs624/tmp/ULTRA/human_test_new_flanking_strat/reads_tmp.fq -o /Users/kxs624/tmp/ULTRA/human_test/slamem_test.tx
    tmp = out_path.split("seeds_batch_")[1]
    batch_id = tmp.split('.')[0]
    stdout.flush()
    stderr_file = open(os.path.join(outfolder, "slamem_stderr_{0}.1".format(batch_id)), "w")
    stdout_file = open(os.path.join(outfolder, "slamem_stdout_{0}.1".format(batch_id)), "w")
    try:  # slaMEM throws an error if no MEMs are found in any of the sequences
        subprocess.check_call([slamem_path, '-l', str(min_mem), refs_path, read_path, '-o', out_path],
                              stdout=stdout_file, stderr=stderr_file, env=os.environ)
        print("Using SLAMEM")
    except:
        find_mems_mummer(mummer_path, outfolder, read_path, refs_path, out_path, min_mem)
        print("Using MUMMER")
    print('Done.')
    stdout.flush()

def find_nams_strobemap(slamem_path, outfolder, read_path, refs_path, out_path, nr_cores, min_mem):
    # /usr/bin/time -l ./StrobeMap -n 2 -k 9 -w 30 -t 3 -s  -o /Users/kxs624/tmp/ULTRA/human_test/refs_sequences.fa /Users/kxs624/tmp/ULTRA/human_test_new_flanking_strat/reads_tmp.fq
    # StrobeMap -n 2 -k 9 -w 30 -t 3 -s  -o ~/tmp/STROBEMERS/multithreading/ /Users/kxs624/tmp/ULTRA/dros_tmp/refs_sequences.fa /Users/kxs624/tmp/ULTRA/dros_test/reads_16xrep.fa
    stdout.flush()
    stderr_file = open(os.path.join(outfolder, "strobemap_stderr.1"), "w")
    stdout_file = open(os.path.join(outfolder, "strobemap_stdout.1"), "w")
    strobemap_cmd = ['StrobeMap', '-n', "2", '-k', '10', '-v', "11", '-w', "35",
                     '-C', '500', '-L', '1000', '-S', '-t', str(nr_cores), '-s',
                     '-o', outfolder, refs_path, read_path]
    try:  # StrobeMap exits with an error if no NAMs are found in any of the sequences
        subprocess.check_call(strobemap_cmd, stdout=stdout_file, stderr=stderr_file)
        print("Using StrobeMap")
        print(strobemap_cmd)
    except:
        # NOTE: find_mems_slamem() above also expects a mummer_path argument,
        # so this fallback call raises a TypeError as written.
        find_mems_slamem(slamem_path, outfolder, read_path, refs_path, out_path, min_mem)
        print("An unexpected error happened in StrobeMap, check error log at:", stderr_file.name)
        print("If you believe this is a bug in StrobeMap, report an issue at: https://github.com/ksahlin/strobemers")
        print("You can always sidestep this issue by providing another seed finder to uLTRA, i.e., remove option --use_NAM_seeds.")
        exit()  # 'exit' comes from the sys import above; the original called sys.exit() without importing sys
    stdout.flush()

# def parse_results(mems_path):
#     # file = open(os.path.join(mems_folder, "mummer_mems.txt"), 'r')
#     mems_db = {}
#     # read_mems_tmp = {}
#     for i, line in enumerate(open(mems_path, 'r')):
#         if line[0] == '>':
#             if i == 0:
#                 read_acc = line.split()[1].strip()
#             else:
#                 mems_db[read_acc] = read_mems_tmp
#                 read_acc = line.split()[1].strip()
#             read_mems_tmp = defaultdict(list)
#         else:
#             vals = line.split()  # 11404_11606           1     11405       202
#             exon_part_id = vals[0]
#             chr_id, ref_coord_start, ref_coord_end = exon_part_id.split('^')
#             mem_len = int(vals[3])
#             mem_ref_exon_part_start = int(vals[1])
#             mem_read_start = int(vals[2])
#             # convert to 0-indexed as python, however last coordinate is inclusive of the hit, not as in python end-indexing
#             mem_tuple = mem(int(ref_coord_start) + mem_ref_exon_part_start - 1, int(ref_coord_start) + mem_ref_exon_part_start - 1 + mem_len - 1,
#                             mem_read_start-1, mem_read_start-1 + mem_len - 1,
#                             mem_len, None, exon_part_id)
#             read_mems_tmp[chr_id].append(mem_tuple)
#         # print(line)
#     # add last read
#     mems_db[read_acc] = read_mems_tmp
#     return mems_db

def get_mem_records(mems_path, reads):
    '''
        `reads` contains all the relevant reads in the batch to read MEMs for.
    '''
    relevant = False
    relevant_read_cnt = 0
    for i, line in enumerate(open(mems_path, 'r')):
        if line[0] == '>':
            acc = line[1:].strip()
            if acc not in reads:
                relevant = False
                continue
            else:
                relevant = True
                relevant_read_cnt += 1
            if relevant_read_cnt == 1:
                read_acc = acc
            else:
                for chr_id in list(read_mems_tmp.keys()):
                    coordinate_sorted_tuples = sorted(read_mems_tmp[chr_id], key=lambda x: x[1])
                    sorted_mems = [mem(x, y, c, d, val, j, e_id) for j, (x, y, c, d, val, e_id) in enumerate(coordinate_sorted_tuples)]
                    read_mems_tmp[chr_id] = sorted_mems
                yield read_acc, read_mems_tmp
                read_acc = acc
            read_mems_tmp = defaultdict(list)
        elif relevant:
            vals = line.split()  # 11404_11606           1     11405       202
            exon_part_id = vals[0]
            chr_id, ref_coord_start, ref_coord_end = exon_part_id.split('^')
            chr_id = int(chr_id)
            mem_len = int(vals[3])
            # convert to 0-indexed reference as in python
            # however, for MEM length last coordinate is inclusive of the hit in MEM solvers, not as in python end-indexing
            mem_ref_exon_part_start = int(vals[1]) - 1
            mem_read_start = int(vals[2]) - 1
            ref_coord_start = int(ref_coord_start)  # has already been 0-indexed when constructing parts
            mem_genome_start = ref_coord_start + mem_ref_exon_part_start

            # mem_tuple = mem(int(ref_coord_start) - 1 + mem_ref_exon_part_start - 1, int(ref_coord_start) - 1 + mem_ref_exon_part_start - 1 + mem_len - 1,
            #                 mem_read_start-1, mem_read_start-1 + mem_len - 1,
            #                 mem_len, None, exon_part_id)
            # read_mems_tmp[chr_id].append( mem_tuple )
            info_tuple = (mem_genome_start, mem_genome_start + mem_len - 1,
                          mem_read_start, mem_read_start + mem_len - 1,
                          mem_len, exon_part_id)
            read_mems_tmp[chr_id].append(info_tuple)
    if relevant_read_cnt:  # guard added: the original yielded unconditionally and raised NameError when no read in the batch matched
        for chr_id in list(read_mems_tmp.keys()):
            coordinate_sorted_tuples = sorted(read_mems_tmp[chr_id], key=lambda x: x[1])
            sorted_mems = [mem(x, y, c, d, val, j, e_id) for j, (x, y, c, d, val, e_id) in enumerate(coordinate_sorted_tuples)]
            read_mems_tmp[chr_id] = sorted_mems
        print("READ {0} RECORDS.".format(relevant_read_cnt))
        yield read_acc, read_mems_tmp

# def find_file_positions(read_pos_list, mummer_out_path, mummer_out_path_rc):
...
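To see what get_mem_records() yields, here is a toy run against a hand-written seeds file. The file contents, path, and read name are invented for illustration; the format mirrors what the parser above expects: a '>' header per read, then one line per MEM of the form 'exon_part_id ref_start read_start length' with 1-indexed coordinates and exon_part_id written as chr^ref_start^ref_end. It assumes the listing is saved as mem_wrapper.py on the import path:

# Hypothetical demo: feed one read with one MEM through the generator.
import mem_wrapper

with open('toy_seeds.txt', 'w') as f:
    f.write('> read1\n')
    f.write('1^1000^1200   5   11   20\n')  # chr 1, part at ref 1000, MEM of length 20

for read_acc, mems_by_chr in mem_wrapper.get_mem_records('toy_seeds.txt', {'read1'}):
    print(read_acc, dict(mems_by_chr))
# the generator itself prints "READ 1 RECORDS."; the loop then prints:
# read1 {1: [Mem(x=1004, y=1023, c=10, d=29, val=20, j=0, exon_part_id='1^1000^1200')]}

The coordinates illustrate the conversion the parser performs: ref_start 5 and read_start 11 become 0-indexed offsets 4 and 10, the genome start is 1000 + 4 = 1004, and the end coordinates (y=1023, d=29) are inclusive of the last matched base rather than Python-style end-exclusive.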
