Best Python code snippet using autotest_python
harness_beaker.py
Source:harness_beaker.py  
...177            # convert recipe xml into control file178            for task in recipe.tasks:179                self.convert_task_to_control(fetchdir, control_file, task)180                # getting the task id later, will be hard, store it in file/memory181                self.write_processed_tests(self.get_test_name(task), task.id)182            control_file.close()183        except HarnessException:184            # hook to bail out on reservesys systems and not run autotest185            return None186        except Exception, ex:187            os.remove(control_file_path)188            raise error.HarnessError('beaker_harness: convert failed with -> %s' % ex)189        # autotest should find this under FETCHDIRTEST because it is unique190        return control_file_path191    def init_recipe_from_beaker(self):192        logging.debug('Contacting beaker to get task details')193        bxp = BeakerXMLParser()194        recipe_xml = self.get_recipe_from_LC()195        recipes_dict = bxp.parse_xml(recipe_xml)196        return self.find_recipe(recipes_dict)197    def init_task_params(self, task):198        logging.debug('PrepareTaskParams')199        if task is None:200            raise error.HarnessError('No valid task')201        for (name, value) in task.params.items():202            logging.debug('adding to os.environ: <%s=%s>', name, value)203            os.environ[name] = value204    def get_recipe_from_LC(self):205        logging.debug('trying to get recipe from LC:')206        try:207            recipe = self.bkr_proxy.get_recipe()208        except Exception, exc:209            raise error.HarnessError('Failed to retrieve xml: %s' % exc)210        return recipe211    def find_recipe(self, recipes_dict):212        if self.hostname in recipes_dict:213            return recipes_dict[self.hostname]214        for h in recipes_dict:215            if self.recipe_id == recipes_dict[h].id:216                return recipes_dict[h]217        raise 
error.HarnessError('No valid recipe for host %s' % self.hostname)218    # the block below was taken from standalone harness219    def setupInitSymlink(self):220        logging.debug('Symlinking init scripts')221        autodir = os.environ.get('AUTODIR')222        rc = os.path.join(autodir, 'tools/autotest')223        if os.path.isfile(rc) and os.path.islink(rc):224            # nothing to do225            return226        # see if system supports event.d versus inittab227        if os.path.exists('/etc/event.d'):228            # NB: assuming current runlevel is default229            initdefault = utils.system_output('/sbin/runlevel').split()[1]230        elif os.path.exists('/etc/inittab'):231            initdefault = utils.system_output('grep :initdefault: /etc/inittab')232            initdefault = initdefault.split(':')[1]233        else:234            initdefault = '2'235        try:236            utils.system('ln -sf %s /etc/init.d/autotest' % rc)237            utils.system('ln -sf %s /etc/rc%s.d/S99autotest' % (rc, initdefault))238            logging.debug('Labeling init scripts with unconfined_exec_t')239            utils.system('chcon -h system_u:object_r:unconfined_exec_t:s0 /etc/init.d/autotest')240            utils.system('chcon -h system_u:object_r:unconfined_exec_t:s0 /etc/rc%s.d/S99autotest' % initdefault)241            autotest_init = os.path.join(autodir, 'tools/autotest')242            ret = os.system('chcon system_u:object_r:unconfined_exec_t:s0 %s' % autotest_init)243            logging.debug('chcon returned <%s>', ret)244        except Exception:245            logging.warning('Linking init scripts failed')246    def get_test_name(self, task):247        name = re.sub('-', '_', task.rpmName)248        return re.sub('\.', '_', name)249    def convert_task_to_control(self, fetchdir, control, task):250        """Tasks are really just:251           # yum install $TEST252           # cd /mnt/tests/$TEST253           # make run254           Convert that 
into a test module with a control file255        """256        timeout = ''257        if task.timeout:258            timeout = ", timeout=%s" % task.timeout259        # python doesn't like '-' in its class names260        rpm_name = self.get_test_name(task)261        rpm_dir = fetchdir + '/' + rpm_name262        rpm_file = rpm_dir + '/' + rpm_name + '.py'263        if task.status == 'Completed' and not self.offline:264            logging.debug("SKIP Completed test %s" % rpm_name)265            return266        if task.status == 'Running' and not self.offline:267            if re.search('reservesys', task.rpmName):268                logging.debug("Found reservesys, skipping execution")269                raise HarnessException('executing under a reservesys')270            else:271                logging.warning("Found Running test %s that isn't reservesys" % task.rpmName)272        # append test name to control file273        logging.debug('adding test %s to control file' % rpm_name)274        # Trick to avoid downloading XML all the time275        # statically update each TASK_ID276        control.write("os.environ['BEAKER_TASK_ID']='%s'\n" % task.id)277        control.write("job.run_test('%s'%s)\n" % (rpm_name, timeout))278        # TODO check for git commands in task.params279        # create the test itself280        logging.debug('setting up test %s' % (rpm_file))281        if not os.path.exists(rpm_dir):282            os.mkdir(rpm_dir)283        test = open(rpm_file, 'w')284        test.write("import os\n")285        test.write("from autotest.client import test, utils\n\n")286        test.write("class %s(test.test):\n" % rpm_name)287        test.write("    version=1\n\n")288        test.write("    def initialize(self):\n")289        test.write("        utils.system('yum install -y %s')\n" % task.rpmName)290        for param in task.params:291            test.write("        os.environ['%s']='%s'\n" % (param, task.params[param]))292        test.write("    def 
run_once(self):\n")293        test.write("        os.chdir('%s')\n" % task.rpmPath)294        test.write("        raw_output = utils.system_output('make run', retain_output=True)\n")295        test.write("        self.results = raw_output\n")296        test.close()297    def run_start(self):298        """A run within this job is starting"""299        logging.debug('run_start')300        try:301            self.start_watchdog(BEAKER_CONSOLE_HEARTBEAT)302        except Exception:303            logging.critical('ERROR: Failed to start watchdog')304    def run_pause(self):305        """A run within this job is completing (expect continue)"""306        logging.debug('run_pause')307    def run_reboot(self):308        """A run within this job is performing a reboot309           (expect continue following reboot)310        """311        logging.debug('run_reboot')312    def run_abort(self):313        """A run within this job is aborting. It all went wrong"""314        logging.debug('run_abort')315        self.bkr_proxy.recipe_abort()316        self.tear_down()317    def run_complete(self):318        """A run within this job is completing (all done)"""319        logging.debug('run_complete')320        self.tear_down()321    def run_test_complete(self):322        """A test run by this job is complete. 
Note that if multiple323        tests are run in parallel, this will only be called when all324        of the parallel runs complete."""325        logging.debug('run_test_complete')326    def test_status(self, status, tag):327        """A test within this job is completing"""328        logging.debug('test_status ' + status + ' / ' + tag)329    def test_status_detail(self, code, subdir, operation, status, tag,330                           optional_fields):331        """A test within this job is completing (detail)"""332        logging.debug('test_status_detail %s / %s / %s / %s / %s / %s',333                      code, subdir, operation, status, tag, str(optional_fields))334        if not subdir:335            # recipes - covered by run_start/complete/abort336            return337        """The mapping between beaker tasks and non-beaker tasks is not easy to338           separate.  Therefore we use the START and END markers along with the339           environment variable BEAKER_TASK_ID to help us.340           We keep an on-disk-file that stores the tests we have seen (or will run341           [add by the conversion function above]).  If the test is expected, it342           will have a task id associated with it and we can communicate with beaker343           about it.  
Otherwise if no 'id' is found, assume this is a sub-task that344           beaker doesn't care about and keep all the results contained to the345           beaker results directory.346        """347        if code.startswith('START'):348            if subdir in self.tests and self.tests[subdir] != '0':349                # predefined beaker task350                self.bkr_proxy.task_start(self.tests[subdir])351            else:352                # some random sub-task, save for cleanup purposes353                self.write_processed_tests(subdir)354            return355        elif code.startswith('END'):356            if subdir in self.tests and self.tests[subdir] != '0':357                # predefined beaker task358                self.upload_task_files(self.tests[subdir], subdir)359                self.bkr_proxy.task_stop(self.tests[subdir])360            return361        else:362            if subdir in self.tests and self.tests[subdir] != '0':363                # predefine beaker tasks, will upload on END364                task_id = self.tests[subdir]365                task_upload = False366            else:367                # some random sub-task, save upload as task result368                # because there is no beaker task to add them too369                # task id was not saved in dictionary, get it from env370                if 'BEAKER_TASK_ID' not in os.environ:371                    raise error.HarnessError("No BEAKER_TASK_ID set")372                task_id = os.environ['BEAKER_TASK_ID']373                task_upload = True374            bkr_status = get_beaker_code(code)375            try:376                resultid = self.bkr_proxy.task_result(task_id, bkr_status,377                                                      subdir, 1, '')378                if task_upload:379                    self.upload_result_files(task_id, resultid, subdir)380            except Exception:381                logging.critical('ERROR: Failed to process test results')382   
 def tear_down(self):383        '''called from complete and abort.  clean up and shutdown'''384        self.kill_watchdog()385        if self.recipe_id != '0':386            self.upload_recipe_files()387            self.bkr_proxy.recipe_stop()388        os.remove(self.state_file)389    def start_watchdog(self, heartbeat):390        logging.debug('harness: Starting watchdog process, heartbeat: %d' % heartbeat)391        try:392            pid = os.fork()393            if pid == 0:394                self.watchdog_loop(heartbeat)395            else:396                self.watchdog_pid = pid397                logging.debug('harness: Watchdog process started, pid: %d', self.watchdog_pid)398        except OSError, e:399            logging.error('harness: fork in start_watchdog failed: %d (%s)\n' % (e.errno, e.strerror))400    def kill_watchdog(self):401        logging.debug('harness: Killing watchdog, pid: %d', self.watchdog_pid)402        utils.nuke_pid(self.watchdog_pid)403        self.watchdog_pid = None404    def watchdog_loop(self, heartbeat):405        while True:406            time.sleep(heartbeat)407            logging.info('[-- MARK -- %s]' % time.asctime(time.localtime(time.time())))408        sys.exit()409    def get_processed_tests(self):410        tests = {}411        if not os.path.isfile(self.state_file):412            return tests413        f = open(self.state_file, 'r')414        lines = f.readlines()415        f.close()416        for line in lines:417            subdir, t_id = line.strip().split()418            # duplicates result from multiple writers419            # once during the conversion and then again420            # during an update of a test run421            # former has task ids, latter will not422            if subdir not in tests:423                tests[subdir] = t_id424        return tests425    def write_processed_tests(self, subdir, t_id='0'):426        f = open(self.state_file, 'a')427        f.write(subdir + ' ' + t_id + '\n')428     
   f.close()429    def upload_recipe_files(self):430        path = self.job.resultdir431        # refresh latest executed tests432        tests = self.get_processed_tests()433        logging.debug("Recipe filtering following tests: %s" % tests)434        for root, dirnames, files in os.walk(path):435            '''do not upload previously uploaded results files'''436            for d in dirnames:437                if d in tests:438                    dirnames.remove(d)439            for name in files:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
