How to use cpus_path method in autotest

Best Python code snippet using autotest_python Github


Full Screen

1"""Cpuset class and cpuset graph, importing module will create model2"""3from __future__ import unicode_literals4from __future__ import print_function5from future.utils import lrange6import io7from builtins import str8from builtins import object9__copyright__ = """10Copyright (C) 2007-2010 Novell Inc.11Copyright (C) 2013-2017 SUSE12Author: Alex Tsariounov <>13This program is free software; you can redistribute it and/or modify14it under the terms of the GNU General Public License version 2 as15published by the Free Software Foundation.16This program is distributed in the hope that it will be useful,17but WITHOUT ANY WARRANTY; without even the implied warranty of18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 19General Public License for more details.20You should have received a copy of the GNU General Public License21along with this program; if not, write to the Free Software22Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA23"""24import os, re, sys, logging25if __name__ == '__main__': 26 sys.path.insert(0, "..")27 logging.basicConfig()28from cpuset.util import *29log = logging.getLogger('cset')30RootSet = None31class CpuSet(object):32 # sets is a class variable dict that keeps track of all 33 # cpusets discovered such that we can link them in properly.34 # The basepath is it's base path, the sets are indexed via35 # a relative path from this basepath.36 sets = {}37 basepath = ''38 cpus_path = '/cpus'39 mems_path = '/mems'40 cpu_exclusive_path = '/cpu_exclusive'41 mem_exclusive_path = '/mem_exclusive'42 tasks_path = '/tasks'43 def __init__(self, path=None):44 log.debug("initializing CpuSet")45 if (path == None):46 # recursively find all cpusets and link together47 # note: a breadth-first search could do this in one48 # pass, but there are never many cpusets, so49 # that optimization is left for the future50 log.debug("finding all cpusets")51 path = self.locate_cpusets()52 CpuSet.basepath = path53 log.debug("creating root node at %s", path)54 self.__root = True55 = 'root'56 self.path = '/'57 self.parent = self58 if (CpuSet.sets): 59 del CpuSet.sets60 CpuSet.sets = {}61 CpuSet.sets[self.path] = self62 # if mounted as a cgroup controller, switch file name format63 if not os.access(path + CpuSet.cpus_path, os.F_OK):64 CpuSet.cpus_path = '/cpuset.cpus'65 CpuSet.mems_path = '/cpuset.mems'66 CpuSet.cpu_exclusive_path = '/cpuset.cpu_exclusive'67 CpuSet.mem_exclusive_path = '/cpuset.mem_exclusive'68 # bottom-up search otherwise links will not exist69 log.debug("starting bottom-up discovery walk...")70 for dir, dirs, files in os.walk(path, topdown=False):71 log.debug("*** walking %s", dir)72 node = self if dir == CpuSet.basepath else CpuSet(dir)73 node.subsets = []74 for sub in dirs:75 relpath = (os.path.join(dir,sub).replace(CpuSet.basepath, '')76 if len(sub) > 0 else '/')77 node.subsets.append(CpuSet.sets[relpath])78 log.debug("%s has %i subsets: [%s]", dir, 79 len(node.subsets), '|'.join(dirs))80 log.debug("staring top-down parenting walk...")81 for dir, dirs, files in os.walk(path):82 dir = dir.replace(CpuSet.basepath, '')83 if len(dir) == 0: dir = '/'84 node = CpuSet.sets[dir]85 log.debug("~~~ walking %s", node.path)86 if dir == '/':87 log.debug("parent is self (root cpuset), skipping")88 else:89 parpath = dir[0:dir.rfind('/')]90 log.debug('parpath decodes to: %s from dir of: %s', parpath, dir)91 if parpath in CpuSet.sets:92 log.debug("parent is %s", parpath)93 node.parent = CpuSet.sets[parpath]94 else:95 log.debug("parent is root cpuset")96 node.parent = CpuSet.sets['/']97 log.debug("found %i cpusets", len(CpuSet.sets))98 else:99 # one new cpuset node100 log.debug("new cpuset node absolute: %s", path)101 path = (path.replace(CpuSet.basepath, '')102 if len(path) > len(CpuSet.basepath) else '/')103 log.debug(" relative: %s", path)104 if path in CpuSet.sets:105 log.debug("the cpuset %s already exists, skipping", path)106 self = CpuSet.sets[path] # questionable....107 return108 cpus = CpuSet.basepath + path + CpuSet.cpus_path109 if not os.access(cpus, os.F_OK):110 # not a cpuset directory111 str = '%s is not a cpuset directory' % (CpuSet.basepath + path)112 log.error(str)113 raise CpusetException(str)114 self.__root = False115 self.read_cpuset(path)116 CpuSet.sets[path] = self117 def locate_cpusets(self):118 log.debug("locating cpuset filesystem...")119 cpuset_mount_regex = re.compile(r"^[^ ]+ (/.+) (?:cpuset |cgroup (?:[^ ]*,)?cpuset[, ])")120 path = None121 f ="/proc/mounts",encoding="iso8859-1")122 for line in f:123 res = if res:125 path = break127 f.close()128 if not path:129 # mounted cpusets not found, so mount them130 if not os.access(config.mountpoint, os.F_OK):131 os.mkdir(config.mountpoint)132 ret = os.system("mount -t cpuset none " + config.mountpoint)133 if ret:134 raise CpusetException(135 'mount of cpuset filesystem failed, do you have permission?')136 path = config.mountpoint137 log.debug("cpusets mounted at: " + path)138 return path139 def read_cpuset(self, path):140 log.debug("reading cpuset passed relpath: %s", path)141 self.path = path142 log.debug("...path=%s", path)143 = path[path.rfind('/')+1:]144 log.debug("", def read_first_line_from(self, file_to_read):146 f =, encoding="iso8859-1")147 retval = f.readline().strip()148 f.close()149 return retval150 def write_value_to(self, file_to_write, value):151 log.debug("-> prop_set %s.%s = %s", self.path, file_to_write, value)152 f =, 'w', encoding="iso8859-1")153 f.write(str(value))154 f.close()155 def write_01_to(self, file_to_write, value):156 self.write_value_to(file_to_write, '1' if value else '0')157 # Properties of cpuset node158 def delprop(self):159 raise AttributeError("deletion of properties not allowed")160 def getcpus(self): 161 return self.read_first_line_from(CpuSet.cpus_path)162 def setcpus(self, newval):163 cpuspec_check(newval)164 self.write_value_to(CpuSet.cpus_path, newval)165 cpus = property(fget=getcpus, fset=setcpus, fdel=delprop, doc="CPU specifier")166 def getmems(self): 167 return self.read_first_line_from(CpuSet.mems_path)168 def setmems(self, newval): 169 # FIXME: check format for correctness170 self.write_value_to(CpuSet.mems_path, newval)171 mems = property(getmems, setmems, delprop, "Mem node specifier")172 173 def getcpuxlsv(self): 174 return self.read_first_line_from(CpuSet.cpu_exclusive_path) == '1'175 def setcpuxlsv(self, newval):176 self.write_01_to(CpuSet.cpu_exclusive_path, newval)177 cpu_exclusive = property(getcpuxlsv, setcpuxlsv, delprop, 178 "CPU exclusive flag")179 def getmemxlsv(self): 180 return self.read_first_line_from(CpuSet.mem_exclusive_path) == '1'181 def setmemxlsv(self, newval):182 self.write_01_to(CpuSet.mem_exclusive_path, newval)183 mem_exclusive = property(getmemxlsv, setmemxlsv, delprop, 184 "Memory exclusive flag")185 def gettasks(self):186 f =,encoding="iso8859-1")187 lst = map(lambda line: line.strip(), f.readlines())188 f.close()189 return list(lst)190 def settasks(self, tasklist):191 notfound = []192 unmovable = []193 if len(tasklist) > 3:194 pb = ProgressBar(len(tasklist), '=')195 tick = 0196 prog = True197 else:198 prog = False199 for task in tasklist:200 try:201 f =,'w',encoding="iso8859-1")202 f.write(task)203 f.close()204 except Exception as err:205 if str(err).find('No such process') != -1:206 notfound.append(task)207 elif str(err).find('Invalid argument'):208 unmovable.append(task)209 else: 210 raise211 if prog:212 tick += 1213 pb.progress(tick)214 if len(notfound) > 0:215'**> %s tasks were not found, so were not moved', len(notfound))216 log.debug(' not found: %s', notfound)217 if len(unmovable) > 0:218'**> %s tasks are not movable, impossible to move', len(unmovable))219 log.debug(' not movable: %s', unmovable)220 log.debug("-> prop_set %s.tasks set with %s tasks", self.path, 221 len(tasklist)) 222 tasks = property(gettasks, settasks, delprop, "Task list")223#224# Helper functions225#226def lookup_task_from_proc(pid):227 """lookup the cpuset of the specified pid from proc filesystem"""228 log.debug("entering lookup_task_from_proc, pid = %s", str(pid))229 path = "/proc/"+str(pid)+"/cpuset"230 if os.access(path, os.F_OK):231 f =,encoding="iso8859-1")232 set = f.readline().strip()233 f.close()234 log.debug('lookup_task_from_proc: found task %s cpuset: %s', str(pid), set)235 return set236 # FIXME: add search for threads here...237 raise CpusetException("task ID %s not found, i.e. not running" % str(pid))238def lookup_task_from_cpusets(pid):239 """lookup the cpuset of the specified pid from cpuset filesystem"""240 log.debug("entering lookup_task_from_cpusets, pid = %s", str(pid))241 global RootSet242 if RootSet == None: rescan()243 gotit = None244 if pid in RootSet.tasks:245 gotit = RootSet246 else:247 for node in walk_set(RootSet):248 if pid in node.tasks:249 gotit = node250 break251 if gotit:252 log.debug('lookup_task_from_cpusets: found task %s cpuset: %s', str(pid),253 gotit.path)254 return gotit.path255 raise CpusetException("task ID %s not found, i.e. not running" % str(pid))256def unique_set(name):257 """find a unique cpuset by name or path, raise if multiple sets found"""258 log.debug("entering unique_set, name=%s", name)259 if name == None:260 raise CpusetException('unique_set() passed None as arg')261 if isinstance(name, CpuSet): return name262 nl = find_sets(name)263 if len(nl) > 1: 264 raise CpusetNotUnique('cpuset name "%s" not unique: %s' % (name,265 [x.path for x in nl]) )266 return nl[0]267def find_sets(name):268 """find cpusets by name or path, raise CpusetNotFound if not found"""269 log = logging.getLogger("cset.find_sets")270 log.debug('finding "%s" in cpusets', name)271 nodelist = []272 if name.find('/') == -1:273 log.debug("find by name")274 if name == 'root':275 log.debug("returning root set")276 nodelist.append(RootSet)277 else:278 log.debug("walking from: %s", RootSet.path)279 for node in walk_set(RootSet):280 if == name:281 log.debug('... found node "%s"', name)282 nodelist.append(node)283 else:284 log.debug("find by path")285 # make sure that leading slash is used if searching by path286 if name[0] != '/': name = '/' + name287 if name in CpuSet.sets:288 log.debug('... found node "%s"', CpuSet.sets[name].name)289 nodelist.append(CpuSet.sets[name])290 if len(nodelist) == 0:291 raise CpusetNotFound('cpuset "%s" not found in cpusets' % name)292 return nodelist293def walk_set(set):294 """ generator for walking cpuset graph, breadth-first, more or less... """295 log = logging.getLogger("cset.walk_set")296 for node in set.subsets:297 log.debug("+++ yield %s", yield node299 for node in set.subsets:300 for result in walk_set(node): 301 log.debug("++++++ yield %s", 302 yield result 303def rescan():304 """re-read the cpuset directory to sync system with data structs"""305 log.debug("entering rescan")306 global RootSet, maxcpu, allcpumask307 RootSet = CpuSet()308 # figure out system properties309 log.debug("rescan: all cpus = %s", RootSet.cpus)310 maxcpu = int(RootSet.cpus.split('-')[-1].split(',')[-1])311 log.debug(" max cpu = %s", maxcpu)312 allcpumask = calc_cpumask(maxcpu)313 log.debug(" allcpumask = %s", allcpumask)314def cpuspec_check(cpuspec, usemax=True):315 """check format of cpuspec for validity"""316 log.debug("cpuspec_check(%s)", cpuspec)317 mo ="[^0-9,\-]", cpuspec)318 if mo:319 str = 'CPUSPEC "%s" contains invalid charaters: %s' % (cpuspec, log.debug(str)321 raise CpusetException(str)322 groups = cpuspec.split(',')323 if usemax and int(groups[-1].split('-')[-1]) > int(maxcpu):324 str = 'CPUSPEC "%s" specifies higher max(%s) than available(%s)' % \325 (cpuspec, groups[-1].split('-')[-1], maxcpu)326 log.debug(str)327 raise CpusetException(str)328 for sub in groups:329 it = sub.split('-')330 if len(it) == 2:331 if len(it[0]) == 0 or len(it[1]) == 0:332 # catches negative numbers333 raise CpusetException('CPUSPEC "%s" has bad group "%s"' % (cpuspec, sub))334 if len(it) > 2:335 raise CpusetException('CPUSPEC "%s" has bad group "%s"' % (cpuspec, sub))336def cpuspec_to_hex(cpuspec):337 """convert a cpuspec to the hexadecimal string representation"""338 log.debug('cpuspec_to_string(%s)', cpuspec)339 cpuspec_check(cpuspec, usemax=False)340 groups = cpuspec.split(',')341 number = 0342 for sub in groups:343 items = sub.split('-')344 if len(items) == 1:345 if not len(items[0]):346 # two consecutive commas in cpuspec347 continue348 # one cpu in this group349 log.debug(" adding cpu %s to result", items[0])350 number |= 1 << int(items[0])351 elif len(items) == 2: 352 il = [int(ii) for ii in items]353 if il[1] >= il[0]: rng = lrange(il[0], il[1]+1)354 else: rng = lrange(il[1], il[0]+1)355 log.debug(' group=%s has cpu range of %s', sub, rng)356 for num in rng: number |= 1 << num357 else:358 raise CpusetException('CPUSPEC "%s" has bad group "%s"' % (cpuspec, sub))359 log.debug(' final int number=%s in hex=%x', number, number)360 return '%x' % number361def memspec_check(memspec):362 """check format of memspec for validity"""363 # FIXME: look under /sys/devices/system/node for numa memory node364 # information and check the memspec that way, currently we only do365 # a basic check366 log.debug("memspec_check(%s)", memspec)367 mo ="[^0-9,\-]", memspec)368 if mo:369 str = 'MEMSPEC "%s" contains invalid charaters: %s' % (memspec, log.debug(str)371 raise CpusetException(str)372def cpuspec_inverse(cpuspec):373 """calculate inverse of cpu specification"""374 cpus = [0 for x in lrange(maxcpu+1)]375 groups = cpuspec.split(',')376 log.debug("cpuspec_inverse(%s) maxcpu=%d groups=%d", 377 cpuspec, maxcpu, len(groups))378 for set in groups:379 items = set.split('-')380 if len(items) == 1:381 if not len(items[0]):382 # common error of two consecutive commas in cpuspec,383 # just ignore it and keep going384 continue385 cpus[int(items[0])] = 1386 elif len(items) == 2:387 for x in lrange(int(items[0]), int(items[1])+1):388 cpus[x] = 1389 else:390 raise CpusetException("cpuspec(%s) has bad group %s" % (cpuspec, set))391 log.debug("cpuspec array: %s", cpus)392 # calculate inverse of array393 for x in lrange(0, len(cpus)):394 cpus[x] = int(cpus[x] == 0)395 log.debug(" inverse: %s", cpus)396 # build cpuspec expression397 nspec = ""398 ingrp = False399 for x in lrange(0, len(cpus)):400 if cpus[x] == 0 and ingrp:401 nspec += str(begin)402 if x > begin+1: 403 nspec += '-' + str(x if cpus[x] else x-1)404 ingrp = False405 if cpus[x] == 1:406 if not ingrp: 407 if len(nspec): nspec += ','408 begin = x409 ingrp = True410 if x == len(cpus)-1:411 nspec += str(begin)412 if x > begin:413 nspec += '-' + str(x)414 log.debug("inverse cpuspec: %s", nspec)415 return nspec416def summary(set):417 """return summary of cpuset with number of tasks running"""418 log.debug("entering summary, set=%s", set.path)419 if len(set.tasks) == 1: msg = 'task'420 else: msg = 'tasks'421 return ('"%s" cpuset of CPUSPEC(%s) with %s %s running' %422 (, set.cpus, len(set.tasks), msg) )423 424def calc_cpumask(max):425 all = 1426 ii = 1427 while ii < max+1:428 all |= 1 << ii429 ii += 1430 return "%x" % all431# Test if stand-alone execution432if __name__ == '__main__':433 rescan()434 # first create them, then find them435 try:436 os.makedirs(CpuSet.basepath+'/csettest/one/x')437 os.mkdir(CpuSet.basepath+'/csettest/one/y')438 os.makedirs(CpuSet.basepath+'/csettest/two/x')439 os.mkdir(CpuSet.basepath+'/csettest/two/y')440 except:441 pass442 print('Max cpu on system:', maxcpu)443 print('All cpu mask: 0x%s' % allcpumask)444 print('------- find_sets tests --------')445 print('Find by root of "root" -> ', find_sets("root"))446 print('Find by path of "/" -> ', find_sets("/"))447 print('Find by path of "/csettest/one" -> ', find_sets("/csettest/one"))448 print('Find by name of "one" -> ', find_sets("one"))449 print('Find by path of "/csettest/two" -> ', find_sets("/csettest/two"))450 print('Find by name of "two" -> ', find_sets("two"))451 print('Find by path of "/csettest/one/x" -> ', find_sets("/csettest/one/x"))452 print('Find by name of "x" -> ', find_sets("x"))453 print('Find by path of "/csettest/two/y" -> ', find_sets("/csettest/two/y"))454 print('Find by name of "y" -> ', find_sets("y"))455 try:456 node = find_sets("cantfindmenoway")457 print('Found "cantfindmenoway??!? -> ', node)458 except CpusetException as err:...

Full Screen

Full Screen Github


Full Screen

1#!/usr/bin/env python32# SPDX-License-Identifier: LGPL-2.1-only3#4# cgconfigparser functionality test - '-a', '-d', '-f' flags5#6# Copyright (c) 2021 Oracle and/or its affiliates.7# Author: Tom Hromatka <>8#9from container import ContainerError10from run import Run, RunError11from cgroup import Cgroup12import consts13import ftests14import utils15import sys16import os17CONTROLLER = 'cpuset'18CGNAME = '019cgconfig'19CONFIG_FILE = '''group20{} {{21 {} {{22 }}23}}'''.format(CGNAME, CONTROLLER)24USER = 'cguser019'25GROUP = 'cggroup019'26DPERM = '515'27FPERM = '246'28CONFIG_FILE_NAME = os.path.join(os.getcwd(), '019cgconfig.conf')29def prereqs(config):30 result = consts.TEST_PASSED31 cause = None32 return result, cause33def setup(config):34 f = open(CONFIG_FILE_NAME, 'w')35 f.write(CONFIG_FILE)36 f.close()37 if config.args.container:38['useradd', '-p', 'Test019#1', USER])39['groupadd', GROUP])40 else:41['sudo', 'useradd', '-p', 'Test019#1', USER])42['sudo', 'groupadd', '-f', GROUP])43def test(config):44 result = consts.TEST_PASSED45 cause = None46 Cgroup.configparser(config, load_file=CONFIG_FILE_NAME, dflt_usr=USER,47 dflt_grp=GROUP, dperm=DPERM, fperm=FPERM)48 mnt_path = Cgroup.get_controller_mount_point(CONTROLLER)49 cpus_path = os.path.join(mnt_path, CGNAME, 'cpuset.cpus')50 user = utils.get_file_owner_username(config, cpus_path)51 group = utils.get_file_owner_group_name(config, cpus_path)52 if user != USER:53 result = consts.TEST_FAILED54 cause = (55 'Owner name failed. Expected {}, received {}\n'56 ''.format(USER, user)57 )58 return result, cause59 if group != GROUP:60 result = consts.TEST_FAILED61 cause = (62 'Owner group failed. Expected {}, received {}\n'63 ''.format(GROUP, group)64 )65 return result, cause66 fperm = utils.get_file_permissions(config, cpus_path)67 if fperm != FPERM:68 result = consts.TEST_FAILED69 cause = (70 'File permissions failed. Expected {}, received {}\n'71 ''.format(FPERM, fperm)72 )73 return result, cause74 dperm = utils.get_file_permissions(config, os.path.join(mnt_path, CGNAME))75 if dperm != DPERM:76 result = consts.TEST_FAILED77 cause = (78 'Directory permissions failed. Expected {}, received {}\n'79 ''.format(DPERM, dperm)80 )81 return result, cause82 return result, cause83def teardown(config):84 os.remove(CONFIG_FILE_NAME)85 try:86 if config.args.container:87['userdel', USER])88['groupdel', GROUP])89 else:90['sudo', 'userdel', '-r', USER])91['sudo', 'groupdel', GROUP])92 except (ContainerError, RunError, ValueError):93 pass94 Cgroup.delete(config, CONTROLLER, CGNAME)95def main(config):96 [result, cause] = prereqs(config)97 if result != consts.TEST_PASSED:98 return [result, cause]99 try:100 setup(config)101 [result, cause] = test(config)102 finally:103 teardown(config)104 return [result, cause]105if __name__ == '__main__':106 config = ftests.parse_args()107 # this test was invoked directly. run only it108 config.args.num = int(os.path.basename(__file__).split('-')[0])109 sys.exit(ftests.main(config))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?