Best Python code snippet using autotest_python
create_disk_images.py
Source:create_disk_images.py  
...14import pymongo15import settings16import kpov_util17from util import write_default_config18def get_prepare_disks(db, course_id, task_id):19    prepare_disks_source = db.prepare_disks.find_one({'course_id': course_id, 'task_id': task_id})['source']20    d = {}21    exec(compile(prepare_disks_source, 'prepare_disks.py', 'exec'), globals(), d)22    return d['prepare_disks']23def create_snapshot(course_id, task_id, student_id, computer_name, disk_name, fmt='vmdk', overwrite=True):24    # add a hash to filename to allow multiple students using the same directory25    snap_hash = hashlib.sha1((student_id+course_id).encode()).hexdigest()[:3]26    snap = '{}-{}-{}-{}.{}'.format(27        task_id, snap_hash, computer_name, disk_name, fmt)28    backing = []29    template = disk_name + '.' + fmt30    task_dir = os.path.join(student_id, course_id, task_id)31    task_path = os.path.join(settings.STUDENT_DISK_PATH, task_dir)32    if not os.path.exists(os.path.join(task_path)) or overwrite:33        if not os.path.exists(os.path.join(settings.DISK_TEMPLATE_PATH, template)):34            raise Exception('template not found: {}'.format(template))35        # ensure task dir exists36        os.makedirs(task_path, exist_ok=True)37        if fmt in ('vdi', 'vmdk'):38            # donât use backing files, just copy the template39            os.chdir(task_path)40            if settings.STUDENT_DISK_COW:41                subprocess.call(['cp', '--reflink=always', os.path.join(settings.DISK_TEMPLATE_PATH, template), snap])42            else:43                subprocess.call(['cp', os.path.join(settings.DISK_TEMPLATE_PATH, template), snap])44        elif fmt == 'qcow2':45            # qemu-img create stores backing-file path as given, so link all46            # backing images to task directory where target image will be47            # generated48            os.chdir(settings.DISK_TEMPLATE_PATH) # qemu-img info is saner when called from image directory49            output = json.loads(subprocess.check_output(50                ['qemu-img', 'info', '--output=json', '--backing-chain', template], universal_newlines=True))51            backing_files = [i.get("backing-filename", None) for i in output]52            for image in [template] + [i for i in backing_files if i is not None]:53                backing += [image]54                dest = os.path.join(task_path, image)55                if not os.path.exists(dest):56                    os.symlink(os.path.join(settings.DISK_TEMPLATE_PATH, image), dest)57            # would be great if someone finds a way to avoid the stuff above58            # make overlay image59            os.chdir(task_path)60            print(task_path, backing_files)61            subprocess.call(['qemu-img', 'create',62                '-f', fmt,63                '-b', template, snap])64    return task_dir, snap, backing65def prepare_task_disks(course_id, task_id, student_id, fmt, computers):66    disks = collections.defaultdict(dict)67    templates = collections.defaultdict(dict)68    for computer in computers:69        lock_fp.write('creating computer ' + computer['name'] + '\n')70        if not computer['disks']:71            continue72        manual_disks = []73        try_automount = False74        g = guestfs.GuestFS()75        for disk in computer['disks']:76            lock_fp.write("register " + disk['name'] + '\n')77            task_dir, snap, backing = create_snapshot(course_id, task_id, student_id, computer['name'], disk['name'], fmt=fmt)78            snap_file = os.path.join(settings.STUDENT_DISK_PATH, task_dir, snap)79            if 'options' in disk:80                g.add_drive_opts(snap_file, **(disk['options']))81            else:82                g.add_drive(snap_file)83            if 'parts' in disk:84                for p in disk['parts']:85                    lock_fp.write("part {}: {}\n".format(86                        settings.GUESTFS_DEV_PREFIX + p['dev'], p['path']))87                    manual_disks.append(88                        (settings.GUESTFS_DEV_PREFIX + p['dev'], p['path'], p.get('options', None)))89            else:90                try_automount = True91            templates[disk['name']] = g92            lock_fp.write("  templates[{}] = {}\n".format(disk['name'], disk))93            # add disk or update existing record with new format94            disks[computer['name']][disk['name']] = [snap] + backing95        g.launch()96        mounted = set()97        if try_automount:98            roots = g.inspect_os()99            for root in roots:100                mps = g.inspect_get_mountpoints(root)101                lock_fp.write('detected: ' + str(mps) + '\n')102                for mountpoint, device in sorted(mps):103                    if mountpoint not in mounted:104                        try:105                            g.mount(device, mountpoint, )106                            lock_fp.write( 'mounted ' + device + ' on ' + mountpoint + '\n')107                        except RuntimeError as msg:108                            lock_fp.write( "%s (ignored)\n" % msg)109                        mounted.add(mountpoint)110        for device, mountpoint, opts in manual_disks:111            try:112                if opts is not None:113                    g.mount_options(opts, device, mountpoint)114                else:115                    g.mount(device, mountpoint)116                lock_fp.write('manually mounted ' + device + " on " + mountpoint + '\n')117            except RuntimeError as msg:118                lock_fp.write( "%s (ignored)\n" % msg)119    lock_fp.write("preparing disks\n")120    global_params = {121        'task_name': task_id,122        'course_id': course_id,123        'username': student_id124    }125    if 'TASK_URL' in vars(settings):126        global_params['task_url'] = settings.TASK_URL + '/' + course_id + '/'127    task_params = db.task_params.find_one({'course_id': course_id, 'task_id': task_id, 'student_id': student_id})['params']128    try:129        prepare_disks = get_prepare_disks(db, course_id, task_id)130        prepare_disks(templates, task_params, global_params)131    except Exception as ex:132        print("Error in prepare_disks:", ex)133    # pospravi za seboj.134    lock_fp.write("unmounting\n")135    for g in set(templates.values()):136        g.umount_all()137        g.close()138    return disks139if __name__ == '__main__':140    if len(sys.argv) != 1:141        print("Usage: {0}")142        print("Create the pending disk images")143    db = pymongo.MongoClient(settings.DB_URI).get_default_database()144    all_computers = collections.defaultdict(list)...add_task.py
Source:add_task.py  
1#!/usr/bin/env python32# SPDX-License-Identifier: AGPL-3.0-or-later3import glob4import inspect5import os6import settings7import sys8import urllib9import kpov_util10import pymongo11from bson import Binary12def task_check(results, params):13    data = {14        'results': json.dumps(results),15        'params': json.dumps({k: v for k, v in params.items() if k != 'token'}),16    }17    # should be an argument to task_check, but probably better not modify the signature18    if 'token' in params:19        data['token'] = params['token']20    response = urllib.request.urlopen(21        '{task_url}/{task_name}/results.json'.format(task_url=task_url, task_name=task_name),22        data=urllib.parse.urlencode(data).encode())23    response_dict = json.loads(response.read().decode())24    hints = response_dict.get('hints', [])25    hints = ['status: ' + response_dict.get('status', '')] + hints26    return response_dict.get('result', 'No result'), hints27uploading_task_check_source = inspect.getsource(task_check)28def gen_params(user_id, meta):29    return dict()30dummy_gen_params_source = inspect.getsource(gen_params)31if __name__ == '__main__':32    if len(sys.argv) < 2:33        print("Usage: {0} <task_dir> [task_name]".format(sys.argv[0]))34        exit(1)35    dirname = sys.argv[1]36    fname = os.path.join(dirname, 'task.py')37    try:38        course_id, task_id = sys.argv[2].split('/')39    except:40        normpath = os.path.normpath(dirname)41        course_id = os.path.split(os.path.dirname(normpath))[-1]42        task_id = os.path.basename(normpath)43    print((course_id, task_id))44    db = pymongo.MongoClient(settings.DB_URI).get_default_database()45    source = open(fname).read()46    d = {}47    # defines task, task_check, gen_params, prepare_disks, computers, params_meta48    exec(compile(source, fname, 'exec'), globals(), d)49    public_meta = {}50    for k, v in d['params_meta'].items():51        if v.get('public', False):52            public_meta[k] = v53    task_source = "\n\n".join([54        inspect.getsource(d['task']),55        uploading_task_check_source,56        "params_meta = {}".format(public_meta),57        dummy_gen_params_source])58    task_check_source = inspect.getsource(d['task_check'])59    gen_params_source = inspect.getsource(d['gen_params'])60    prepare_disks_source = inspect.getsource(d['prepare_disks'])61    x = list(d['params_meta'].keys()) # check for existence62    db.computers_meta.remove({'task_id': task_id, 'course_id': course_id})63    auto_networks = set([None])64    for k, v in d['computers'].items():65        for n in v.get('network_interfaces', []):66            auto_networks.add(n.get('network', None))67        db.computers_meta.update({68                'task_id': task_id,69                'course_id': course_id, 70                'name': k71            }, {'$set': v}, upsert=True)72    auto_networks.remove(None)73    db.networks.remove({'task_id': task_id, 'course_id': course_id})74    db.task_params.remove({'task_id': task_id, 'course_id': course_id})75    db.student_computers.remove({'task_id': task_id, 'course_id': course_id})76    db.prepare_disks.remove({'task_id': task_id, 'course_id': course_id})77    try:78        net_list = d['networks'].items()79    except:80        net_list = [(k, {'public': False}) for k in auto_networks]81    for k, v in net_list:82        db.networks.update({'task_id': task_id, 'course_id': course_id, 'name': k}, {'$set': v}, upsert=True)83    db.task_checkers.update({84            'task_id': task_id, 'course_id': course_id85        }, {'$set': {'source': task_check_source}}, upsert=True)86    db.tasks.update({87            'task_id': task_id, 'course_id': course_id88        },{'$set': {'source': task_source}}, upsert=True)89    db.prepare_disks.update({90            'task_id': task_id, 'course_id': course_id91        }, {'$set': {'source': prepare_disks_source}}, upsert=True)92    db.gen_params.update({'task_id': task_id, 'course_id': course_id},93        {'$set': {'source': gen_params_source}}, upsert=True)94    db.task_params_meta.update({'task_id': task_id, 'course_id': course_id},95        {'$set': {'params': d['params_meta']}}, upsert=True)96    db.task_instructions.update({'task_id': task_id, 'course_id': course_id}, 97        {'$set': d['instructions']}, upsert=True)98    for howto_dir in glob.glob(os.path.join(dirname, 'howtos/*')):99        howto_lang = os.path.basename(os.path.normpath(howto_dir))100        if howto_lang not in {'images'}:101            with open(os.path.join(howto_dir, 'index.html')) as f:102                db.howtos.update({103                        'task_id': task_id,104                        'course_id': course_id,105                        'lang': howto_lang},106                    {'$set': {'text': f.read()}}, upsert=True)107        else:108            for img in glob.glob(os.path.join(howto_dir, '*')):109                fname = os.path.basename(img)110                with open(img, 'rb') as f:111                    db.howto_images.update({112                            'task_id': task_id,113                            'course_id': course_id,114                            'fname': fname,115                        },...diskhandler.py
Source:diskhandler.py  
1#!/usr/bin/python2import argparse3import os4import sys5myfolder = os.path.dirname(os.path.abspath(__file__))6lib_path = os.path.join(myfolder, "../lib/")7sys.path.append(lib_path)8import format_disk9import mount_connected_disks10import storage_structure_handler11import subprocess12import time13import threading14import prepare_and_format_blockdevice15import BlockDeviceHandler16def rpienv_source():17    if not os.path.exists(str(myfolder) + '/.rpienv'):18        print("[ ENV ERROR ] " + str(myfolder) + "/.rpienv path not exits!")19        sys.exit(1)20    command = ['bash', '-c', 'source ' + str(myfolder) + '/.rpienv -s && env']21    proc = subprocess.Popen(command, stdout = subprocess.PIPE)22    for line in proc.stdout:23        if type(line) is bytes:24            line = line.decode("utf-8")25        try:26            name = line.partition("=")[0]27            value = line.partition("=")[2]28            if type(value) is unicode:29                value = value.encode('ascii','ignore')30            value = value.rstrip()31            os.environ[name] = value32        except Exception as e:33            if "name 'unicode' is not defined" != str(e):34                print(e)35    proc.communicate()36rpienv_source()37try:38    confhandler_path = os.path.join(os.path.dirname(os.environ['CONFIGHANDLERPY']))39    sys.path.append(confhandler_path)40    import ConfigHandler41    cfg = ConfigHandler.init()42except Exception as e:43    print("Import config handler failed: " + str(e))44parser = argparse.ArgumentParser()45parser.add_argument("-s", "--search_get_edit", action='store_true', help="Search devices (/dev/sda*), get label and uuid, set fstab file")46parser.add_argument("-m", "--mount",  action='store_true', help="Mount avaible devices (/media/*)")47parser.add_argument("-f", "--format_ext4",  action='store_true', help="Format device for ext4 filesystem")48parser.add_argument("-l", "--listdevs",  action='store_true', help="List connected devices with seversal commands")49parser.add_argument("-t", "--storage_structure",  action='store_true', help="Set storage folder structure")50parser.add_argument("-w", "--show_storage_structure",  action='store_true', help="Show storage folder structure")51parser.add_argument("-p", "--prepare_disks",  action='store_true', help="Prepare disks whick contains diskconf.json")52parser.add_argument("-c", "--change_dev_name",  action='store_true', help="Change disk label name")53args = parser.parse_args()54sge = args.search_get_edit55mount = args.mount56form = args.format_ext457listdev = args.listdevs58storage = args.storage_structure59show_storage_structure = args.show_storage_structure60prepare_disks = args.prepare_disks61change_dev_name =args.change_dev_name62def pre_check(info="CKECK"):63    state, msg = BlockDeviceHandler.is_any_device_avaible()64    if not state:65        print("{} There are no connected devices: {}".format(info, msg))66        sys.exit(444)67def main():68    if sge:69        pre_check("do_search_get_edit")70        mount_connected_disks.do_search_get_edit()71    if mount:72        pre_check("mount")73        mount_connected_disks.mount()74    if form:75        pre_check("format_disk")76        format_disk.main()77    if listdev:78        pre_check("hum_readable_list_devices")79        format_disk.hum_readable_list_devices()80    if storage:81        if str(cfg.get(section="STORAGE", option="external")).lower() == "true":82            set_extarnal_storage = True83        else:84            set_extarnal_storage = False85        external_storage_label = str(cfg.get(section="STORAGE", option="label")).rstrip()86        storage_structure_handler.create_storage_stucrure(set_extarnal_storage, external_storage_label)87    if show_storage_structure:88        if str(cfg.get(section="STORAGE", option="external")).lower() == "true":89            set_extarnal_storage = True90        else:91            set_extarnal_storage = False92        external_storage_label = str(cfg.get(section="STORAGE", option="label")).rstrip()93        text = storage_structure_handler.get_storage_structure_folders(set_extarnal_storage, external_storage_label)94    if prepare_disks:95        pre_check("prepare_block_device")96        if str(cfg.get(section="STORAGE", option="external")).lower() == "true":97            prepare_and_format_blockdevice.prepare_block_device()98        else:99            print("For automatic disk format based on diskconf.json switch STORAGE -> external True in rpitools_config.conf")100    if change_dev_name:101        pre_check("change_dev_name")102        format_disk.hum_readable_list_devices()103        device = raw_input("Select device path:\t/dev/sdaX: ")104        name = raw_input("New disk label name: ")105        mount_connected_disks.set_get_device_name(device, name)106if __name__ == "__main__":107    main()108    time.sleep(1)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
