How to use upload_task_files method in autotest

Best Python code snippet using autotest_python

harness_beaker.py

Source:harness_beaker.py Github

copy

Full Screen

...354 return355 elif code.startswith('END'):356 if subdir in self.tests and self.tests[subdir] != '0':357 # predefined beaker task358 self.upload_task_files(self.tests[subdir], subdir)359 self.bkr_proxy.task_stop(self.tests[subdir])360 return361 else:362 if subdir in self.tests and self.tests[subdir] != '0':363 # predefine beaker tasks, will upload on END364 task_id = self.tests[subdir]365 task_upload = False366 else:367 # some random sub-task, save upload as task result368 # because there is no beaker task to add them too369 # task id was not saved in dictionary, get it from env370 if 'BEAKER_TASK_ID' not in os.environ:371 raise error.HarnessError("No BEAKER_TASK_ID set")372 task_id = os.environ['BEAKER_TASK_ID']373 task_upload = True374 bkr_status = get_beaker_code(code)375 try:376 resultid = self.bkr_proxy.task_result(task_id, bkr_status,377 subdir, 1, '')378 if task_upload:379 self.upload_result_files(task_id, resultid, subdir)380 except Exception:381 logging.critical('ERROR: Failed to process test results')382 def tear_down(self):383 '''called from complete and abort. clean up and shutdown'''384 self.kill_watchdog()385 if self.recipe_id != '0':386 self.upload_recipe_files()387 self.bkr_proxy.recipe_stop()388 os.remove(self.state_file)389 def start_watchdog(self, heartbeat):390 logging.debug('harness: Starting watchdog process, heartbeat: %d' % heartbeat)391 try:392 pid = os.fork()393 if pid == 0:394 self.watchdog_loop(heartbeat)395 else:396 self.watchdog_pid = pid397 logging.debug('harness: Watchdog process started, pid: %d', self.watchdog_pid)398 except OSError, e:399 logging.error('harness: fork in start_watchdog failed: %d (%s)\n' % (e.errno, e.strerror))400 def kill_watchdog(self):401 logging.debug('harness: Killing watchdog, pid: %d', self.watchdog_pid)402 utils.nuke_pid(self.watchdog_pid)403 self.watchdog_pid = None404 def watchdog_loop(self, heartbeat):405 while True:406 time.sleep(heartbeat)407 logging.info('[-- MARK -- %s]' % time.asctime(time.localtime(time.time())))408 sys.exit()409 def get_processed_tests(self):410 tests = {}411 if not os.path.isfile(self.state_file):412 return tests413 f = open(self.state_file, 'r')414 lines = f.readlines()415 f.close()416 for line in lines:417 subdir, t_id = line.strip().split()418 # duplicates result from multiple writers419 # once during the conversion and then again420 # during an update of a test run421 # former has task ids, latter will not422 if subdir not in tests:423 tests[subdir] = t_id424 return tests425 def write_processed_tests(self, subdir, t_id='0'):426 f = open(self.state_file, 'a')427 f.write(subdir + ' ' + t_id + '\n')428 f.close()429 def upload_recipe_files(self):430 path = self.job.resultdir431 # refresh latest executed tests432 tests = self.get_processed_tests()433 logging.debug("Recipe filtering following tests: %s" % tests)434 for root, dirnames, files in os.walk(path):435 '''do not upload previously uploaded results files'''436 for d in dirnames:437 if d in tests:438 dirnames.remove(d)439 for name in files:440 # strip full path441 remotepath = re.sub(path, "", root)442 # The localfile has the full path443 localfile = os.path.join(root, name)444 if os.path.getsize(localfile) == 0:445 continue # skip empty files446 # Upload the file447 self.bkr_proxy.recipe_upload_file(localfile, remotepath)448 def upload_task_files(self, task_id, subdir):449 path = os.path.join(self.job.resultdir, subdir)450 for root, _, files in os.walk(path):451 for name in files:452 # strip full path453 remotepath = re.sub(path, "", root)454 # The localfile has the full path455 localfile = os.path.join(root, name)456 if os.path.getsize(localfile) == 0:457 continue # skip empty files458 # Upload the file459 self.bkr_proxy.task_upload_file(task_id, localfile,460 remotepath)461 def upload_result_files(self, task_id, resultid, subdir):462 path = os.path.join(self.job.resultdir, subdir)...

Full Screen

Full Screen

azbatchstorage.py

Source:azbatchstorage.py Github

copy

Full Screen

1# Copyright (c) Microsoft Corporation2#3# All rights reserved.4#5# MIT License6#7# Permission is hereby granted, free of charge, to any person obtaining a8# copy of this software and associated documentation files (the "Software"),9# to deal in the Software without restriction, including without limitation10# the rights to use, copy, modify, merge, publish, distribute, sublicense,11# and/or sell copies of the Software, and to permit persons to whom the12# Software is furnished to do so, subject to the following conditions:13#14# The above copyright notice and this permission notice shall be included in15# all copies or substantial portions of the Software.16#17# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR18# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,19# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE20# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER21# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING22# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER23# DEALINGS IN THE SOFTWARE.24import os25from batchwrapper.azstorage import AzureStorage26class AzureBatchStorage():27 def __init__(self):28 self.storage = AzureStorage()29 self.input_container = self.storage.getDefaultInputContainer()30 self.app_container = self.storage.getDefaultAppContainer()31 self.output_container = self.storage.getDefaultOutputContainer()32 self.batch_input_files = list()33 self.batch_application_files = list()34 self.batch_task_files = list()35 self.input_files = list()36 self.app_files = list()37 self.task_files = list()38 def addTaskFilePath(self, file_path):39 exists = os.path.isfile(os.path.realpath(file_path))40 if exists:41 self.task_files.append(os.path.realpath(file_path))42 print("Currently, {} task files are in list to be uploaded".format(len(self.task_files)))43 for l in self.task_files:44 print("File: {}".format(l))45 else:46 print("File {} not found. Please check name and path".format(file_path))47 # change to upload_task_files48 def uploadTaskFiles(self):49 self.uploadApplicationFiles()50 temp = list()51 temp = [52 self.storage.createInputContainer(self.input_container, file_path)53 for file_path in self.task_files]54 self.batch_task_files.extend(temp)55 self.task_files.clear()56 #change to add_input_file_path57 def addInputFilePath(self, file_path):58 exists = os.path.isfile(os.path.realpath(file_path))59 if exists:60 self.input_files.append(os.path.realpath(file_path))61 print("Currently, {} data files are in list to be uploaded".format(len(self.input_files)))62 for l in self.input_files:63 print("File: {}".format(l))64 else:65 print("File {} not found. Please check name and path".format(file_path))66 #change to upload_input_files67 def uploadInputFiles(self):68 temp = list()69 temp = [70 self.storage.createInputContainer(self.input_container, file_path)71 for file_path in self.input_files]72 self.batch_input_files.extend(temp)73 self.input_files.clear()74 #change to app_application_file_path75 def addApplicationFilePath(self, file_path):76 exists = os.path.isfile(os.path.realpath(file_path))77 if exists:78 self.app_files.append(os.path.realpath(file_path))79 print("Currently, {} executable files are in list to be uploaded".format(len(self.app_files)))80 for l in self.app_files:81 print("File: {}".format(l))82 else:83 print("File {} not found. Please check name and path".format(file_path))84 #change to upload_application_files85 def uploadApplicationFiles(self):86 temp = list()87 temp = [88 self.storage.createInputContainer(self.app_container, file_path)89 for file_path in self.app_files]90 self.batch_application_files.extend(temp)91 #print(self.batch_application_files)92 self.app_files.clear()93 #change name to get_application_file_references94 def getApplicationFiles(self):95 if len(self.app_files) > 0:96 self.uploadApplicationFiles()97 return self.batch_application_files98 #change name to get_application_input_file_references99 def getApplicationInputFiles(self):100 if len(self.input_files) > 0:101 self.uploadInputFiles()102 return self.batch_input_files103 # change name to get_task_file_references104 def getBatchTaskFiles(self):105 if len(self.batch_task_files) > 0:106 self.uploadTaskFiles()107 return self.batch_task_files108 #change to create_output_folder109 def createOutputFolder(self):110 container_name, output_container_sas_token = self.storage.create_output_container(self.output_container)111 ### create a output.json file112 self.addApplicationFilePath("output.json")113 self.uploadApplicationFiles()114if __name__ == '__main__':115 a = AzureBatchStorage()116 a.addInputFilePath("ab.txt")117 a.addInputFilePath("a.txt")118 a.addInputFilePath("b.txt")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful