Best Python code snippet using autotest_python
server_job.py
Source:server_job.py  
...
        if not self._using_parser:
            return
        final_tests = self.parser.end()
        for test in final_tests:
            self.__insert_test(test)
        self._using_parser = False

    def verify(self):
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)
        try:
            namespace = {'machines': self.machines, 'job': self,
                         'ssh_user': self._ssh_user,
                         'ssh_port': self._ssh_port,
                         'ssh_pass': self._ssh_pass}
            self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
        except Exception as e:
            msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc())
            self.record('ABORT', None, None, msg)
            raise

    def repair(self, host_protection):
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)
        namespace = {'machines': self.machines, 'job': self,
                     'ssh_user': self._ssh_user, 'ssh_port': self._ssh_port,
                     'ssh_pass': self._ssh_pass,
                     'protection_level': host_protection}
        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)

    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass

    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass

    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass

    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False

    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
        is_forking = not (len(machines) == 1 and self.machines == machines)
        if self._parse_job and is_forking and log:
            def wrapper(machine):
                self._parse_job += "/" + machine
                self._using_parser = True
                self.machines = [machine]
                self.push_execution_context(machine)
                os.chdir(self.resultdir)
                utils.write_keyval(self.resultdir, {"hostname": machine})
                self.init_parser()
                result = function(machine)
                self.cleanup_parser()
                return result
        elif len(machines) > 1 and log:
            def wrapper(machine):
                self.push_execution_context(machine)
                os.chdir(self.resultdir)
                machine_data = {'hostname': machine,
                                'status_version': str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper

    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        :param function: A callable to run in parallel given each machine.
        :param machines: A list of machine names to be passed one per
                subcommand invocation of function.
        :param log: If True, output will be written to a subdirectory named
                after each machine.
        :param timeout: Seconds after which the function call should time out.
        :param return_results: If True, instead of an AutoServError being
                raised on any error, a list of the results/exceptions from the
                function called on each arg is returned.  [default: False]

        :raise error.AutotestError: If any of the functions failed.
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(wrapper, machines,
                                          log=log, timeout=timeout,
                                          return_results=return_results)

    def parallel_on_machines(self, function, machines, timeout=None):
        """
        :param function: Called in parallel with one machine as its argument.
        :param machines: A list of machines to call function(machine) on.
        :param timeout: Seconds after which the function call should time out.
        :return: A list of machines on which function(machine) returned
                without raising an exception.
        """
        results = self.parallel_simple(function, machines, timeout=timeout,
                                       return_results=True)
        success_machines = []
        for result, machine in itertools.izip(results, machines):
            if not isinstance(result, Exception):
                success_machines.append(machine)
        return success_machines

    _USE_TEMP_DIR = object()
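autotest's subcommand.parallel_simple forks one subprocess per machine and, with return_results=True, hands back each call's result or the exception itself instead of raising. A rough standalone sketch of that contract using multiprocessing (an analogy, not autotest's subcommand module; all names are illustrative):

import multiprocessing

def _call(function, machine):
    """Return the result, or the exception object instead of raising."""
    try:
        return function(machine)
    except Exception as exc:
        return exc

def parallel_simple_sketch(function, machines):
    """Run function(machine) once per machine; collect results/exceptions."""
    with multiprocessing.Pool(len(machines)) as pool:
        return pool.starmap(_call, [(function, m) for m in machines])

def check(machine):
    if machine.endswith('bad'):
        raise RuntimeError('unreachable')
    return machine + ': ok'

if __name__ == '__main__':
    machines = ['host1', 'host2-bad']
    results = parallel_simple_sketch(check, machines)
    # keep only machines whose call succeeded, as parallel_on_machines does
    good = [m for r, m in zip(results, machines)
            if not isinstance(r, Exception)]
    print(results, good)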
    def run(self, cleanup=False, install_before=False, install_after=False,
            collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, only_collect_crashinfo=False):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "w")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace['machines'] = machines
        namespace['args'] = self.args
        namespace['job'] = self
        namespace['ssh_user'] = self._ssh_user
        namespace['ssh_port'] = self._ssh_port
        namespace['ssh_pass'] = self._ssh_pass
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if install_before and machines:
                    self._execute_code(INSTALL_CONTROL_FILE, namespace)

                if only_collect_crashinfo:
                    return

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir and control_file_dir is not
                                 self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                        suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)

                logging.info("Processing control file")
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")

                # no error occurred, so we don't need to collect crashinfo
                collect_crashinfo = False
            except Exception as e:
                try:
                    logging.exception(
                        'Exception escaped control file, job aborting:')
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': str(e)})
                except Exception:
                    pass  # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception as e:
                    logging.warn('Could not remove temp directory %s: %s',
                                 temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                namespace['test_start_time'] = test_start_time
                if collect_crashinfo:
                    # includes crashdumps
                    self._execute_code(CRASHINFO_CONTROL_FILE, namespace)
                else:
                    self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)
            self.disable_external_logging()
            if cleanup and machines:
                self._execute_code(CLEANUP_CONTROL_FILE, namespace)
            if install_after and machines:
                self._execute_code(INSTALL_CONTROL_FILE, namespace)
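run() stages control files in a mkdtemp directory and removes that directory in the finally block even when the control file raises. The staging skeleton in isolation (a sketch; the 'control.srv' filename is illustrative, not autotest's constant):

import os
import shutil
import tempfile

def run_with_staged_control(control_text, execute):
    """Write control_text into a fresh temp dir, execute it, always clean up."""
    temp_dir = tempfile.mkdtemp(suffix='temp_control_file_dir')
    try:
        control_path = os.path.join(temp_dir, 'control.srv')
        with open(control_path, 'w') as f:
            f.write(control_text)
        execute(control_path)
    finally:
        # mirrors run()'s finally block: remove the staging dir even on error
        shutil.rmtree(temp_dir)

def show(path):
    with open(path) as f:
        print(f.read())

run_with_staged_control("print('hello from control file')\n", show)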
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
        """
        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException as e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception as e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        result, exc_info = self._run_group(testname, subdir, group_func)
        if exc_info and isinstance(exc_info[1], error.TestBaseException):
            return False
        elif exc_info:
            raise exc_info[0](exc_info[1])
        else:
            return True

    def _run_group(self, name, subdir, function, *args, **dargs):
        """
        Underlying method for running something inside of a group.
        """
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException as e:
            self.record("END %s" % e.exit_status, subdir, name)
            exc_info = sys.exc_info()
        except Exception as e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)
        return result, exc_info

    def run_group(self, function, *args, **dargs):
        """
        function:
                subroutine to run
        *args:
                arguments for the function
        """
        name = function.__name__
        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag
        return self._run_group(name, None, function, *args, **dargs)[0]
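_run_group brackets any callable between START and END status records so the parser can nest its results. A minimal standalone sketch of that bracketing, with a plain print standing in for job.record (illustrative, not autotest's API):

import sys
import traceback

def record(event, name, message=''):
    """Stand-in for job.record(): print one tab-separated status line."""
    print('%s\t%s\t%s' % (event, name, message))

def run_group_sketch(name, function):
    """Run function between START/END records; return (result, exc_info)."""
    result, exc_info = None, None
    record('START', name)
    try:
        result = function()
    except Exception:
        record('END FAIL', name, traceback.format_exc().splitlines()[-1])
        exc_info = sys.exc_info()
    else:
        record('END GOOD', name)
    return result, exc_info

run_group_sketch('sleeptest', lambda: 'ok')
run_group_sketch('failtest', lambda: 1 / 0)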
    def run_reboot(self, reboot_func, get_kernel_func):
        """
        A specialization of run_group meant specifically for handling
        a reboot. Includes support for capturing the kernel version
        after the reboot.

        reboot_func: a function that carries out the reboot
        get_kernel_func: a function that returns a string
        representing the kernel version.
        """
        try:
            self.record('START', None, 'reboot')
            reboot_func()
        except Exception as e:
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, 'reboot', err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record('END GOOD', None, 'reboot',
                        optional_fields={"kernel": kernel})

    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)

    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)

    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)

    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)
    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))
            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers
            # stop if none of the loggers have any output left
            if not loggers:
                break
        # sort into timestamp order
        warnings.sort()
        return warnings

    def _unique_subdirectory(self, base_subdirectory_name):
        """Compute a unique results subdirectory based on the given name.

        Appends base_subdirectory_name with a number as necessary to find a
        directory name that doesn't already exist.
        """
        subdirectory = base_subdirectory_name
        counter = 1
        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
            subdirectory = base_subdirectory_name + '.' + str(counter)
            counter += 1
        return subdirectory

    def get_record_context(self):
        """Returns an object representing the current job.record context.

        The object returned is an opaque object with a 0-arg restore method
        which can be called to restore the job.record context (i.e. indentation)
        to the current level. The intention is that it should be used when
        something external which generates job.record calls (e.g. an autotest
        client) can fail catastrophically and the server job record state
        needs to be reset to its original "known good" state.

        :return: A context object with a 0-arg restore() method."""
        return self._indenter.get_context()
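_read_warnings drains every warning pipe without blocking by passing a zero timeout to select.select and treating an empty readline as a closed pipe. A self-contained sketch of that polling loop over one pre-loaded pipe (illustrative data and names):

import os
import select

# build one pipe and pre-load it with two tab-separated warning lines
read_end, write_end = os.pipe()
reader = os.fdopen(read_end)
with os.fdopen(write_end, 'w') as writer:
    writer.write('10\tWARN\tdisk is slow\n20\tWARN\tclock skew\n')

warnings = []
loggers = {reader}
while loggers:
    # zero timeout: return immediately with whatever is ready to read
    ready, _, _ = select.select(list(loggers), [], [], 0)
    if not ready:
        break
    for logger in ready:
        line = logger.readline()
        if not line:          # empty read means the writer closed the pipe
            loggers.discard(logger)
            continue
        timestamp, msgtype, msg = line.split('\t', 2)
        warnings.append((int(timestamp), msg.strip()))

print(sorted(warnings))  # [(10, 'disk is slow'), (20, 'clock skew')]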
    def record_summary(self, status_code, test_name, reason='', attributes=None,
                       distinguishing_attributes=(), child_test_ids=None):
        """Record a summary test result.

        :param status_code: status code string, see
                shared.log.is_valid_status()
        :param test_name: name of the test
        :param reason: (optional) string providing detailed reason for test
                outcome
        :param attributes: (optional) dict of string keyvals to associate with
                this result
        :param distinguishing_attributes: (optional) list of attribute names
                that should be used to distinguish identically-named test
                results.  These attributes should be present in the attributes
                parameter.  This is used to generate user-friendly subdirectory
                names.
        :param child_test_ids: (optional) list of test indices for test results
                used in generating this result.
        """
        subdirectory_name_parts = [test_name]
        for attribute in distinguishing_attributes:
            assert attributes
            assert attribute in attributes, '%s not in %s' % (attribute,
                                                              attributes)
            subdirectory_name_parts.append(attributes[attribute])
        base_subdirectory_name = '.'.join(subdirectory_name_parts)

        subdirectory = self._unique_subdirectory(base_subdirectory_name)
        subdirectory_path = os.path.join(self.resultdir, subdirectory)
        os.mkdir(subdirectory_path)

        self.record(status_code, subdirectory, test_name,
                    status=reason, optional_fields={'is_summary': True})

        if attributes:
            utils.write_keyval(subdirectory_path, attributes)

        if child_test_ids:
            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
            summary_data = {'child_test_ids': ids_string}
            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
                               summary_data)

    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})

    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})

    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        :param subdir: Optional parameter indicating that you want the path
            to a subdirectory status log.

        :return: The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None
    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.

        :param update_func: a function that updates the list of uncollected
            logs. Should take one parameter, the list to be updated.
        """
        if self._uncollected_log_file:
            log_file = open(self._uncollected_log_file, "r+")
            fcntl.flock(log_file, fcntl.LOCK_EX)
            try:
                uncollected_logs = pickle.load(log_file)
                update_func(uncollected_logs)
                log_file.seek(0)
                log_file.truncate()
                pickle.dump(uncollected_logs, log_file)
                log_file.flush()
            finally:
                fcntl.flock(log_file, fcntl.LOCK_UN)
                log_file.close()

    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.

        :param hostname: the hostname of the machine holding the logs
        :param remote_path: the directory on the remote machine holding logs
        :param local_path: the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)

    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.

        :param hostname: the hostname of the machine holding the logs
        :param remote_path: the directory on the remote machine holding logs
        :param local_path: the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)

    def get_client_logs(self):
        """Retrieves the list of uncollected logs, if it exists.

        :return: A list of (host, remote_path, local_path) tuples. Returns
                 an empty list if no uncollected logs file exists.
        """
        log_exists = (self._uncollected_log_file and
                      os.path.exists(self._uncollected_log_file))
        if log_exists:
            return pickle.load(open(self._uncollected_log_file))
        else:
            return []
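_update_uncollected_logs_list combines fcntl.flock with pickle so several autoserv processes can mutate one shared list safely. A minimal standalone sketch of the same pattern (Unix-only, like autotest; the file path is illustrative):

import fcntl
import pickle

LIST_FILE = 'uncollected_logs.pickle'  # illustrative path

# seed the file with an empty pickled list, as server_job.run() does
with open(LIST_FILE, 'wb') as f:
    pickle.dump([], f)

def update_shared_list(path, update_func):
    """Apply update_func to a pickled list under an exclusive file lock."""
    with open(path, 'r+b') as log_file:
        fcntl.flock(log_file, fcntl.LOCK_EX)   # block until we own the file
        try:
            items = pickle.load(log_file)
            update_func(items)                 # mutate the list in place
            log_file.seek(0)
            log_file.truncate()                # discard the old pickle bytes
            pickle.dump(items, log_file)
            log_file.flush()
        finally:
            fcntl.flock(log_file, fcntl.LOCK_UN)

# append and remove a log tuple, as add_client_log/remove_client_log do
update_shared_list(LIST_FILE, lambda logs: logs.append(('host1', '/r', '/l')))
update_shared_list(LIST_FILE, lambda logs: logs.remove(('host1', '/r', '/l')))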
    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.

        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.

            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                                  '%s from %s %r would override %r' %
                                                  (name, module_name, getattr(module, name),
                                                   namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with fewer surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))
                namespace[name] = getattr(module, name)

        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest.server',
                      ('hosts', 'autotest_remote', 'standalone_profiler',
                       'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))
        _import_names('autotest.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest.client.shared.error')
        _import_names('autotest.client.shared.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public API. -gps)
        #
        # XXX Base & SiteAutotest do not appear to use .job.  Who does?
        namespace['autotest_remote'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
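_import_names is essentially "from module import *" performed against a dict instead of the caller's globals. A compact standalone sketch of the same trick against the standard library (modules and names are illustrative, and the conflict checking is omitted):

def import_names(namespace, module_name, names=()):
    """Import module_name and copy selected attributes into namespace."""
    module = __import__(module_name, {}, {}, list(names))
    if not names:
        # no fromlist: __import__ returned the top-level package, so walk
        # down to the lowest submodule, e.g. 'os.path' -> posixpath
        for submodule in module_name.split('.')[1:]:
            module = getattr(module, submodule)
        # emulate 'import *': prefer __all__, else every public attribute
        names = getattr(module, '__all__', [n for n in dir(module)
                                            if not n.startswith('_')])
    for name in names:
        namespace[name] = getattr(module, name)

ns = {}
import_names(ns, 'os.path', ('join', 'exists'))
print(ns['join']('a', 'b'))       # a/b on POSIX
import_names(ns, 'string')        # '*' import of the string module
print(ns['ascii_lowercase'][:5])  # abcde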
    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect is explicitly set to False, the dict will not
        be modified.

        Args:
          code_file: The filename of the control file to execute.
          namespace: A dict containing names to make available during execution.
          protect: Boolean.  If True (the default) a copy of the namespace dict
              is used during execution to prevent the code from modifying its
              contents outside of this function.  If False the raw dict is
              passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        execfile(code_file, namespace, namespace)

    def _parse_status(self, new_line):
        if not self._using_parser:
            return
        new_tests = self.parser.process_lines([new_line])
        for test in new_tests:
            self.__insert_test(test)

    def __insert_test(self, test):
        """
        An internal method to insert a new test result into the
        database. This method will not raise an exception, even if an
        error occurs during the insert, to avoid failing a test
        simply because of unexpected database issues."""
        self.num_tests_run += 1
        if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):
            self.num_tests_failed += 1
        try:
            dbutils.insert_test(self.job_model, test)
        except Exception:
            msg = ("WARNING: An unexpected error occurred while "
                   "inserting test results into the database. "
                   "Ignoring error.\n" + traceback.format_exc())
...
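_execute_code ultimately runs the control file with Python 2's execfile, handing the prepared namespace in as both globals and locals. Under Python 3 the equivalent is exec over compiled source; a hedged sketch (file name is illustrative):

def execute_control_file(code_file, namespace):
    """Python 3 equivalent of execfile(code_file, namespace, namespace)."""
    with open(code_file) as f:
        source = f.read()
    # compile with the real filename so tracebacks point at the control file
    exec(compile(source, code_file, 'exec'), namespace, namespace)

# usage: write a tiny "control file" that can see the injected names
with open('control.demo', 'w') as f:
    f.write("print('running on', machines)\n")
execute_control_file('control.demo', {'machines': ['host1', 'host2']})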
views.py
Source:views.py  
...
        obj = GFactory(self.gdrive, {'mimeType': u'application/vnd.google-apps.folder'})
        assert isinstance(obj, GFolder), 'Factory test failed on GFolder'
        obj = GFactory(self.gdrive, {'mimeType': u'application/vnd.google-apps.random'})
        assert isinstance(obj, GFile), 'Factory test failed on GFile'

    def __insert_test(self):
        file_to_insert = GSheet(self.gdrive, {'title': 'test'})
        inserted = file_to_insert.save()
        msg = 'Inserted file is not instance of GSheet as expected'
        assert isinstance(inserted, GSheet), msg
        msg = 'Inserted file title and origin title are different'
        assert inserted.title == file_to_insert.title, msg
        return inserted

    def __update_test(self, file_to_update):
        file_to_update.description = 'hello world'
        updated = file_to_update.save()
        msg = 'Updated file and origin file are different'
        assert file_to_update == updated, msg
        msg = 'Updated file description and origin description are different'
        assert file_to_update.description == updated.description, msg
        return updated

    def __delete_test(self, file_to_delete):
        deleted = file_to_delete.delete()
        msg = 'Delete test failed.'
        assert deleted == '', msg

    def __get_worksheets_test(self, f):
        assert f._worksheets is None, 'Lazy worksheets is not None'
        assert f.get_worksheets()[0].title == u'Лист1', 'Worksheets init failed'
        assert f._worksheets == f.get_worksheets(), 'Lazy retrieving does not work'

    def __save_worksheet_test(self, f):
        w = GWorkSheet(f, {'title': 'test1', 'col_count': 10, 'row_count': 10})
        w.save()
        assert w._id is not None, 'Add worksheet failed'
        w = GWorkSheet(f, {'title': 'test1', 'col_count': 10, 'row_count': 10})
        try:
            w.save()
            assert False, 'Two GWorkSheets with the same title can not be saved'
        except GError:
            pass
        # w.title = 'test2'
        # w.col_count = 12
        # w.save()

    def get(self, request):
        self.__prepare_tests(request)
        try:
            self.__factory_test()
            file_inserted = self.__insert_test()
            file_updated = self.__update_test(file_inserted)
            self.__get_worksheets_test(file_updated)
            self.__save_worksheet_test(file_updated)
            # self.__delete_test(file_updated)
        except AssertionError as error:
            self.errors += [str(error)]
...
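The get view above collects an AssertionError message into self.errors rather than letting it propagate out of the request. A per-check variant of that accumulate-and-report pattern in isolation (check names are illustrative):

def run_checks(checks):
    """Run each zero-arg check, collecting assertion messages, not raising."""
    errors = []
    for check in checks:
        try:
            check()
        except AssertionError as exc:
            errors.append(str(exc))
    return errors

def passing_check():
    assert 1 + 1 == 2, 'arithmetic broke'

def failing_check():
    assert 'a' == 'b', 'a and b differ'

print(run_checks([passing_check, failing_check]))  # ['a and b differ']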
test.py
Source:test.py  
...
        self.__tests = TestGroup("")

    def load(self, raw_yaml, src_path):
        data = yaml.safe_load(raw_yaml)
        for test in data:
            self.__insert_test(data[test], test, src_path.parts)

    def run(self, p, r):
        return self.__tests.run(p, r, True)

    def __insert_test(self, steps, name, groups_path):
        group = self.__tests
        for subgroup in groups_path:
            group = group.subgroup(subgroup)
        group.add(SingleTest(name, [self.__parse_step(s) for s in steps]))

    def __parse_step(self, data):
        for type_key in ["get", "compile", "run"]:
            if type_key in data:
                return (type_key, data)

    @property
    def count_total(self):
        return self.__tests.count_tests()
...
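__insert_test walks src_path.parts to find or create one nested group per path component before adding the test at the leaf. A dict-based sketch of that grouping idea (names are illustrative, not the snippet's TestGroup API):

from pathlib import Path

def insert_test(root, name, steps, groups_path):
    """Create/walk one nested dict per path part, then store the test."""
    group = root
    for part in groups_path:
        group = group.setdefault(part, {})
    group[name] = steps

tests = {}
insert_test(tests, 'smoke', [('get', {}), ('run', {})],
            Path('suite/net/tests.yaml').parts)
print(tests)
# {'suite': {'net': {'tests.yaml': {'smoke': [('get', {}), ('run', {})]}}}}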
