Best Python code snippet using autotest_python
cpuset.py
Source:cpuset.py  
...201    # container, which could be allocated exclusively to new child202    # containers.  This excludes any nodes now allocated203    # to existing children.204    return available_exclusive_mem_nodes(my_container_name())205def node_avail_kbytes(node):206    return node_mbytes << 10  # crude; fixed numa node size207def nodes_avail_mbytes(nodes):208    # nodes' combined user+avail size, in Mbytes209    return sum(node_avail_kbytes(n) for n in nodes) // 1024210def container_bytes(name):211    if fake_numa_containers:212        return nodes_avail_mbytes(get_mem_nodes(name)) << 20213    else:214        while True:215            file = memory_path(name) + '.limit_in_bytes'216            limit = int(utils.read_one_line(file))217            if limit < NO_LIMIT:218                return limit219            if name == SUPER_ROOT:220                return root_container_bytes221            name = os.path.dirname(name)222def container_mbytes(name):223    return container_bytes(name) >> 20224def mbytes_per_mem_node():225    # Get mbyte size of standard numa mem node, as float226    #  (some nodes are bigger than this)227    # Replaces utils.node_size().228    numa = get_boot_numa()229    if numa.endswith('M'):230        return float(numa[:-1])  # mbyte size of fake nodes231    elif numa:232        nodecnt = int(numa)  # fake numa mem nodes for container isolation233    else:234        nodecnt = len(utils.numa_nodes())  # phys mem-controller nodes235    # Use guessed total physical mem size, not kernel's236    #   lesser 'available memory' after various system tables.237    return utils.rounded_memtotal() / (nodecnt * 1024.0)238def get_cpus(container_name):239    file_name = cpus_path(container_name)240    if os.path.exists(file_name):241        return rangelist_to_set(utils.read_one_line(file_name))242    else:243        return set()244def get_tasks(container_name):245    file_name = tasks_path(container_name)246    try:247        tasks = [x.rstrip() for x in open(file_name).readlines()]248    except IOError:249        if os.path.exists(file_name):250            raise251        tasks = []   # container doesn't exist anymore252    return tasks253def inner_containers_of(parent):254    pattern = os.path.join(full_path(parent), '*/tasks')255    return [unpath(os.path.dirname(task_file))256            for task_file in glob.glob(pattern)]257def _release_container_nest(nest):258    # Destroy a container, and any nested sub-containers259    nest_path = full_path(nest)260    if os.path.exists(nest_path):261        # bottom-up walk of tree, releasing all nested sub-containers262        for child in inner_containers_of(nest):263            _release_container_nest(child)264        logging.debug("releasing container %s", nest)265        # Transfer any survivor tasks (e.g. self) to parent container266        parent = os.path.dirname(nest)267        move_tasks_into_container(parent, get_tasks(nest))268        # remove the now-empty outermost container of this nest269        if os.path.exists(nest_path):270            os.rmdir(nest_path)  # nested, or dead manager271def release_container(container_name=None):272    # Destroy a container273    my_container = my_container_name()274    if container_name is None:275        container_name = my_container276    _release_container_nest(container_name)277    displaced = my_container_name()278    if displaced != my_container:279        logging.debug('now running self (pid %d) in container "%s"',280                      os.getpid(), displaced)281def remove_empty_prio_classes(prios):282    # remove prio classes whose set of allowed priorities is empty283    #    e.g  'no:3;rt:;be:3;id:'  -->  'no:3;be:3'284    return ';'.join(p for p in prios.split(';') if p.split(':')[1])285def all_drive_names():286    # list of all disk drives sda,sdb,...287    paths = glob.glob('/sys/block/sd*')288    if not paths:289        paths = glob.glob('/sys/block/hd*')290    return [os.path.basename(path) for path in paths]291def set_io_controls(container_name, disks=[], ioprio_classes=[PROPIO_NORMAL],292                    io_shares=[95], io_limits=[0]):293    # set the propio controls for one container, for selected disks294    # writing directly to /dev/cgroup/container_name/io.io_service_level295    #    without using containerd or container.py296    # See wiki ProportionalIOScheduler for definitions297    # ioprio_classes: list of service classes, one per disk298    #    using numeric propio service classes as used by kernel API, namely299    #       1: RT, Real Time, aka PROPIO_PRIO300    #       2: BE, Best Effort, aka PROPIO_NORMAL301    #       3: PROPIO_IDLE302    # io_shares: list of disk-time-fractions, one per disk,303    #       as percentage integer 0..100304    # io_limits: list of limit on/off, one per disk305    #       0: no limit, shares use of other containers' unused disk time306    #       1: limited, container's use of disk time is capped to given DTF307    # ioprio_classes defaults to best-effort308    # io_limit defaults to no limit, use slack time309    if not disks:  # defaults to all drives310        disks = all_drive_names()311        io_shares      = [io_shares     [0]] * len(disks)312        ioprio_classes = [ioprio_classes[0]] * len(disks)313        io_limits      = [io_limits     [0]] * len(disks)314    if not (len(disks) == len(ioprio_classes) and len(disks) == len(io_shares)315                                              and len(disks) == len(io_limits)):316        raise error.AutotestError('Unequal number of values for io controls')317    service_level = io_attr(container_name, 'io_service_level')318    if not os.path.exists(service_level):319        return  # kernel predates propio features320            # or io cgroup is mounted separately from cpusets321    disk_infos = []322    for disk,ioclass,limit,share in zip(disks, ioprio_classes,323                                        io_limits, io_shares):324        parts = (disk, str(ioclass), str(limit), str(share))325        disk_info = ' '.join(parts)326        utils.write_one_line(service_level, disk_info)327        disk_infos.append(disk_info)328    logging.debug('set_io_controls of %s to %s',329                  container_name, ', '.join(disk_infos))330def abbrev_list(vals):331    """Condense unsigned (0,4,5,6,7,10) to '0,4-7,10'."""332    ranges = []333    lower = 0334    upper = -2335    for val in sorted(vals)+[-1]:336        if val != upper+1:337            if lower == upper:338                ranges.append(str(lower))339            elif lower <= upper:340                ranges.append('%d-%d' % (lower, upper))341            lower = val342        upper = val343    return ','.join(ranges)344def create_container_with_specific_mems_cpus(name, mems, cpus):345    need_fake_numa()346    os.mkdir(full_path(name))347    utils.write_one_line(cpuset_attr(name, 'mem_hardwall'), '1')348    utils.write_one_line(mems_path(name), ','.join(map(str, mems)))349    utils.write_one_line(cpus_path(name), ','.join(map(str, cpus)))350    logging.debug('container %s has %d cpus and %d nodes totalling %s bytes',351                  name, len(cpus), len(get_mem_nodes(name)),352                  utils.human_format(container_bytes(name)) )353def create_container_via_memcg(name, parent, bytes, cpus):354    # create container via direct memcg cgroup writes355    os.mkdir(full_path(name))356    nodes = utils.read_one_line(mems_path(parent))357    utils.write_one_line(mems_path(name), nodes)  # inherit parent's nodes358    utils.write_one_line(memory_path(name)+'.limit_in_bytes', str(bytes))359    utils.write_one_line(cpus_path(name), ','.join(map(str, cpus)))360    logging.debug('Created container %s directly via memcg,'361                  ' has %d cpus and %s bytes',362                  name, len(cpus), utils.human_format(container_bytes(name)))363def _create_fake_numa_container_directly(name, parent, mbytes, cpus):364    need_fake_numa()365    lockfile = my_lock('inner')   # serialize race between parallel tests366    try:367        # Pick specific mem nodes for new cpuset's exclusive use368        # For now, arbitrarily pick highest available node numbers369        needed_kbytes = mbytes * 1024370        nodes = sorted(list(available_exclusive_mem_nodes(parent)))371        kbytes = 0372        nodecnt = 0373        while kbytes < needed_kbytes and nodecnt < len(nodes):374            nodecnt += 1375            kbytes += node_avail_kbytes(nodes[-nodecnt])376        if kbytes < needed_kbytes:377            parent_mbytes = container_mbytes(parent)378            if mbytes > parent_mbytes:379                raise error.AutotestError(380                      "New container's %d Mbytes exceeds "381                      "parent container's %d Mbyte size"382                      % (mbytes, parent_mbytes) )383            else:384                raise error.AutotestError(385                      "Existing sibling containers hold "386                      "%d Mbytes needed by new container"387                      % ((needed_kbytes - kbytes)//1024) )388        mems = nodes[-nodecnt:]389        create_container_with_specific_mems_cpus(name, mems, cpus)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
