How to use put_file_list method in avocado

Best Python code snippet using avocado_python

beaker_result.py

Source:beaker_result.py Github

copy

Full Screen

...73 file = open(filename, encoding="utf-8")74 content = file.read().encode("utf-8")75 file.close()76 self.put_data(location, name, content)77 def put_file_list(self, location, prefix, filelist):78 for file in filelist:79 if os.path.isfile(file) and os.path.getsize(file) > 0:80 name = prefix + os.path.basename(file)81 self.put_file(location, name, file)82 def pre_tests(self, job):83 self.job_id = job.unique_id[:6]84 return85 def start_test(self, result, state):86 return87 def test_progress(self, progress=False):88 return89 def end_test(self, result, state):90 if self.beaker_url is None:91 return92 location = self.post_result(state)93 if location is None:94 return95 logfile = state.get("logfile")96 self.put_file(location, "logfile", logfile)97 ppstate = pprint.pformat(state).encode("utf8")98 self.put_data(location, "state", ppstate)99 pattern = os.path.join(state.get("logdir"), "*")100 filelist = [f for f in glob.glob(pattern) if f != logfile]101 self.put_file_list(location, "", filelist)102 def post_tests(self, job):103 if self.beaker_url is None:104 return105 pattern = os.path.join(job.logdir, "*")106 filelist = glob.glob(pattern)...

Full Screen

Full Screen

upload_time.py

Source:upload_time.py Github

copy

Full Screen

...7from tools.get_func_time import GetFuncTime8class UploadDownloadTime(object):9 def __init__(self):10 pass11 def put_file_list(self):12 # file_size_list = [4, 8]13 file_size_list = [4, 8, 16, 32, 64, 128, 256, 512]14 res_list = []15 time_list = []16 for file_size in file_size_list:17 res, func_time = self.put_file_one(file_size)18 res_list.append(res)19 time_list.append(func_time)20 print "\n输出"21 print "处理placement"22 for res in res_list:23 placement = res['result']['file_info']['optimizer_res']['placement']24 print sorted([i+1 for i in placement])25 print "时间开销", time_list26 def put_file_one(self, file_size):27 print "上传测试开始,file_size", file_size28 # 文件路径29 # file_size = 6430 user_name = "liuyf_test"31 cloud_file_path = "dir/test_" + str(file_size)32 local_file_path = os.path.join(CONFIG["test_file_path"], "test_" + str(file_size))33 # 配置参数34 # file_size35 storage_time = 136 jcsproxy_request_features = {37 'aliyun-beijing': 1000,38 'aliyun-shanghai': 0,39 'aliyun-shenzhen': 1000,40 }41 fault_tolerance_features = None42 target_weights = {}43 target_weights['storage_cost_weight'] = 0.544 target_weights['latency_time_weight'] = 0.545 res, func_time = GetFuncTime().get_func_time(PutFile().put_file,46 (user_name, cloud_file_path, local_file_path, True,47 storage_time, jcsproxy_request_features, fault_tolerance_features, target_weights))48 print "上传测试结束,file_size", file_size49 print res50 print func_time51 # print res['result']['file_info']['optimizer_res']['placement']52 # print res['result']['file_info']['optimizer_res']['bucket_name_list']53 # print res['result']['file_info']['optimizer_res']['jcsproxy_bucket_name_list']54 return res, func_time55if __name__ == '__main__':56 test = UploadDownloadTime()...

Full Screen

Full Screen

s3Test.py

Source:s3Test.py Github

copy

Full Screen

1#!/usr/bin/python2import time3import boto3, sys4from botocore.client import Config5import os6s3_cli = boto3.client('s3', 'sh-bt-1',7 config=Config(signature_version='s3v4'), use_ssl=False,8 endpoint_url='http://',9 aws_secret_access_key='',10 aws_access_key_id='')11def create_bucket(bucket_name):12 response = s3_cli.create_bucket(ACL='private',Bucket=bucket_name)13 if response["ResponseMetadata"]["HTTPStatusCode"] != 200:14 print("create bucket error")15def upload_file(bucket_name):16 put_file_list = os.listdir(file_path)17 for file in put_file_list:18 src_file = os.path.join("%s/" %(file_path),file)19 with open(src_file, 'rb') as data:20 s3_cli.upload_fileobj(data, bucket_name, file)21def list_objects(bucket_name):22 response = s3_cli.list_objects(Bucket="%s" %(bucket_name))23 if response["ResponseMetadata"]["HTTPStatusCode"] == 200:24 if "Contents" in response.keys():25 for s3_object in response["Contents"]:26 yield s3_object["Key"]27 else:28 raise SystemError("{0} has objects is zero".format(response["Name"]))29 else:30 raise ConnectionError("connect s3 timeout,please call s3 develop")31def delete_objects(bucket_name):32 delete_list = []33 for file_name in list_objects(bucket_name=bucket_name):34 if file_name == "version":35 pass36 else:37 delete_list.append({"Key":file_name})38 if delete_list:39 response_del = s3_cli.delete_objects(Bucket="%s" %(bucket_name),40 Delete={41 'Objects': delete_list,42 }43 )44 print("Bucket %s clean_ok" %(bucket_name))45if __name__ == '__main__':46 num=047 while True:48 file_path = '/data11/test'49 bucket_name = '001-09-12-test'50 bucket_name = bucket_name + str(num)51 num += 1 52 create_bucket(bucket_name)53 time.sleep(1)54 upload_file(bucket_name)55 #list_objects(bucket_name)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful