Best Python code snippet using autotest_python
version_1.py
Source: version_1.py
...
        # make sure this is a reboot line
        if self.testname != "reboot":
            return False
        # make sure this was not a failure
        if status_lib.is_worse_than_or_equal_to(current_status, "FAIL"):
            return False
        # it must have been a successful reboot
        return True

    def get_kernel(self):
        # get the base kernel version
        fields = self.optional_fields
        base = re.sub("-autotest$", "", fields.get("kernel", ""))
        # get a list of patches
        patches = []
        patch_index = 0
        while ("patch%d" % patch_index) in fields:
            patches.append(fields["patch%d" % patch_index])
            patch_index += 1
        # create a new kernel instance
        return kernel(base, patches)

    def get_timestamp(self):
        return tko_utils.get_timestamp(self.optional_fields,
                                       "timestamp")


# the default implementations from version 0 will do for now
patch = version_0.patch


class parser(base.parser):
    @staticmethod
    def make_job(dir):
        return job(dir)

    @staticmethod
    def make_dummy_abort(indent, subdir, testname, timestamp, reason):
        indent = "\t" * indent
        if not subdir:
            subdir = "----"
        if not testname:
            testname = "----"
        # There is no guarantee that this will be set.
        timestamp_field = ''
        if timestamp:
            timestamp_field = '\ttimestamp=%s' % timestamp
        msg = indent + "END ABORT\t%s\t%s%s\t%s"
        return msg % (subdir, testname, timestamp_field, reason)

    @staticmethod
    def put_back_line_and_abort(
            line_buffer, line, indent, subdir, timestamp, reason):
        logging.debug("Unexpected indent regression, aborting")
        line_buffer.put_back(line)
        abort = parser.make_dummy_abort(
            indent, subdir, subdir, timestamp, reason)
        line_buffer.put_back(abort)

    def state_iterator(self, buffer):
        line = None
        new_tests = []
        job_count, boot_count = 0, 0
        min_stack_size = 0
        stack = status_lib.status_stack()
        current_kernel = kernel("", [])  # UNKNOWN
        current_status = status_lib.statuses[-1]
        current_reason = None
        started_time_stack = [None]
        subdir_stack = [None]
        running_test = None
        running_reasons = set()
        yield []   # we're ready to start running
        # create a RUNNING SERVER_JOB entry to represent the entire test
        running_job = test.parse_partial_test(self.job, "----", "SERVER_JOB",
                                              "", current_kernel,
                                              self.job.started_time)
        new_tests.append(running_job)
        while True:
            # are we finished with parsing?
            if buffer.size() == 0 and self.finished:
                if stack.size() == 0:
                    break
                # we have status lines left on the stack,
                # we need to implicitly abort them first
                logging.debug('Unexpected end of job, aborting')
                abort_subdir_stack = list(subdir_stack)
                if self.job.aborted_by:
                    reason = "Job aborted by %s" % self.job.aborted_by
                    reason += self.job.aborted_on.strftime(
                        " at %b %d %H:%M:%S")
                else:
                    reason = "Job aborted unexpectedly"
                timestamp = line.optional_fields.get('timestamp')
                for i in reversed(xrange(stack.size())):
                    if abort_subdir_stack:
                        subdir = abort_subdir_stack.pop()
                    else:
                        subdir = None
                    abort = self.make_dummy_abort(
                        i, subdir, subdir, timestamp, reason)
                    buffer.put(abort)
            # stop processing once the buffer is empty
            if buffer.size() == 0:
                yield new_tests
                new_tests = []
                continue
            # reinitialize the per-iteration state
            started_time = None
            finished_time = None
            # get the next line
            raw_line = status_lib.clean_raw_line(buffer.get())
            logging.debug('STATUS: %s', raw_line.strip())
            line = status_line.parse_line(raw_line)
            if line is None:
                logging.debug('non-status line, ignoring')
                continue
            # do an initial sanity check of the indentation
            expected_indent = stack.size()
            if line.type == "END":
                expected_indent -= 1
            if line.indent < expected_indent:
                # ABORT the current level if indentation was unexpectedly low
                self.put_back_line_and_abort(
                    buffer, raw_line, stack.size() - 1, subdir_stack[-1],
                    line.optional_fields.get("timestamp"), line.reason)
                continue
            elif line.indent > expected_indent:
                # ignore the log if the indent was unexpectedly high
                logging.debug("unexpected extra indentation, ignoring")
                continue
            # initial line processing
            if line.type == "START":
                stack.start()
                started_time = line.get_timestamp()
                if (line.testname is None and line.subdir is None and
                        not running_test):
                    # we just started a client, all tests are relative to here
                    min_stack_size = stack.size()
                    # start a "RUNNING" CLIENT_JOB entry
                    job_name = "CLIENT_JOB.%d" % job_count
                    running_client = test.parse_partial_test(self.job, None,
                                                             job_name,
                                                             "", current_kernel,
                                                             started_time)
                    logging.debug("RUNNING: %s", running_client.status)
                    logging.debug("Testname: %s", running_client.testname)
                    new_tests.append(running_client)
                elif stack.size() == min_stack_size + 1 and not running_test:
                    # we just started a new test, insert a running record
                    running_reasons = set()
                    if line.reason:
                        running_reasons.add(line.reason)
                    running_test = test.parse_partial_test(self.job,
                                                           line.subdir,
                                                           line.testname,
                                                           line.reason,
                                                           current_kernel,
                                                           started_time)
                    logging.debug("RUNNING: %s", running_test.status)
                    logging.debug("Subdir: %s", running_test.subdir)
                    logging.debug("Testname: %s", running_test.testname)
                    logging.debug("Reason: %s", running_test.reason)
                    new_tests.append(running_test)
                started_time_stack.append(started_time)
                subdir_stack.append(line.subdir)
                continue
            elif line.type == "INFO":
                fields = line.optional_fields
                # update the current kernel if one is defined in the info
                if "kernel" in fields:
                    current_kernel = line.get_kernel()
                # update the SERVER_JOB reason if one was logged for an abort
                if "job_abort_reason" in fields:
                    running_job.reason = fields["job_abort_reason"]
                    new_tests.append(running_job)
                continue
            elif line.type == "STATUS":
                # update the stacks
                if line.subdir and stack.size() > min_stack_size:
                    subdir_stack[-1] = line.subdir
                # update the status, start and finished times
                stack.update(line.status)
                if status_lib.is_worse_than_or_equal_to(line.status,
                                                        current_status):
                    if line.reason:
                        # update the status of a currently running test
                        if running_test:
                            running_reasons.add(line.reason)
                            running_reasons = tko_utils.drop_redundant_messages(
                                running_reasons)
                            sorted_reasons = sorted(running_reasons)
                            running_test.reason = ", ".join(sorted_reasons)
                            current_reason = running_test.reason
                            new_tests.append(running_test)
                            logging.debug("update RUNNING reason: %s",
                                          line.reason)
                        else:
                            current_reason = line.reason
                    current_status = stack.current_status()
                started_time = None
                finished_time = line.get_timestamp()
                # if this is a non-test entry there's nothing else to do
                if line.testname is None and line.subdir is None:
                    continue
            elif line.type == "END":
                # grab the current subdir off of the subdir stack, or, if this
                # is the end of a job, just pop it off
                if (line.testname is None and line.subdir is None and
                        not running_test):
                    min_stack_size = stack.size() - 1
                    subdir_stack.pop()
                else:
                    line.subdir = subdir_stack.pop()
                    if not subdir_stack[-1] and stack.size() > min_stack_size:
                        subdir_stack[-1] = line.subdir
                # update the status, start and finished times
                stack.update(line.status)
                current_status = stack.end()
                if stack.size() > min_stack_size:
                    stack.update(current_status)
                    current_status = stack.current_status()
                started_time = started_time_stack.pop()
                finished_time = line.get_timestamp()
                # update the current kernel
                if line.is_successful_reboot(current_status):
                    current_kernel = line.get_kernel()
                # adjust the testname if this is a reboot
                if line.testname == "reboot" and line.subdir is None:
                    line.testname = "boot.%d" % boot_count
            else:
                assert False
            # have we just finished a test?
            if stack.size() <= min_stack_size:
                # if there was no testname, just use the subdir
                if line.testname is None:
                    line.testname = line.subdir
                # if there was no testname or subdir, use 'CLIENT_JOB'
                if line.testname is None:
                    line.testname = "CLIENT_JOB.%d" % job_count
                    running_test = running_client
                    job_count += 1
                    if not status_lib.is_worse_than_or_equal_to(
                            current_status, "ABORT"):
                        # a job hasn't really failed just because some of the
                        # tests it ran have
                        current_status = "GOOD"
                if not current_reason:
                    current_reason = line.reason
                new_test = test.parse_test(self.job,
                                           line.subdir,
                                           line.testname,
                                           current_status,
                                           current_reason,
                                           current_kernel,
                                           started_time,
                                           finished_time,
                                           ...
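When the parser hits an unexpected end of job or an indentation regression, it synthesizes "END ABORT" status lines via make_dummy_abort() and feeds them back through the buffer. As a rough standalone illustration of what those dummy lines look like, the sketch below copies the make_dummy_abort() logic from the snippet above and calls it with made-up arguments; the indent level, timestamp, and reason are hypothetical values, not output from a real autotest job.

def make_dummy_abort(indent, subdir, testname, timestamp, reason):
    # same formatting rules as parser.make_dummy_abort() above
    indent = "\t" * indent
    if not subdir:
        subdir = "----"
    if not testname:
        testname = "----"
    timestamp_field = ''
    if timestamp:
        timestamp_field = '\ttimestamp=%s' % timestamp
    msg = indent + "END ABORT\t%s\t%s%s\t%s"
    return msg % (subdir, testname, timestamp_field, reason)

# Hypothetical call: one level of nesting, no subdir or testname recorded.
print(repr(make_dummy_abort(1, None, None, "1609459200",
                            "Job aborted unexpectedly")))
# '\tEND ABORT\t----\t----\ttimestamp=1609459200\tJob aborted unexpectedly'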
status_lib.py
Source: status_lib.py
...
def is_worse_than(lhs, rhs):
    """ Compare two statuses and return a boolean indicating if the LHS status
    is worse than the RHS status."""
    return (statuses.index(lhs) < statuses.index(rhs))


def is_worse_than_or_equal_to(lhs, rhs):
    """ Compare two statuses and return a boolean indicating if the LHS status
    is worse than or equal to the RHS status."""
    if lhs == rhs:
        return True
    return is_worse_than(lhs, rhs)


DEFAULT_BLACKLIST = ('\r\x00',)


def clean_raw_line(raw_line, blacklist=DEFAULT_BLACKLIST):
    """Strip blacklisted characters from raw_line."""
    return re.sub('|'.join(blacklist), '', raw_line)


class status_stack(object):
    def __init__(self):
        self.status_stack = [statuses[-1]]

    def current_status(self):
        return self.status_stack[-1]
    ...
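The comparison helpers above rely on a module-level statuses list ordered from worst to best, so a lower index means a worse result. Below is a minimal sketch of how that ordering drives the comparisons and how clean_raw_line() strips control characters; the statuses list here is an assumption for illustration only, not the exact list autotest defines.

import re

# Assumed ordering, worst first -- the real list lives in status_lib.py.
statuses = ["ABORT", "ERROR", "FAIL", "WARN", "GOOD"]

def is_worse_than(lhs, rhs):
    # a lower index in the statuses list means a worse status
    return statuses.index(lhs) < statuses.index(rhs)

def is_worse_than_or_equal_to(lhs, rhs):
    return lhs == rhs or is_worse_than(lhs, rhs)

print(is_worse_than_or_equal_to("FAIL", "WARN"))   # True: FAIL is worse
print(is_worse_than_or_equal_to("GOOD", "FAIL"))   # False: GOOD is better

# clean_raw_line() removes blacklisted control characters before parsing.
DEFAULT_BLACKLIST = ('\r\x00',)
print(repr(re.sub('|'.join(DEFAULT_BLACKLIST), '', "GOOD\ttest1\r\x00")))
# 'GOOD\ttest1'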
