How to use _release_container_nest method in autotest

Best Python code snippet using autotest_python

cpuset.py

Source:cpuset.py Github

copy

Full Screen

...130 return _avail_bytes_via_pages(parent) >> 20131def delete_leftover_test_containers():132 # recover mems and cores tied up by containers of prior failed tests:133 for child in inner_containers_of(SUPER_ROOT):134 _release_container_nest(child)135def my_lock(lockname):136 # lockname is 'inner'137 lockdir = os.environ['AUTODIR']138 lockname = os.path.join(lockdir, '.cpuset.lock.'+lockname)139 lockfile = open(lockname, 'w')140 fcntl.flock(lockfile, fcntl.LOCK_EX)141 return lockfile142def my_unlock(lockfile):143 fcntl.flock(lockfile, fcntl.LOCK_UN)144 lockfile.close()145# Convert '1-3,7,9-12' to set(1,2,3,7,9,10,11,12)146def rangelist_to_set(rangelist):147 result = set()148 if not rangelist:149 return result150 for x in rangelist.split(','):151 if re.match(r'^(\d+)$', x):152 result.add(int(x))153 continue154 m = re.match(r'^(\d+)-(\d+)$', x)155 if m:156 start = int(m.group(1))157 end = int(m.group(2))158 result.update(set(range(start, end+1)))159 continue160 msg = 'Cannot understand data input: %s %s' % (x, rangelist)161 raise ValueError(msg)162 return result163def my_container_name():164 # Get current process's inherited or self-built container name165 # within /dev/cpuset or /dev/cgroup. Is '' for root container.166 name = utils.read_one_line('/proc/%i/cpuset' % os.getpid())167 return name[1:] # strip leading /168def get_mem_nodes(container_name):169 # all mem nodes now available to a container, both exclusive & shared170 file_name = mems_path(container_name)171 if os.path.exists(file_name):172 return rangelist_to_set(utils.read_one_line(file_name))173 else:174 return set()175def _busy_mem_nodes(parent_container):176 # Get set of numa memory nodes now used (exclusively or shared)177 # by existing children of parent container178 busy = set()179 mem_files_pattern = os.path.join(full_path(parent_container),180 '*', cpuset_prefix+'mems')181 for mem_file in glob.glob(mem_files_pattern):182 child_container = os.path.dirname(mem_file)183 busy |= get_mem_nodes(child_container)184 return busy185def available_exclusive_mem_nodes(parent_container):186 # Get subset of numa memory nodes of parent container which could187 # be allocated exclusively to new child containers.188 # This excludes nodes now allocated to existing children.189 need_fake_numa()190 available = get_mem_nodes(parent_container)191 available -= _busy_mem_nodes(parent_container)192 return available193def my_mem_nodes():194 # Get set of numa memory nodes owned by current process's container.195 discover_container_style()196 if not mem_isolation_on:197 return set() # as expected by vmstress198 return get_mem_nodes(my_container_name())199def my_available_exclusive_mem_nodes():200 # Get subset of numa memory nodes owned by current process's201 # container, which could be allocated exclusively to new child202 # containers. This excludes any nodes now allocated203 # to existing children.204 return available_exclusive_mem_nodes(my_container_name())205def node_avail_kbytes(node):206 return node_mbytes << 10 # crude; fixed numa node size207def nodes_avail_mbytes(nodes):208 # nodes' combined user+avail size, in Mbytes209 return sum(node_avail_kbytes(n) for n in nodes) // 1024210def container_bytes(name):211 if fake_numa_containers:212 return nodes_avail_mbytes(get_mem_nodes(name)) << 20213 else:214 while True:215 file = memory_path(name) + '.limit_in_bytes'216 limit = int(utils.read_one_line(file))217 if limit < NO_LIMIT:218 return limit219 if name == SUPER_ROOT:220 return root_container_bytes221 name = os.path.dirname(name)222def container_mbytes(name):223 return container_bytes(name) >> 20224def mbytes_per_mem_node():225 # Get mbyte size of standard numa mem node, as float226 # (some nodes are bigger than this)227 # Replaces utils.node_size().228 numa = get_boot_numa()229 if numa.endswith('M'):230 return float(numa[:-1]) # mbyte size of fake nodes231 elif numa:232 nodecnt = int(numa) # fake numa mem nodes for container isolation233 else:234 nodecnt = len(utils.numa_nodes()) # phys mem-controller nodes235 # Use guessed total physical mem size, not kernel's236 # lesser 'available memory' after various system tables.237 return utils.rounded_memtotal() / (nodecnt * 1024.0)238def get_cpus(container_name):239 file_name = cpus_path(container_name)240 if os.path.exists(file_name):241 return rangelist_to_set(utils.read_one_line(file_name))242 else:243 return set()244def get_tasks(container_name):245 file_name = tasks_path(container_name)246 try:247 tasks = [x.rstrip() for x in open(file_name).readlines()]248 except IOError:249 if os.path.exists(file_name):250 raise251 tasks = [] # container doesn't exist anymore252 return tasks253def inner_containers_of(parent):254 pattern = os.path.join(full_path(parent), '*/tasks')255 return [unpath(os.path.dirname(task_file))256 for task_file in glob.glob(pattern)]257def _release_container_nest(nest):258 # Destroy a container, and any nested sub-containers259 nest_path = full_path(nest)260 if os.path.exists(nest_path):261 # bottom-up walk of tree, releasing all nested sub-containers262 for child in inner_containers_of(nest):263 _release_container_nest(child)264 logging.debug("releasing container %s", nest)265 # Transfer any survivor tasks (e.g. self) to parent container266 parent = os.path.dirname(nest)267 move_tasks_into_container(parent, get_tasks(nest))268 # remove the now-empty outermost container of this nest269 if os.path.exists(nest_path):270 os.rmdir(nest_path) # nested, or dead manager271def release_container(container_name=None):272 # Destroy a container273 my_container = my_container_name()274 if container_name is None:275 container_name = my_container276 _release_container_nest(container_name)277 displaced = my_container_name()278 if displaced != my_container:279 logging.debug('now running self (pid %d) in container "%s"',280 os.getpid(), displaced)281def remove_empty_prio_classes(prios):282 # remove prio classes whose set of allowed priorities is empty283 # e.g 'no:3;rt:;be:3;id:' --> 'no:3;be:3'284 return ';'.join(p for p in prios.split(';') if p.split(':')[1])285def all_drive_names():286 # list of all disk drives sda,sdb,...287 paths = glob.glob('/sys/block/sd*')288 if not paths:289 paths = glob.glob('/sys/block/hd*')290 return [os.path.basename(path) for path in paths]...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful