Best Python code snippet using autotest_python
utils_cgroup.py
Source:utils_cgroup.py  
...61        """62        if not isinstance(cgroup, str):63            raise error.TestError("cgroup type isn't string!")64        return os.path.join(self.root, cgroup) + '/'65    def get_cgroup_name(self, pwd=None):66        """67        Get cgroup's name68        :param pwd: cgroup name69        :return: cgroup's name70        """71        if pwd is None:72            # root cgroup73            return None74        if isinstance(pwd, int):75            pwd = self.cgroups[pwd]76        # self.root is "/cgroup/blkio," not "/cgroup/blkio/"77        # cgroup is "/cgroup/blkio/test" or "/cgroup/blkio/test/test"78        # expected cgroup name is test/ or test/test/79        if pwd.startswith(self.root + '/'):80            return pwd[len(self.root) + 1:]81        return None82    def get_cgroup_index(self, cgroup):83        """84        Get cgroup's index in cgroups85        :param cgroup: cgroup name86        :return: index of cgroup87        """88        try:89            if self.__get_cgroup_pwd(cgroup) not in self.cgroups:90                raise error.TestFail("%s not exists!" 
% cgroup)91            cgroup_pwd = self.__get_cgroup_pwd(cgroup)92            return self.cgroups.index(cgroup_pwd)93        except error.CmdError:94            raise error.TestFail("Find index failed!")95    def mk_cgroup_cgcreate(self, pwd=None, cgroup=None):96        """97        Make a cgroup by executing the cgcreate command98        :params: cgroup: name of the cgroup to be created99        :return: last cgroup index100        """101        try:102            parent_cgroup = self.get_cgroup_name(pwd)103            if cgroup is None:104                range = "abcdefghijklmnopqrstuvwxyz0123456789"105                sub_cgroup = "cgroup-" + "".join(random.sample(range +106                                                 range.upper(), 6))107            else:108                sub_cgroup = cgroup109            if parent_cgroup is None:110                cgroup = sub_cgroup111            else:112                # Parent cgroup:test. Created cgroup:test1.113                # Whole cgroup name is "test/test1"114                cgroup = os.path.join(parent_cgroup, sub_cgroup)115            if self.__get_cgroup_pwd(cgroup) in self.cgroups:116                raise error.TestFail("%s exists!" 
% cgroup)117            cgcreate_cmd = "cgcreate -g %s:%s" % (self.module, cgroup)118            utils.run(cgcreate_cmd, ignore_status=False)119            pwd = self.__get_cgroup_pwd(cgroup)120            self.cgroups.append(pwd)121            return len(self.cgroups) - 1122        except error.CmdError:123            raise error.TestFail("Make cgroup by cgcreate failed!")124    def mk_cgroup(self, pwd=None, cgroup=None):125        """126        Creates new temporary cgroup127        :param pwd: where to create this cgroup (default: self.root)128        :param cgroup: desired cgroup name129        :return: last cgroup index130        """131        if pwd is None:132            pwd = self.root133        if isinstance(pwd, int):134            pwd = self.cgroups[pwd]135        try:136            if cgroup and self.__get_cgroup_pwd(cgroup) in self.cgroups:137                raise error.TestFail("%s exists!" % cgroup)138            if not cgroup:139                pwd = mkdtemp(prefix='cgroup-', dir=pwd) + '/'140            else:141                pwd = os.path.join(pwd, cgroup) + '/'142                if not os.path.exists(pwd):143                    os.mkdir(pwd)144        except Exception, inst:145            raise error.TestError("cg.mk_cgroup(): %s" % inst)146        self.cgroups.append(pwd)147        return len(self.cgroups) - 1148    def cgexec(self, cgroup, cmd, args=""):149        """150        Execute command in desired cgroup151        :param cgroup: Desired cgroup152        :param cmd: Executed command153        :param args: Executed command's parameters154        """155        try:156            args_str = ""157            if len(args):158                args_str = " ".join(args)159            cgexec_cmd = ("cgexec -g %s:%s %s %s" %160                         (self.module, cgroup, cmd, args_str))161            status, output = commands.getstatusoutput(cgexec_cmd)162            return status, output163        except error.CmdError, detail:164            
raise error.TestFail("Execute %s in cgroup failed!\n%s" %165                                 (cmd, detail))166    def rm_cgroup(self, pwd):167        """168        Removes cgroup.169        :param pwd: cgroup directory.170        """171        if isinstance(pwd, int):172            pwd = self.cgroups[pwd]173        try:174            os.rmdir(pwd)175            self.cgroups.remove(pwd)176        except ValueError:177            logging.warn("cg.rm_cgroup(): Removed cgroup which wasn't created"178                         "using this Cgroup")179        except Exception, inst:180            raise error.TestError("cg.rm_cgroup(): %s" % inst)181    def cgdelete_all_cgroups(self):182        """183        Delete all cgroups in the module184        """185        try:186            for cgroup_pwd in self.cgroups:187                # Ignore sub cgroup188                cgroup = self.get_cgroup_name(cgroup_pwd)189                if cgroup.count("/") > 0:190                    continue191                self.cgdelete_cgroup(cgroup, True)192        except error.CmdError:193            raise error.TestFail("cgdelete all cgroups in %s failed!"194                                 % self.module)195    def cgdelete_cgroup(self, cgroup, recursive=False):196        """197        Delete desired cgroup.198        :params cgroup: desired cgroup199        :params force:If true, sub cgroup can be deleted with parent cgroup200        """201        try:202            cgroup_pwd = self.__get_cgroup_pwd(cgroup)203            if cgroup_pwd not in self.cgroups:204                raise error.TestError("%s doesn't exist!" 
% cgroup)205            cmd = "cgdelete %s:%s" % (self.module, cgroup)206            if recursive:207                cmd += " -r"208            utils.run(cmd, ignore_status=False)209            self.cgroups.remove(cgroup_pwd)210        except error.CmdError, detail:211            raise error.TestFail("cgdelete %s failed!\n%s" %212                                 (cgroup, detail))213    def cgclassify_cgroup(self, pid, cgroup):214        """215        Classify pid into cgroup216        :param pid: pid of the process217        :param cgroup: cgroup name218        """219        try:220            cgroup_pwd = self.__get_cgroup_pwd(cgroup)221            if cgroup_pwd not in self.cgroups:222                raise error.TestError("%s doesn't exist!" % cgroup)223            cgclassify_cmd = ("cgclassify -g %s:%s %d" %224                             (self.module, cgroup, pid))225            utils.run(cgclassify_cmd, ignore_status=False)226        except error.CmdError, detail:227            raise error.TestFail("Classify process to tasks file failed!:%s" %228                                 detail)229    def get_pids(self, pwd=None):230        """231        Get all pids in cgroup232        :params: pwd: cgroup directory233        :return: all pids(list)234        """235        if pwd is None:236            pwd = self.root237        if isinstance(pwd, int):238            pwd = self.cgroups[pwd]239        try:240            return [_.strip() for _ in open(os.path.join(pwd, 'tasks'), 'r')]241        except Exception, inst:242            raise error.TestError("cg.get_pids(): %s" % inst)243    def test(self, cmd):244        """245        Executes cgroup_client.py with cmd parameter.246        :param cmd: command to be executed247        :return: subprocess.Popen() process248        """249        logging.debug("cg.test(): executing parallel process '%s'", cmd)250        cmd = self._client + ' ' + cmd251        process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,252  
                                 stdout=subprocess.PIPE,253                                   stderr=subprocess.PIPE, close_fds=True)254        return process255    def is_cgroup(self, pid, pwd):256        """257        Checks if the 'pid' process is in 'pwd' cgroup258        :param pid: pid of the process259        :param pwd: cgroup directory260        :return: 0 when is 'pwd' member261        """262        if isinstance(pwd, int):263            pwd = self.cgroups[pwd]264        if open(os.path.join(pwd, 'tasks')).readlines().count("%d\n" % pid) > 0:265            return 0266        else:267            return -1268    def is_root_cgroup(self, pid):269        """270        Checks if the 'pid' process is in root cgroup (WO cgroup)271        :param pid: pid of the process272        :return: 0 when is 'root' member273        """274        return self.is_cgroup(pid, self.root)275    def set_cgroup(self, pid, pwd=None):276        """277        Sets cgroup membership278        :param pid: pid of the process279        :param pwd: cgroup directory280        """281        if pwd is None:282            pwd = self.root283        if isinstance(pwd, int):284            pwd = self.cgroups[pwd]285        try:286            open(os.path.join(pwd, 'tasks'), 'w').write(str(pid))287        except Exception, inst:288            raise error.TestError("cg.set_cgroup(): %s" % inst)289        if self.is_cgroup(pid, pwd):290            raise error.TestError("cg.set_cgroup(): Setting %d pid into %s "291                                  "cgroup failed" % (pid, pwd))292    def set_root_cgroup(self, pid):293        """294        Resets the cgroup membership (sets to root)295        :param pid: pid of the process296        :return: 0 when PASSED297        """298        return self.set_cgroup(pid, self.root)299    def get_property(self, prop, pwd=None):300        """301        Gets the property value302        :param prop: property name (file)303        :param pwd: cgroup directory304        
:return: [] values or None when FAILED305        """306        if pwd is None:307            pwd = self.root308        if isinstance(pwd, int):309            pwd = self.cgroups[pwd]310        try:311            # Remove tailing '\n' from each line312            ret = [_[:-1] for _ in open(os.path.join(pwd, prop), 'r')]313            if ret:314                return ret315            else:316                return [""]317        except Exception, inst:318            raise error.TestError("cg.get_property(): %s" % inst)319    def set_property_h(self, prop, value, pwd=None, check=True, checkprop=None):320        """321        Sets the one-line property value concerning the K,M,G postfix322        :param prop: property name (file)323        :param value: desired value324        :param pwd: cgroup directory325        :param check: check the value after setup / override checking value326        :param checkprop: override prop when checking the value327        """328        _value = value329        try:330            value = str(value)331            human = {'B': 1,332                     'K': 1024,333                     'M': 1048576,334                     'G': 1073741824,335                     'T': 1099511627776336                     }337            if human.has_key(value[-1]):338                value = int(value[:-1]) * human[value[-1]]339        except Exception:340            logging.warn("cg.set_prop() fallback into cg.set_property.")341            value = _value342        self.set_property(prop, value, pwd, check, checkprop)343    def set_property(self, prop, value, pwd=None, check=True, checkprop=None):344        """345        Sets the property value346        :param prop: property name (file)347        :param value: desired value348        :param pwd: cgroup directory349        :param check: check the value after setup / override checking value350        :param checkprop: override prop when checking the value351        """352        value = str(value)353       
 if pwd is None:354            pwd = self.root355        if isinstance(pwd, int):356            pwd = self.cgroups[pwd]357        try:358            open(os.path.join(pwd, prop), 'w').write(value)359        except Exception, inst:360            raise error.TestError("cg.set_property(): %s" % inst)361        if check is not False:362            if check is True:363                check = value364            if checkprop is None:365                checkprop = prop366            _values = self.get_property(checkprop, pwd)367            # Sanitize non printable characters before check368            check = " ".join(check.split())369            if check not in _values:370                raise error.TestError("cg.set_property(): Setting failed: "371                                      "desired = %s, real values = %s"372                                      % (repr(check), repr(_values)))373    def cgset_property(self, prop, value, pwd=None, check=True, checkprop=None):374        """375        Sets the property value by cgset command376        :param prop: property name (file)377        :param value: desired value378        :param pwd: cgroup directory379        :param check: check the value after setup / override checking value380        :param checkprop: override prop when checking the value381        """382        if pwd is None:383            pwd = self.root384        if isinstance(pwd, int):385            pwd = self.cgroups[pwd]386        try:387            cgroup = self.get_cgroup_name(pwd)388            cgset_cmd = "cgset -r %s='%s' %s" % (prop, value, cgroup)389            utils.run(cgset_cmd, ignore_status=False)390        except error.CmdError, detail:391            raise error.TestFail("Modify %s failed!:\n%s" % (prop, detail))392        if check is not False:393            if check is True:394                check = value395            if checkprop is None:396                checkprop = prop397            _values = self.get_property(checkprop,398              
                          self.get_cgroup_index(cgroup))399            # Sanitize non printable characters before check400            check = " ".join(check.split())401            if check not in _values:...mem_prof.py
Source:mem_prof.py  
...21}22CmdGetCgroupMaxMemUsage = {23'start': 'cgget -g {cgroup_name} | grep memory.max_usage_in_bytes',24}25def get_cgroup_name():26    start_string = '/run/user/20001/limitmem_'27    end_string = '_cgroup_closer'28    grep_results = os.popen(CmdGetCgroupID['start']).read()29    # print grep_results30    # find the latest cgroup name31    start_index = grep_results.rfind(start_string) + len(start_string) 32    # this task executes error. 33    if start_index == -1:34    	return "Err" 35    end_index = grep_results.find(end_string, start_index)36    if end_index == -1:37    	return "Err"38    cgroup_num = grep_results[start_index: end_index]39    return "memory:limitmem_" + cgroup_num40stop_mark = False41mem_usages = list()42max_mem_usage = 043def cgroup_polling():44    global stop_mark45    global mem_usages46    global max_mem_usage47    while 1:48    	cgroup_name = get_cgroup_name()49    	if cgroup_name == "Err":50    		continue51    	break52    print "cgroup_name: " + cgroup_name53    while 1 and (not stop_mark):54    	time.sleep(0.01)55    	memusage_results = os.popen(CmdGetCgroupMemUsage['start'].format(cgroup_name=cgroup_name)).read()56        cur_memusage = int(memusage_results.rstrip("\n").split()[1])57        # print cur_memusage58    	mem_usages.append(cur_memusage)59    	max_memusage_results = os.popen(CmdGetCgroupMaxMemUsage['start'].format(cgroup_name=cgroup_name)).read()60    	max_mem_usage = int(max_memusage_results.rstrip("\n").split()[1])61        # print max_mem_usage62def kill_keyword(task):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
