How to use prepare_disks method in autotest

Best Python code snippet using autotest_python

create_disk_images.py

Source:create_disk_images.py Github

copy

Full Screen

# ... (the first lines of create_disk_images.py are not part of this excerpt.
# The code below also uses collections, guestfs, hashlib, json, os, subprocess
# and sys, which are presumably imported there -- TODO confirm.)
import pymongo
import settings
import kpov_util
from util import write_default_config


def get_prepare_disks(db, course_id, task_id):
    """Load the task's ``prepare_disks`` callable from the database.

    The record is looked up in ``db.prepare_disks`` by course and task id;
    its ``source`` field is compiled under the file name ``prepare_disks.py``
    and executed with this module's globals.

    Returns:
        The object the executed source binds to the name ``prepare_disks``.

    Raises:
        LookupError: if no record exists for the given course/task, or (as
            ``KeyError``) if the record has no ``source`` field or the source
            does not define ``prepare_disks``.
    """
    record = db.prepare_disks.find_one({'course_id': course_id, 'task_id': task_id})
    if record is None:
        # Previously this surfaced as an opaque "NoneType is not
        # subscriptable" TypeError.
        raise LookupError('no prepare_disks source stored for {}/{}'.format(
            course_id, task_id))
    d = {}
    # NOTE(review): exec() runs whatever source is stored in the database with
    # the privileges of this process; the collection must only be writable by
    # trusted users.
    exec(compile(record['source'], 'prepare_disks.py', 'exec'), globals(), d)
    return d['prepare_disks']


def create_snapshot(course_id, task_id, student_id, computer_name, disk_name, fmt='vmdk', overwrite=True):
    """Create a student's copy (or qcow2 overlay) of a disk template.

    The template ``<disk_name>.<fmt>`` is taken from
    ``settings.DISK_TEMPLATE_PATH`` and the result is written to
    ``settings.STUDENT_DISK_PATH/<student_id>/<course_id>/<task_id>/``.
    For ``vdi``/``vmdk`` the template is copied (with ``--reflink=always``
    when ``settings.STUDENT_DISK_COW`` is set); for ``qcow2`` the template and
    its backing chain are symlinked into the task directory and an overlay
    image is created on top of the template. Any other format creates nothing.
    Nothing is created either when the task directory already exists and
    ``overwrite`` is false.

    Unlike earlier revisions, this function does not change the process
    working directory; external commands get their directory via ``cwd=``.

    Returns:
        ``(task_dir, snap, backing)``: the task directory relative to
        ``settings.STUDENT_DISK_PATH``, the snapshot file name, and the list
        of images linked into the task directory (empty unless qcow2).

    Raises:
        Exception: if the template file does not exist.
        subprocess.CalledProcessError: if ``cp`` or ``qemu-img`` fails.
    """
    # add a hash to filename to allow multiple students using the same directory
    snap_hash = hashlib.sha1((student_id + course_id).encode()).hexdigest()[:3]
    snap = '{}-{}-{}-{}.{}'.format(
        task_id, snap_hash, computer_name, disk_name, fmt)
    backing = []
    template = disk_name + '.' + fmt
    template_path = os.path.join(settings.DISK_TEMPLATE_PATH, template)
    task_dir = os.path.join(student_id, course_id, task_id)
    task_path = os.path.join(settings.STUDENT_DISK_PATH, task_dir)

    if not os.path.exists(task_path) or overwrite:
        if not os.path.exists(template_path):
            raise Exception('template not found: {}'.format(template))

        # ensure task dir exists
        os.makedirs(task_path, exist_ok=True)

        if fmt in ('vdi', 'vmdk'):
            # don't use backing files, just copy the template
            copy_cmd = ['cp']
            if settings.STUDENT_DISK_COW:
                copy_cmd.append('--reflink=always')
            # check_call: a failed copy must not go unnoticed, otherwise the
            # caller only fails later when it tries to open the missing image.
            subprocess.check_call(
                copy_cmd + [template_path, os.path.join(task_path, snap)])

        elif fmt == 'qcow2':
            # qemu-img create stores backing-file path as given, so link all
            # backing images to task directory where target image will be
            # generated
            output = json.loads(subprocess.check_output(
                ['qemu-img', 'info', '--output=json', '--backing-chain', template],
                universal_newlines=True,
                # qemu-img info is saner when called from image directory
                cwd=settings.DISK_TEMPLATE_PATH))
            backing_files = [i.get("backing-filename", None) for i in output]
            for image in [template] + [i for i in backing_files if i is not None]:
                backing += [image]
                dest = os.path.join(task_path, image)
                if not os.path.exists(dest):
                    os.symlink(os.path.join(settings.DISK_TEMPLATE_PATH, image), dest)
            # would be great if someone finds a way to avoid the stuff above

            # make overlay image
            print(task_path, backing_files)
            subprocess.check_call(['qemu-img', 'create',
                                   '-f', fmt,
                                   '-b', template, snap],
                                  cwd=task_path)

    return task_dir, snap, backing


def prepare_task_disks(course_id, task_id, student_id, fmt, computers):
    """Create, mount and customise all disks of a task for one student.

    For every computer that has disks, a snapshot of each disk is created
    with ``create_snapshot`` and attached to one guestfs handle per computer.
    Filesystems are mounted either from the disk's explicit ``parts`` list or,
    for disks without one, from the mountpoints guestfs detects. The task's
    ``prepare_disks`` function is then called with the handles, the student's
    task parameters and a few global parameters. Mount errors and errors
    raised by ``prepare_disks`` are reported and otherwise ignored.

    Progress is written to ``lock_fp`` and the database is reached through
    ``db``; both are module-level names defined outside this excerpt.

    Every guestfs handle created here is closed on the way out, including
    when an exception propagates.

    Returns:
        A dict mapping computer name -> disk name -> ``[snapshot] + backing``
        file names.
    """
    disks = collections.defaultdict(dict)
    templates = collections.defaultdict(dict)
    # Handles are tracked separately from `templates`: that mapping is keyed
    # by disk name, so a handle would be lost (never unmounted or closed) if
    # two computers used the same disk name.
    handles = []

    try:
        for computer in computers:
            lock_fp.write('creating computer ' + computer['name'] + '\n')
            if not computer['disks']:
                continue

            manual_disks = []
            try_automount = False

            g = guestfs.GuestFS()
            handles.append(g)
            for disk in computer['disks']:
                lock_fp.write("register " + disk['name'] + '\n')
                task_dir, snap, backing = create_snapshot(
                    course_id, task_id, student_id, computer['name'], disk['name'], fmt=fmt)
                snap_file = os.path.join(settings.STUDENT_DISK_PATH, task_dir, snap)
                if 'options' in disk:
                    g.add_drive_opts(snap_file, **(disk['options']))
                else:
                    g.add_drive(snap_file)
                if 'parts' in disk:
                    for p in disk['parts']:
                        lock_fp.write("part {}: {}\n".format(
                            settings.GUESTFS_DEV_PREFIX + p['dev'], p['path']))
                        manual_disks.append(
                            (settings.GUESTFS_DEV_PREFIX + p['dev'], p['path'], p.get('options', None)))
                else:
                    try_automount = True

                templates[disk['name']] = g
                lock_fp.write(" templates[{}] = {}\n".format(disk['name'], disk))

                # add disk or update existing record with new format
                disks[computer['name']][disk['name']] = [snap] + backing

            g.launch()
            mounted = set()
            if try_automount:
                roots = g.inspect_os()
                for root in roots:
                    mps = g.inspect_get_mountpoints(root)
                    lock_fp.write('detected: ' + str(mps) + '\n')
                    for mountpoint, device in sorted(mps):
                        if mountpoint not in mounted:
                            try:
                                g.mount(device, mountpoint, )
                                lock_fp.write('mounted ' + device + ' on ' + mountpoint + '\n')
                            except RuntimeError as msg:
                                lock_fp.write("%s (ignored)\n" % msg)
                            mounted.add(mountpoint)

            for device, mountpoint, opts in manual_disks:
                try:
                    if opts is not None:
                        g.mount_options(opts, device, mountpoint)
                    else:
                        g.mount(device, mountpoint)
                    lock_fp.write('manually mounted ' + device + " on " + mountpoint + '\n')
                except RuntimeError as msg:
                    lock_fp.write("%s (ignored)\n" % msg)

        lock_fp.write("preparing disks\n")
        global_params = {
            'task_name': task_id,
            'course_id': course_id,
            'username': student_id
        }
        if 'TASK_URL' in vars(settings):
            global_params['task_url'] = settings.TASK_URL + '/' + course_id + '/'

        task_params = db.task_params.find_one(
            {'course_id': course_id, 'task_id': task_id, 'student_id': student_id})['params']

        try:
            prepare_disks = get_prepare_disks(db, course_id, task_id)
            prepare_disks(templates, task_params, global_params)
        except Exception as ex:
            print("Error in prepare_disks:", ex)

        # clean up after ourselves
        lock_fp.write("unmounting\n")
        for g in handles:
            g.umount_all()
    finally:
        for g in handles:
            g.close()

    return disks


if __name__ == '__main__':
    if len(sys.argv) != 1:
        print("Usage: {0}".format(sys.argv[0]))
        print("Create the pending disk images")
    db = pymongo.MongoClient(settings.DB_URI).get_default_database()
    all_computers = collections.defaultdict(list)
    # ... (the rest of the script is not part of this excerpt)

Full Screen

Full Screen

add_task.py

Source:add_task.py Github

copy

Full Screen

#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-or-later

import glob
import inspect
import os
import settings
import sys
import urllib
import kpov_util
import pymongo
from bson import Binary


# task_check is not called anywhere in this script: its source text is captured
# with inspect.getsource() right below and embedded into the task source that
# is stored in db.tasks. It refers to json, urllib.request, task_url and
# task_name, none of which are defined in this file -- presumably they are
# provided wherever the stored source is executed (TODO confirm). The function
# body is left untouched because any change would alter the uploaded source.
def task_check(results, params):
    data = {
        'results': json.dumps(results),
        'params': json.dumps({k: v for k, v in params.items() if k != 'token'}),
    }
    # should be an argument to task_check, but probably better not modify the signature
    if 'token' in params:
        data['token'] = params['token']
    response = urllib.request.urlopen(
        '{task_url}/{task_name}/results.json'.format(task_url=task_url, task_name=task_name),
        data=urllib.parse.urlencode(data).encode())
    response_dict = json.loads(response.read().decode())
    hints = response_dict.get('hints', [])
    hints = ['status: ' + response_dict.get('status', '')] + hints
    return response_dict.get('result', 'No result'), hints
uploading_task_check_source = inspect.getsource(task_check)


# Placeholder whose source replaces the task's real gen_params in the task
# source stored in db.tasks; the real one is stored separately in db.gen_params.
def gen_params(user_id, meta):
    return dict()
dummy_gen_params_source = inspect.getsource(gen_params)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: {0} <task_dir> [task_name]".format(sys.argv[0]))
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to exist (e.g. under `python -S`).
        sys.exit(1)
    dirname = sys.argv[1]
    fname = os.path.join(dirname, 'task.py')
    try:
        course_id, task_id = sys.argv[2].split('/')
    except (IndexError, ValueError):
        # No second argument, or it is not of the form "<course>/<task>":
        # derive both ids from the last two components of the task directory.
        normpath = os.path.normpath(dirname)
        course_id = os.path.split(os.path.dirname(normpath))[-1]
        task_id = os.path.basename(normpath)
    print((course_id, task_id))
    db = pymongo.MongoClient(settings.DB_URI).get_default_database()
    with open(fname) as task_file:
        source = task_file.read()
    d = {}
    # defines task, task_check, gen_params, prepare_disks, computers, params_meta
    # NOTE(review): this executes task.py with the privileges of this process;
    # only run it on task directories you trust.
    exec(compile(source, fname, 'exec'), globals(), d)

    # Only parameters flagged as public are exposed in the stored task source.
    # Indexing d['params_meta'] here also fails early if task.py lacks it.
    public_meta = {k: v for k, v in d['params_meta'].items()
                   if v.get('public', False)}
    task_source = "\n\n".join([
        inspect.getsource(d['task']),
        uploading_task_check_source,
        "params_meta = {}".format(public_meta),
        dummy_gen_params_source])
    task_check_source = inspect.getsource(d['task_check'])
    gen_params_source = inspect.getsource(d['gen_params'])
    prepare_disks_source = inspect.getsource(d['prepare_disks'])

    # NOTE(review): Collection.remove() and Collection.update() are deprecated
    # in PyMongo 3 and removed in PyMongo 4 (delete_many / update_one are the
    # replacements). They are kept here because the PyMongo version this
    # script targets is not visible from this excerpt.
    db.computers_meta.remove({'task_id': task_id, 'course_id': course_id})
    # None stands for interfaces without a 'network' entry; it is discarded
    # again once all computers have been scanned.
    auto_networks = {None}
    for k, v in d['computers'].items():
        for n in v.get('network_interfaces', []):
            auto_networks.add(n.get('network', None))
        db.computers_meta.update({
            'task_id': task_id,
            'course_id': course_id,
            'name': k
        }, {'$set': v}, upsert=True)
    auto_networks.remove(None)

    db.networks.remove({'task_id': task_id, 'course_id': course_id})
    db.task_params.remove({'task_id': task_id, 'course_id': course_id})
    db.student_computers.remove({'task_id': task_id, 'course_id': course_id})
    db.prepare_disks.remove({'task_id': task_id, 'course_id': course_id})

    try:
        net_list = d['networks'].items()
    except (KeyError, AttributeError):
        # task.py declares no usable 'networks' mapping: fall back to the
        # networks referenced by the computers, all non-public.
        net_list = [(k, {'public': False}) for k in auto_networks]
    for k, v in net_list:
        db.networks.update({'task_id': task_id, 'course_id': course_id, 'name': k}, {'$set': v}, upsert=True)

    db.task_checkers.update({
        'task_id': task_id, 'course_id': course_id
    }, {'$set': {'source': task_check_source}}, upsert=True)
    db.tasks.update({
        'task_id': task_id, 'course_id': course_id
    }, {'$set': {'source': task_source}}, upsert=True)
    db.prepare_disks.update({
        'task_id': task_id, 'course_id': course_id
    }, {'$set': {'source': prepare_disks_source}}, upsert=True)
    db.gen_params.update({'task_id': task_id, 'course_id': course_id},
                         {'$set': {'source': gen_params_source}}, upsert=True)
    db.task_params_meta.update({'task_id': task_id, 'course_id': course_id},
                               {'$set': {'params': d['params_meta']}}, upsert=True)
    db.task_instructions.update({'task_id': task_id, 'course_id': course_id},
                                {'$set': d['instructions']}, upsert=True)

    for howto_dir in glob.glob(os.path.join(dirname, 'howtos/*')):
        howto_lang = os.path.basename(os.path.normpath(howto_dir))
        if howto_lang not in {'images'}:
            # every directory except 'images' holds one language's index.html
            with open(os.path.join(howto_dir, 'index.html')) as f:
                db.howtos.update({
                    'task_id': task_id,
                    'course_id': course_id,
                    'lang': howto_lang},
                    {'$set': {'text': f.read()}}, upsert=True)
        else:
            for img in glob.glob(os.path.join(howto_dir, '*')):
                fname = os.path.basename(img)
                # The excerpt is cut off in the middle of the next statement.
                # It is kept verbatim as a comment because the missing tail
                # cannot be reconstructed from the visible text:
                #
                #   with open(img, 'rb') as f:
                #       db.howto_images.update({
                #           'task_id': task_id,
                #           'course_id': course_id,
                #           'fname': fname,
                #       },...

Full Screen

Full Screen

diskhandler.py

Source:diskhandler.py Github

copy

Full Screen

#!/usr/bin/python
import argparse
import os
import sys
myfolder = os.path.dirname(os.path.abspath(__file__))
lib_path = os.path.join(myfolder, "../lib/")
sys.path.append(lib_path)
import format_disk
import mount_connected_disks
import storage_structure_handler
import subprocess
import time
import threading
import prepare_and_format_blockdevice
import BlockDeviceHandler

# The script is written to run under both Python 2 and Python 3; pick the
# prompt function that returns the typed text unevaluated on either.
try:
    read_input = raw_input
except NameError:
    read_input = input


def rpienv_source():
    """Import the variables defined by ``<script dir>/.rpienv`` into os.environ.

    The file is sourced in a bash subprocess (with the ``-s`` argument) and
    the output of ``env`` is parsed as ``NAME=value`` lines. Lines without an
    ``=`` are skipped. Exits the process with status 1 if the file is missing.
    """
    if not os.path.exists(str(myfolder) + '/.rpienv'):
        print("[ ENV ERROR ] " + str(myfolder) + "/.rpienv path not exits!")
        sys.exit(1)
    command = ['bash', '-c', 'source ' + str(myfolder) + '/.rpienv -s && env']
    proc = subprocess.Popen(command, stdout=subprocess.PIPE)
    for line in proc.stdout:
        if isinstance(line, bytes):
            line = line.decode("utf-8")
        try:
            name, separator, value = line.partition("=")
            if not separator:
                # not a NAME=value line (e.g. the continuation of a
                # multi-line value)
                continue
            # On Python 2 the decoded text is `unicode`; turn it back into a
            # native str. The previous `type(value) is unicode` test raised
            # NameError on Python 3, which skipped the assignment below and
            # left the environment untouched.
            if not isinstance(value, str):
                value = value.encode('ascii', 'ignore')
            value = value.rstrip()
            os.environ[name] = value
        except Exception as e:
            print(e)
    proc.communicate()


rpienv_source()
# cfg stays None when the config handler cannot be imported, so later code can
# report that explicitly instead of failing with a NameError.
cfg = None
try:
    confhandler_path = os.path.join(os.path.dirname(os.environ['CONFIGHANDLERPY']))
    sys.path.append(confhandler_path)
    import ConfigHandler
    cfg = ConfigHandler.init()
except Exception as e:
    print("Import config handler failed: " + str(e))

parser = argparse.ArgumentParser()
parser.add_argument("-s", "--search_get_edit", action='store_true', help="Search devices (/dev/sda*), get label and uuid, set fstab file")
parser.add_argument("-m", "--mount", action='store_true', help="Mount avaible devices (/media/*)")
parser.add_argument("-f", "--format_ext4", action='store_true', help="Format device for ext4 filesystem")
parser.add_argument("-l", "--listdevs", action='store_true', help="List connected devices with seversal commands")
parser.add_argument("-t", "--storage_structure", action='store_true', help="Set storage folder structure")
parser.add_argument("-w", "--show_storage_structure", action='store_true', help="Show storage folder structure")
parser.add_argument("-p", "--prepare_disks", action='store_true', help="Prepare disks whick contains diskconf.json")
parser.add_argument("-c", "--change_dev_name", action='store_true', help="Change disk label name")

args = parser.parse_args()
sge = args.search_get_edit
mount = args.mount
form = args.format_ext4
listdev = args.listdevs
storage = args.storage_structure
show_storage_structure = args.show_storage_structure
prepare_disks = args.prepare_disks
change_dev_name = args.change_dev_name


def _storage_option(option):
    """Return the raw value of *option* from the STORAGE config section."""
    if cfg is None:
        sys.exit("Config handler is not available, cannot read STORAGE -> " + option)
    return cfg.get(section="STORAGE", option=option)


def _external_storage_enabled():
    """Return True when STORAGE -> external is set to "true" (any case)."""
    return str(_storage_option("external")).lower() == "true"


def _external_storage_label():
    """Return STORAGE -> label with trailing whitespace removed."""
    return str(_storage_option("label")).rstrip()


def pre_check(info="CKECK"):
    """Exit the process unless BlockDeviceHandler reports an available device.

    *info* is a tag printed in front of the error message.
    """
    state, msg = BlockDeviceHandler.is_any_device_avaible()
    if not state:
        print("{} There are no connected devices: {}".format(info, msg))
        # NOTE(review): POSIX exit statuses are 8 bits wide, so the parent
        # process will typically see 188 (444 & 0xFF) -- confirm whether any
        # caller checks this value before changing it.
        sys.exit(444)


def main():
    """Run every action selected on the command line, in a fixed order."""
    if sge:
        pre_check("do_search_get_edit")
        mount_connected_disks.do_search_get_edit()
    if mount:
        pre_check("mount")
        mount_connected_disks.mount()
    if form:
        pre_check("format_disk")
        format_disk.main()
    if listdev:
        pre_check("hum_readable_list_devices")
        format_disk.hum_readable_list_devices()
    if storage:
        set_extarnal_storage = _external_storage_enabled()
        external_storage_label = _external_storage_label()
        storage_structure_handler.create_storage_stucrure(set_extarnal_storage, external_storage_label)
    if show_storage_structure:
        set_extarnal_storage = _external_storage_enabled()
        external_storage_label = _external_storage_label()
        # The return value was previously bound to an unused local.
        storage_structure_handler.get_storage_structure_folders(set_extarnal_storage, external_storage_label)
    if prepare_disks:
        pre_check("prepare_block_device")
        if _external_storage_enabled():
            prepare_and_format_blockdevice.prepare_block_device()
        else:
            print("For automatic disk format based on diskconf.json switch STORAGE -> external True in rpitools_config.conf")
    if change_dev_name:
        pre_check("change_dev_name")
        format_disk.hum_readable_list_devices()
        device = read_input("Select device path:\t/dev/sdaX: ")
        name = read_input("New disk label name: ")
        mount_connected_disks.set_get_device_name(device, name)


if __name__ == "__main__":
    main()
    time.sleep(1)
# ... (the excerpt ends here)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful