Best Python code snippet using autotest_python
bc_record.py
Source:bc_record.py  
...170        up_to = datetime.datetime.now() - datetime.timedelta(seconds=retention_seconds)171        return int(time.mktime(up_to.timetuple())*1000)172    def __get_ts(self, event_date):173        return int(time.mktime(event_date.timetuple())*1000)174    def is_cgroup(self, data_cgroup):175        if data_cgroup.startswith('/'):176            return True177        else:178            return False179    def is_docker(self, data_cpuset):180        if '/docker' in data_cpuset:181            return True182        return False183    def is_slurm(self, data_cpuset):184        if '/slurm' in data_cpuset:185            cpuset = data_cpuset.split('/')186            for fsg in cpuset:187                if fsg.startswith('job_'):188                    return (True, fsg)189        return (False, None)190    def callback_record(self, ch, method, properties, body):191        try:192            rt = json.loads(body)193            content = rt['event']194            logging.debug('Message: %s' % (content))195            if content is None or 'evt_type' not in content:196                ch.basic_ack(delivery_tag=method.delivery_tag)197                return198            if content['evt_type'] == 'fd':199                for data in content['data']:200                    is_cgroup = self.is_cgroup(data[2])201                    if is_cgroup:202                        if self.is_docker(data[2]):203                            continue204                        (is_slurm, cgroup) = self.is_slurm(data[2])205                        if is_slurm:206                            data[2] = cgroup207                    # event['in'], event['out'], event['proc'], event['name'], event['container']208                    event = {209                        'proc': int(data[0]),210                        'container': data[2],211                        'name': data[3],212                        'in': 0,213                        'out': 0,214                        'in_out': 0,215                  
      'start':  long(content['ts'])/1000000,216                        'ts': long(content['ts'])217                    }218                    if data[4] == 'in':219                        event['in'] = data[5]220                    else:221                        event['out'] = data[5]222                    event['in_out'] += data[5]223                    self.__add_fd(event)224            elif content['evt_type'] == 'cpu':225                for data in content['data']:226                    is_cgroup = self.is_cgroup(data[5])227                    if is_cgroup:228                        if self.is_docker(data[5]):229                            continue230                        (is_slurm, cgroup) = self.is_slurm(data[5])231                        if is_slurm:232                            data[5] = cgroup233                    is_root = 0234                    if is_cgroup and is_slurm and 'slurm_script' in data[7]:235                        is_root = 1236                    if int(data[3]) == 1:237                        is_root = 1238                    event = {239                        'cpu': int(data[0]),240                        'proc_name': data[1],...cgroup_common.py
Source:cgroup_common.py  
...75        process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,76                                   stdout=subprocess.PIPE,77                                   stderr=subprocess.PIPE, close_fds=True)78        return process79    def is_cgroup(self, pid, pwd):80        """81        Checks if the 'pid' process is in 'pwd' cgroup82        @param pid: pid of the process83        @param pwd: cgroup directory84        @return: 0 when is 'pwd' member85        """86        if open(pwd + '/tasks').readlines().count("%d\n" % pid) > 0:87            return 088        else:89            return -190    def is_root_cgroup(self, pid):91        """92        Checks if the 'pid' process is in root cgroup (WO cgroup)93        @param pid: pid of the process94        @return: 0 when is 'root' member95        """96        return self.is_cgroup(pid, self.root)97    def set_cgroup(self, pid, pwd):98        """99        Sets cgroup membership100        @param pid: pid of the process101        @param pwd: cgroup directory102        @return: 0 when PASSED103        """104        try:105            open(pwd+'/tasks', 'w').write(str(pid))106        except Exception, inst:107            logging.error("cg.set_cgroup(): %s" , inst)108            return -1109        if self.is_cgroup(pid, pwd):110            logging.error("cg.set_cgroup(): Setting %d pid into %s cgroup "111                          "failed", pid, pwd)112            return -1113        else:114            return 0115    def set_root_cgroup(self, pid):116        """117        Resets the cgroup membership (sets to root)118        @param pid: pid of the process119        @return: 0 when PASSED120        """121        return self.set_cgroup(pid, self.root)122    def get_prop(self, prop, pwd=None, supress=False):123        """...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
The guides cover everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
