Best Python code snippet using autotest_python
harness_beaker.py
Source:harness_beaker.py  
...354            return355        elif code.startswith('END'):356            if subdir in self.tests and self.tests[subdir] != '0':357                # predefined beaker task358                self.upload_task_files(self.tests[subdir], subdir)359                self.bkr_proxy.task_stop(self.tests[subdir])360            return361        else:362            if subdir in self.tests and self.tests[subdir] != '0':363                # predefine beaker tasks, will upload on END364                task_id = self.tests[subdir]365                task_upload = False366            else:367                # some random sub-task, save upload as task result368                # because there is no beaker task to add them too369                # task id was not saved in dictionary, get it from env370                if 'BEAKER_TASK_ID' not in os.environ:371                    raise error.HarnessError("No BEAKER_TASK_ID set")372                task_id = os.environ['BEAKER_TASK_ID']373                task_upload = True374            bkr_status = get_beaker_code(code)375            try:376                resultid = self.bkr_proxy.task_result(task_id, bkr_status,377                                                      subdir, 1, '')378                if task_upload:379                    self.upload_result_files(task_id, resultid, subdir)380            except Exception:381                logging.critical('ERROR: Failed to process test results')382    def tear_down(self):383        '''called from complete and abort.  clean up and shutdown'''384        self.kill_watchdog()385        if self.recipe_id != '0':386            self.upload_recipe_files()387            self.bkr_proxy.recipe_stop()388        os.remove(self.state_file)389    def start_watchdog(self, heartbeat):390        logging.debug('harness: Starting watchdog process, heartbeat: %d' % heartbeat)391        try:392            pid = os.fork()393            if pid == 0:394                self.watchdog_loop(heartbeat)395            else:396                self.watchdog_pid = pid397                logging.debug('harness: Watchdog process started, pid: %d', self.watchdog_pid)398        except OSError, e:399            logging.error('harness: fork in start_watchdog failed: %d (%s)\n' % (e.errno, e.strerror))400    def kill_watchdog(self):401        logging.debug('harness: Killing watchdog, pid: %d', self.watchdog_pid)402        utils.nuke_pid(self.watchdog_pid)403        self.watchdog_pid = None404    def watchdog_loop(self, heartbeat):405        while True:406            time.sleep(heartbeat)407            logging.info('[-- MARK -- %s]' % time.asctime(time.localtime(time.time())))408        sys.exit()409    def get_processed_tests(self):410        tests = {}411        if not os.path.isfile(self.state_file):412            return tests413        f = open(self.state_file, 'r')414        lines = f.readlines()415        f.close()416        for line in lines:417            subdir, t_id = line.strip().split()418            # duplicates result from multiple writers419            # once during the conversion and then again420            # during an update of a test run421            # former has task ids, latter will not422            if subdir not in tests:423                tests[subdir] = t_id424        return tests425    def write_processed_tests(self, subdir, t_id='0'):426        f = open(self.state_file, 'a')427        f.write(subdir + ' ' + t_id + '\n')428        f.close()429    def upload_recipe_files(self):430        path = self.job.resultdir431        # refresh latest executed tests432        tests = self.get_processed_tests()433        logging.debug("Recipe filtering following tests: %s" % tests)434        for root, dirnames, files in os.walk(path):435            '''do not upload previously uploaded results files'''436            for d in dirnames:437                if d in tests:438                    dirnames.remove(d)439            for name in files:440                # strip full path441                remotepath = re.sub(path, "", root)442                # The localfile has the full path443                localfile = os.path.join(root, name)444                if os.path.getsize(localfile) == 0:445                    continue  # skip empty files446                # Upload the file447                self.bkr_proxy.recipe_upload_file(localfile, remotepath)448    def upload_task_files(self, task_id, subdir):449        path = os.path.join(self.job.resultdir, subdir)450        for root, _, files in os.walk(path):451            for name in files:452                # strip full path453                remotepath = re.sub(path, "", root)454                # The localfile has the full path455                localfile = os.path.join(root, name)456                if os.path.getsize(localfile) == 0:457                    continue  # skip empty files458                # Upload the file459                self.bkr_proxy.task_upload_file(task_id, localfile,460                                                remotepath)461    def upload_result_files(self, task_id, resultid, subdir):462        path = os.path.join(self.job.resultdir, subdir)...azbatchstorage.py
Source:azbatchstorage.py  
1# Copyright (c) Microsoft Corporation2#3# All rights reserved.4#5# MIT License6#7# Permission is hereby granted, free of charge, to any person obtaining a8# copy of this software and associated documentation files (the "Software"),9# to deal in the Software without restriction, including without limitation10# the rights to use, copy, modify, merge, publish, distribute, sublicense,11# and/or sell copies of the Software, and to permit persons to whom the12# Software is furnished to do so, subject to the following conditions:13#14# The above copyright notice and this permission notice shall be included in15# all copies or substantial portions of the Software.16#17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER23# DEALINGS IN THE SOFTWARE.24import os25from batchwrapper.azstorage import AzureStorage26class AzureBatchStorage():27    def __init__(self):28        self.storage = AzureStorage()29        self.input_container = self.storage.getDefaultInputContainer()30        self.app_container = self.storage.getDefaultAppContainer()31        self.output_container = self.storage.getDefaultOutputContainer()32        self.batch_input_files = list()33        self.batch_application_files = list()34        self.batch_task_files = list()35        self.input_files = list()36        self.app_files = list()37        self.task_files = list()38    def addTaskFilePath(self, file_path):39        exists = os.path.isfile(os.path.realpath(file_path))40        if exists:41            self.task_files.append(os.path.realpath(file_path))42            print("Currently, {} task files are in list to be uploaded".format(len(self.task_files)))43            for l in self.task_files:44                print("File: {}".format(l))45        else:46            print("File {} not found.  Please check name and path".format(file_path))47    # change to upload_task_files48    def uploadTaskFiles(self):49        self.uploadApplicationFiles()50        temp = list()51        temp = [52            self.storage.createInputContainer(self.input_container, file_path)53            for file_path in self.task_files]54        self.batch_task_files.extend(temp)55        self.task_files.clear()56    #change to add_input_file_path57    def addInputFilePath(self, file_path):58        exists = os.path.isfile(os.path.realpath(file_path))59        if exists:60            self.input_files.append(os.path.realpath(file_path))61            print("Currently, {} data files are in list to be uploaded".format(len(self.input_files)))62            for l in self.input_files:63                print("File: {}".format(l))64        else:65            print("File {} not found.  Please check name and path".format(file_path))66    #change to upload_input_files67    def uploadInputFiles(self):68        temp = list()69        temp = [70            self.storage.createInputContainer(self.input_container, file_path)71            for file_path in self.input_files]72        self.batch_input_files.extend(temp)73        self.input_files.clear()74    #change to app_application_file_path75    def addApplicationFilePath(self, file_path):76        exists = os.path.isfile(os.path.realpath(file_path))77        if exists:78            self.app_files.append(os.path.realpath(file_path))79            print("Currently, {} executable files are in list to be uploaded".format(len(self.app_files)))80            for l in self.app_files:81                print("File: {}".format(l))82        else:83            print("File {} not found.  Please check name and path".format(file_path))84    #change to upload_application_files85    def uploadApplicationFiles(self):86        temp = list()87        temp = [88            self.storage.createInputContainer(self.app_container, file_path)89            for file_path in self.app_files]90        self.batch_application_files.extend(temp)91        #print(self.batch_application_files)92        self.app_files.clear()93    #change name to get_application_file_references94    def getApplicationFiles(self):95        if len(self.app_files) > 0:96            self.uploadApplicationFiles()97        return self.batch_application_files98    #change name to get_application_input_file_references99    def getApplicationInputFiles(self):100        if len(self.input_files) > 0:101            self.uploadInputFiles()102        return self.batch_input_files103    # change name to get_task_file_references104    def getBatchTaskFiles(self):105        if len(self.batch_task_files) > 0:106            self.uploadTaskFiles()107        return self.batch_task_files108    #change to create_output_folder109    def createOutputFolder(self):110        container_name, output_container_sas_token = self.storage.create_output_container(self.output_container)111        ### create a output.json file112        self.addApplicationFilePath("output.json")113        self.uploadApplicationFiles()114if __name__ == '__main__':115    a = AzureBatchStorage()116    a.addInputFilePath("ab.txt")117    a.addInputFilePath("a.txt")118    a.addInputFilePath("b.txt")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
