Best Python code snippet using autotest_python
utils_cgroup.py
Source:utils_cgroup.py  
...48        self.root = modules.get_pwd(self.module)49        if not self.root:50            raise error.TestError("cg.initialize(): Module %s not found"51                                  % self.module)52    def __get_cgroup_pwd(self, cgroup):53        """54        Get cgroup's full path55        :param: cgroup: cgroup name56        :return: cgroup's full path57        """58        if not isinstance(cgroup, str):59            raise error.TestError("cgroup type isn't string!")60        return os.path.join(self.root, cgroup) + '/'61    def get_cgroup_name(self, pwd=None):62        """63        Get cgroup's name64        :param: pwd: cgroup name65        :return: cgroup's name66        """67        if pwd is None:68            # root cgroup69            return None70        if isinstance(pwd, int):71            pwd = self.cgroups[pwd]72        # self.root is "/cgroup/blkio," not "/cgroup/blkio/"73        # cgroup is "/cgroup/blkio/test" or "/cgroup/blkio/test/test"74        # expected cgroup name is test/ or test/test/75        if pwd.startswith(self.root + '/'):76            return pwd[len(self.root) + 1:]77        return None78    def get_cgroup_index(self, cgroup):79        """80        Get cgroup's index in cgroups81        :param: cgroup: cgroup name82        :return: index of cgroup83        """84        try:85            if self.__get_cgroup_pwd(cgroup) not in self.cgroups:86                raise error.TestFail("%s not exists!" 
% cgroup)87            cgroup_pwd = self.__get_cgroup_pwd(cgroup)88            return self.cgroups.index(cgroup_pwd)89        except error.CmdError:90            raise error.TestFail("Find index failed!")91    def mk_cgroup_cgcreate(self, pwd=None, cgroup=None):92        """93        Make a cgroup by cgcreate command94        :params: cgroup: Maked cgroup name95        :return: last cgroup index96        """97        try:98            parent_cgroup = self.get_cgroup_name(pwd)99            if cgroup is None:100                range = "abcdefghijklmnopqrstuvwxyz0123456789"101                sub_cgroup = "cgroup-" + "".join(random.sample(range +102                                                               range.upper(), 6))103            else:104                sub_cgroup = cgroup105            if parent_cgroup is None:106                cgroup = sub_cgroup107            else:108                # Parent cgroup:test. Created cgroup:test1.109                # Whole cgroup name is "test/test1"110                cgroup = os.path.join(parent_cgroup, sub_cgroup)111            if self.__get_cgroup_pwd(cgroup) in self.cgroups:112                raise error.TestFail("%s exists!" 
% cgroup)113            cgcreate_cmd = "cgcreate -g %s:%s" % (self.module, cgroup)114            utils.run(cgcreate_cmd, ignore_status=False)115            pwd = self.__get_cgroup_pwd(cgroup)116            self.cgroups.append(pwd)117            return len(self.cgroups) - 1118        except error.CmdError:119            raise error.TestFail("Make cgroup by cgcreate failed!")120    def mk_cgroup(self, pwd=None, cgroup=None):121        """122        Creates new temporary cgroup123        :param pwd: where to create this cgroup (default: self.root)124        :param cgroup: desired cgroup name125        :return: last cgroup index126        """127        if pwd is None:128            pwd = self.root129        if isinstance(pwd, int):130            pwd = self.cgroups[pwd]131        try:132            if cgroup and self.__get_cgroup_pwd(cgroup) in self.cgroups:133                raise error.TestFail("%s exists!" % cgroup)134            if not cgroup:135                pwd = mkdtemp(prefix='cgroup-', dir=pwd) + '/'136            else:137                pwd = os.path.join(pwd, cgroup) + '/'138                if not os.path.exists(pwd):139                    os.mkdir(pwd)140        except Exception, inst:141            raise error.TestError("cg.mk_cgroup(): %s" % inst)142        self.cgroups.append(pwd)143        return len(self.cgroups) - 1144    def cgexec(self, cgroup, cmd, args=""):145        """146        Execute command in desired cgroup147        :param: cgroup: Desired cgroup148        :param: cmd: Executed command149        :param: args: Executed command's parameters150        """151        try:152            args_str = ""153            if len(args):154                args_str = " ".join(args)155            cgexec_cmd = ("cgexec -g %s:%s %s %s" %156                          (self.module, cgroup, cmd, args_str))157            status, output = commands.getstatusoutput(cgexec_cmd)158            return status, output159        except error.CmdError, detail:160            
raise error.TestFail("Execute %s in cgroup failed!\n%s" %161                                 (cmd, detail))162    def rm_cgroup(self, pwd):163        """164        Removes cgroup.165        :param pwd: cgroup directory.166        """167        if isinstance(pwd, int):168            pwd = self.cgroups[pwd]169        try:170            os.rmdir(pwd)171            self.cgroups.remove(pwd)172        except ValueError:173            logging.warn("cg.rm_cgroup(): Removed cgroup which wasn't created"174                         "using this Cgroup")175        except Exception, inst:176            raise error.TestError("cg.rm_cgroup(): %s" % inst)177    def cgdelete_all_cgroups(self):178        """179        Delete all cgroups in the module180        """181        try:182            for cgroup_pwd in self.cgroups:183                # Ignore sub cgroup184                cgroup = self.get_cgroup_name(cgroup_pwd)185                if cgroup.count("/") > 0:186                    continue187                self.cgdelete_cgroup(cgroup, True)188        except error.CmdError:189            raise error.TestFail("cgdelete all cgroups in %s failed!"190                                 % self.module)191    def cgdelete_cgroup(self, cgroup, recursive=False):192        """193        Delete desired cgroup.194        :params cgroup: desired cgroup195        :params force:If true, sub cgroup can be deleted with parent cgroup196        """197        try:198            cgroup_pwd = self.__get_cgroup_pwd(cgroup)199            if cgroup_pwd not in self.cgroups:200                raise error.TestError("%s doesn't exist!" 
% cgroup)201            cmd = "cgdelete %s:%s" % (self.module, cgroup)202            if recursive:203                cmd += " -r"204            utils.run(cmd, ignore_status=False)205            self.cgroups.remove(cgroup_pwd)206        except error.CmdError, detail:207            raise error.TestFail("cgdelete %s failed!\n%s" %208                                 (cgroup, detail))209    def cgclassify_cgroup(self, pid, cgroup):210        """211        Classify pid into cgroup212        :param pid: pid of the process213        :param cgroup: cgroup name214        """215        try:216            cgroup_pwd = self.__get_cgroup_pwd(cgroup)217            if cgroup_pwd not in self.cgroups:218                raise error.TestError("%s doesn't exist!" % cgroup)219            cgclassify_cmd = ("cgclassify -g %s:%s %d" %220                              (self.module, cgroup, pid))221            utils.run(cgclassify_cmd, ignore_status=False)222        except error.CmdError, detail:223            raise error.TestFail("Classify process to tasks file failed!:%s" %224                                 detail)225    def get_pids(self, pwd=None):226        """227        Get all pids in cgroup228        :params: pwd: cgroup directory229        :return: all pids(list)230        """231        if pwd is None:232            pwd = self.root233        if isinstance(pwd, int):234            pwd = self.cgroups[pwd]235        try:236            return [_.strip() for _ in open(os.path.join(pwd, 'tasks'), 'r')]237        except Exception, inst:238            raise error.TestError("cg.get_pids(): %s" % inst)239    def test(self, cmd):240        """241        Executes cgroup_client.py with cmd parameter.242        :param cmd: command to be executed243        :return: subprocess.Popen() process244        """245        logging.debug("cg.test(): executing parallel process '%s'", cmd)246        cmd = self._client + ' ' + cmd247        process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,248 
                                  stdout=subprocess.PIPE,249                                   stderr=subprocess.PIPE, close_fds=True)250        return process251    def is_cgroup(self, pid, pwd):252        """253        Checks if the 'pid' process is in 'pwd' cgroup254        :param pid: pid of the process255        :param pwd: cgroup directory256        :return: 0 when is 'pwd' member257        """258        if isinstance(pwd, int):259            pwd = self.cgroups[pwd]260        if open(os.path.join(pwd, 'tasks')).readlines().count("%d\n" % pid) > 0:261            return 0262        else:263            return -1264    def is_root_cgroup(self, pid):265        """266        Checks if the 'pid' process is in root cgroup (WO cgroup)267        :param pid: pid of the process268        :return: 0 when is 'root' member269        """270        return self.is_cgroup(pid, self.root)271    def set_cgroup(self, pid, pwd=None):272        """273        Sets cgroup membership274        :param pid: pid of the process275        :param pwd: cgroup directory276        """277        if pwd is None:278            pwd = self.root279        if isinstance(pwd, int):280            pwd = self.cgroups[pwd]281        try:282            open(os.path.join(pwd, 'tasks'), 'w').write(str(pid))283        except Exception, inst:284            raise error.TestError("cg.set_cgroup(): %s" % inst)285        if self.is_cgroup(pid, pwd):286            raise error.TestError("cg.set_cgroup(): Setting %d pid into %s "287                                  "cgroup failed" % (pid, pwd))288    def refresh_cgroups(self):289        """290        Refresh all cgroups path.291        """292        try:293            cgroups = utils.run("lscgroup").stdout.strip()294            cgroup_list = []295            for line in cgroups.splitlines():296                controllers = line.split(":")[0]297                if set(self.module.split(",")) != set(controllers.split(",")):298                    continue299               
 cgroup_name = line.split(":")[-1]300                if cgroup_name != "/":301                    cgroup_list.append(cgroup_name[1:])302        except error.CmdError:303            raise error.TestFail("Get cgroup in %s failed!" % self.module)304        self.cgroups = []305        for cgroup in cgroup_list:306            pwd = self.__get_cgroup_pwd(cgroup)307            self.cgroups.append(pwd)308    def set_root_cgroup(self, pid):309        """310        Resets the cgroup membership (sets to root)311        :param pid: pid of the process312        :return: 0 when PASSED313        """314        return self.set_cgroup(pid, self.root)315    def get_property(self, prop, pwd=None):316        """317        Gets the property value318        :param prop: property name (file)319        :param pwd: cgroup directory320        :return: [] values or None when FAILED...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
