Best Python code snippet using autotest_python
cpuset.py
Source:cpuset.py  
...104    move_tasks_into_container(name, [me])105    logging.debug('running self (pid %s) in container "%s"', me, name)106def _avail_mbytes_via_nodes(parent):107    # total mbytes of mem nodes available for new containers in parent108    free_nodes = available_exclusive_mem_nodes(parent)109    mbytes = nodes_avail_mbytes(free_nodes)110    # don't have exact model for how container mgr measures mem space111    # better here to underestimate than overestimate112    mbytes = max(mbytes - node_mbytes//2, 0)113    return mbytes114def _avail_bytes_via_pages(parent):115    # Get memory bytes available to parent container which could116    #  be allocated exclusively to new child containers.117    # This excludes mem previously allocated to existing children.118    available = container_bytes(parent)119    mem_files_pattern = os.path.join(full_path(parent),120                                     '*', 'memory.limit_in_bytes')121    for mem_file in glob.glob(mem_files_pattern):122        child_container = unpath(os.path.dirname(mem_file))123        available -= container_bytes(child_container)124    return available125def avail_mbytes(parent=SUPER_ROOT):126    # total mbytes available in parent, for exclusive use in new containers127    if fake_numa_containers:128        return _avail_mbytes_via_nodes(parent)129    else:130        return _avail_bytes_via_pages(parent) >> 20131def delete_leftover_test_containers():132    # recover mems and cores tied up by containers of prior failed tests:133    for child in inner_containers_of(SUPER_ROOT):134        _release_container_nest(child)135def my_lock(lockname):136    # lockname is 'inner'137    lockdir = os.environ['AUTODIR']138    lockname = os.path.join(lockdir, '.cpuset.lock.'+lockname)139    lockfile = open(lockname, 'w')140    fcntl.flock(lockfile, fcntl.LOCK_EX)141    return lockfile142def my_unlock(lockfile):143    fcntl.flock(lockfile, fcntl.LOCK_UN)144    lockfile.close()145# Convert '1-3,7,9-12' to set(1,2,3,7,9,10,11,12)146def rangelist_to_set(rangelist):147    result = set()148    if not rangelist:149        return result150    for x in rangelist.split(','):151        if re.match(r'^(\d+)$', x):152            result.add(int(x))153            continue154        m = re.match(r'^(\d+)-(\d+)$', x)155        if m:156            start = int(m.group(1))157            end = int(m.group(2))158            result.update(set(range(start, end+1)))159            continue160        msg = 'Cannot understand data input: %s %s' % (x, rangelist)161        raise ValueError(msg)162    return result163def my_container_name():164    # Get current process's inherited or self-built container name165    #   within /dev/cpuset or /dev/cgroup.  Is '' for root container.166    name = utils.read_one_line('/proc/%i/cpuset' % os.getpid())167    return name[1:]   # strip leading /168def get_mem_nodes(container_name):169    # all mem nodes now available to a container, both exclusive & shared170    file_name = mems_path(container_name)171    if os.path.exists(file_name):172        return rangelist_to_set(utils.read_one_line(file_name))173    else:174        return set()175def _busy_mem_nodes(parent_container):176    # Get set of numa memory nodes now used (exclusively or shared)177    #   by existing children of parent container178    busy = set()179    mem_files_pattern = os.path.join(full_path(parent_container),180                                     '*', cpuset_prefix+'mems')181    for mem_file in glob.glob(mem_files_pattern):182        child_container = os.path.dirname(mem_file)183        busy |= get_mem_nodes(child_container)184    return busy185def available_exclusive_mem_nodes(parent_container):186    # Get subset of numa memory nodes of parent container which could187    #  be allocated exclusively to new child containers.188    # This excludes nodes now allocated to existing children.189    need_fake_numa()190    available = get_mem_nodes(parent_container)191    available -= _busy_mem_nodes(parent_container)192    return available193def my_mem_nodes():194    # Get set of numa memory nodes owned by current process's container.195    discover_container_style()196    if not mem_isolation_on:197        return set()    # as expected by vmstress198    return get_mem_nodes(my_container_name())199def my_available_exclusive_mem_nodes():200    # Get subset of numa memory nodes owned by current process's201    # container, which could be allocated exclusively to new child202    # containers.  This excludes any nodes now allocated203    # to existing children.204    return available_exclusive_mem_nodes(my_container_name())205def node_avail_kbytes(node):206    return node_mbytes << 10  # crude; fixed numa node size207def nodes_avail_mbytes(nodes):208    # nodes' combined user+avail size, in Mbytes209    return sum(node_avail_kbytes(n) for n in nodes) // 1024210def container_bytes(name):211    if fake_numa_containers:212        return nodes_avail_mbytes(get_mem_nodes(name)) << 20213    else:214        while True:215            file = memory_path(name) + '.limit_in_bytes'216            limit = int(utils.read_one_line(file))217            if limit < NO_LIMIT:218                return limit219            if name == SUPER_ROOT:220                return root_container_bytes221            name = os.path.dirname(name)222def container_mbytes(name):223    return container_bytes(name) >> 20224def mbytes_per_mem_node():225    # Get mbyte size of standard numa mem node, as float226    #  (some nodes are bigger than this)227    # Replaces utils.node_size().228    numa = get_boot_numa()229    if numa.endswith('M'):230        return float(numa[:-1])  # mbyte size of fake nodes231    elif numa:232        nodecnt = int(numa)  # fake numa mem nodes for container isolation233    else:234        nodecnt = len(utils.numa_nodes())  # phys mem-controller nodes235    # Use guessed total physical mem size, not kernel's236    #   lesser 'available memory' after various system tables.237    return utils.rounded_memtotal() / (nodecnt * 1024.0)238def get_cpus(container_name):239    file_name = cpus_path(container_name)240    if os.path.exists(file_name):241        return rangelist_to_set(utils.read_one_line(file_name))242    else:243        return set()244def get_tasks(container_name):245    file_name = tasks_path(container_name)246    try:247        tasks = [x.rstrip() for x in open(file_name).readlines()]248    except IOError:249        if os.path.exists(file_name):250            raise251        tasks = []   # container doesn't exist anymore252    return tasks253def inner_containers_of(parent):254    pattern = os.path.join(full_path(parent), '*/tasks')255    return [unpath(os.path.dirname(task_file))256            for task_file in glob.glob(pattern)]257def _release_container_nest(nest):258    # Destroy a container, and any nested sub-containers259    nest_path = full_path(nest)260    if os.path.exists(nest_path):261        # bottom-up walk of tree, releasing all nested sub-containers262        for child in inner_containers_of(nest):263            _release_container_nest(child)264        logging.debug("releasing container %s", nest)265        # Transfer any survivor tasks (e.g. self) to parent container266        parent = os.path.dirname(nest)267        move_tasks_into_container(parent, get_tasks(nest))268        # remove the now-empty outermost container of this nest269        if os.path.exists(nest_path):270            os.rmdir(nest_path)  # nested, or dead manager271def release_container(container_name=None):272    # Destroy a container273    my_container = my_container_name()274    if container_name is None:275        container_name = my_container276    _release_container_nest(container_name)277    displaced = my_container_name()278    if displaced != my_container:279        logging.debug('now running self (pid %d) in container "%s"',280                      os.getpid(), displaced)281def remove_empty_prio_classes(prios):282    # remove prio classes whose set of allowed priorities is empty283    #    e.g  'no:3;rt:;be:3;id:'  -->  'no:3;be:3'284    return ';'.join(p for p in prios.split(';') if p.split(':')[1])285def all_drive_names():286    # list of all disk drives sda,sdb,...287    paths = glob.glob('/sys/block/sd*')288    if not paths:289        paths = glob.glob('/sys/block/hd*')290    return [os.path.basename(path) for path in paths]291def set_io_controls(container_name, disks=[], ioprio_classes=[PROPIO_NORMAL],292                    io_shares=[95], io_limits=[0]):293    # set the propio controls for one container, for selected disks294    # writing directly to /dev/cgroup/container_name/io.io_service_level295    #    without using containerd or container.py296    # See wiki ProportionalIOScheduler for definitions297    # ioprio_classes: list of service classes, one per disk298    #    using numeric propio service classes as used by kernel API, namely299    #       1: RT, Real Time, aka PROPIO_PRIO300    #       2: BE, Best Effort, aka PROPIO_NORMAL301    #       3: PROPIO_IDLE302    # io_shares: list of disk-time-fractions, one per disk,303    #       as percentage integer 0..100304    # io_limits: list of limit on/off, one per disk305    #       0: no limit, shares use of other containers' unused disk time306    #       1: limited, container's use of disk time is capped to given DTF307    # ioprio_classes defaults to best-effort308    # io_limit defaults to no limit, use slack time309    if not disks:  # defaults to all drives310        disks = all_drive_names()311        io_shares      = [io_shares     [0]] * len(disks)312        ioprio_classes = [ioprio_classes[0]] * len(disks)313        io_limits      = [io_limits     [0]] * len(disks)314    if not (len(disks) == len(ioprio_classes) and len(disks) == len(io_shares)315                                              and len(disks) == len(io_limits)):316        raise error.AutotestError('Unequal number of values for io controls')317    service_level = io_attr(container_name, 'io_service_level')318    if not os.path.exists(service_level):319        return  # kernel predates propio features320            # or io cgroup is mounted separately from cpusets321    disk_infos = []322    for disk,ioclass,limit,share in zip(disks, ioprio_classes,323                                        io_limits, io_shares):324        parts = (disk, str(ioclass), str(limit), str(share))325        disk_info = ' '.join(parts)326        utils.write_one_line(service_level, disk_info)327        disk_infos.append(disk_info)328    logging.debug('set_io_controls of %s to %s',329                  container_name, ', '.join(disk_infos))330def abbrev_list(vals):331    """Condense unsigned (0,4,5,6,7,10) to '0,4-7,10'."""332    ranges = []333    lower = 0334    upper = -2335    for val in sorted(vals)+[-1]:336        if val != upper+1:337            if lower == upper:338                ranges.append(str(lower))339            elif lower <= upper:340                ranges.append('%d-%d' % (lower, upper))341            lower = val342        upper = val343    return ','.join(ranges)344def create_container_with_specific_mems_cpus(name, mems, cpus):345    need_fake_numa()346    os.mkdir(full_path(name))347    utils.write_one_line(cpuset_attr(name, 'mem_hardwall'), '1')348    utils.write_one_line(mems_path(name), ','.join(map(str, mems)))349    utils.write_one_line(cpus_path(name), ','.join(map(str, cpus)))350    logging.debug('container %s has %d cpus and %d nodes totalling %s bytes',351                  name, len(cpus), len(get_mem_nodes(name)),352                  utils.human_format(container_bytes(name)) )353def create_container_via_memcg(name, parent, bytes, cpus):354    # create container via direct memcg cgroup writes355    os.mkdir(full_path(name))356    nodes = utils.read_one_line(mems_path(parent))357    utils.write_one_line(mems_path(name), nodes)  # inherit parent's nodes358    utils.write_one_line(memory_path(name)+'.limit_in_bytes', str(bytes))359    utils.write_one_line(cpus_path(name), ','.join(map(str, cpus)))360    logging.debug('Created container %s directly via memcg,'361                  ' has %d cpus and %s bytes',362                  name, len(cpus), utils.human_format(container_bytes(name)))363def _create_fake_numa_container_directly(name, parent, mbytes, cpus):364    need_fake_numa()365    lockfile = my_lock('inner')   # serialize race between parallel tests366    try:367        # Pick specific mem nodes for new cpuset's exclusive use368        # For now, arbitrarily pick highest available node numbers369        needed_kbytes = mbytes * 1024370        nodes = sorted(list(available_exclusive_mem_nodes(parent)))371        kbytes = 0372        nodecnt = 0373        while kbytes < needed_kbytes and nodecnt < len(nodes):374            nodecnt += 1375            kbytes += node_avail_kbytes(nodes[-nodecnt])376        if kbytes < needed_kbytes:377            parent_mbytes = container_mbytes(parent)378            if mbytes > parent_mbytes:379                raise error.AutotestError(380                      "New container's %d Mbytes exceeds "381                      "parent container's %d Mbyte size"382                      % (mbytes, parent_mbytes) )383            else:384                raise error.AutotestError(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
