Best Python code snippet using autotest_python
libvirt_helper.py
Source:libvirt_helper.py  
import sys
import libvirt
from xml.etree import ElementTree
import time
import ebt_system


def _wrong_type(variable):
    """Build the assertion message for a wrongly-typed argument.

    The message names the module and the *calling* function, matching the
    text every assert in this module has always produced.
    """
    # Frame 1 is the function whose assert is being evaluated.
    caller = sys._getframe(1).f_code.co_name
    return '{1}.{2}: variable "{0}" has wrong type.'.format(variable, __name__, caller)


class Libvirt(object):
    """Small helper around a libvirt connection for snapshot-style backups."""

    def __init__(self, uri='qemu:///system'):
        """Open a libvirt connection to *uri* and keep it on ``self.conn``."""
        self.conn = libvirt.open(uri)

    def list_domains(self):
        """Return whatever ``listAllDomains()`` reports for the connection."""
        return self.conn.listAllDomains()

    @staticmethod
    def get_domain_disks(domain):
        """Describe the ``device='disk'`` disks found in the domain's XML.

        Returns a list of dicts with the keys ``path``, ``target``,
        ``source_type`` (``'dev'`` or ``'file'``) and ``snapshot_path``
        (always ``None`` here; callers fill it in later). Disks without a
        ``<source>`` element, or whose source has neither a ``dev`` nor a
        ``file`` attribute, are skipped.
        """
        assert isinstance(domain, libvirt.virDomain), _wrong_type('domain')
        root = ElementTree.fromstring(domain.XMLDesc(0))
        disks_list = []
        for disk in root.findall('./devices/disk'):
            # The ``device`` attribute is optional in libvirt domain XML and
            # defaults to "disk"; indexing ``attrib`` directly raised KeyError
            # when it was omitted.
            if disk.get('device', 'disk') != 'disk':
                continue
            source = disk.find('source')
            if source is None:
                continue
            # A block-device source takes precedence over a file source.
            for source_type in ('dev', 'file'):
                path = source.get(source_type)
                if path is not None:
                    disks_list.append({
                        'path': path,
                        'target': disk.find('target').get('dev'),
                        'source_type': source_type,
                        'snapshot_path': None,
                    })
                    break
        return disks_list

    def filter_domain_list(self, domains, include=None, exclude=None):
        """Select domains by name.

        A domain is kept when its name is in *include*, or when *exclude*
        does not contain ``'all'`` and does not contain the domain's name.
        *include* and *exclude* default to empty lists.
        """
        # None replaces the former shared mutable ``list()`` defaults.
        include = [] if include is None else include
        exclude = [] if exclude is None else exclude
        assert isinstance(include, list), _wrong_type('include')
        assert isinstance(exclude, list), _wrong_type('exclude')
        # Only the first element is type-checked (as before); an empty list is
        # now accepted instead of failing with IndexError on ``domains[0]``.
        assert isinstance(domains, list) and (
            not domains or isinstance(domains[0], libvirt.virDomain)), _wrong_type('domains')
        exclude_all = 'all' in exclude
        filtered_list = []
        for domain in domains:
            name = domain.name()
            if name in include or (not exclude_all and name not in exclude):
                filtered_list.append(domain)
        return filtered_list

    @staticmethod
    def export_xml(domain, path):
        """Write the domain's XML description (``XMLDesc(0)``) to *path*."""
        domain_xml = domain.XMLDesc(0)
        # ``with`` closes the file even if the write raises.
        with open(path, mode='w') as xml_file:
            xml_file.write(domain_xml)

    @staticmethod
    def device_size(domain, path):
        """Return the first field of ``domain.blockInfo(path)`` as an int.

        NOTE(review): presumably this is the device capacity in bytes --
        confirm against the libvirt ``virDomainGetBlockInfo`` documentation.
        """
        return int(domain.blockInfo(path)[0])

    def restore(self, path):
        """Call ``restore(path)`` on the underlying connection."""
        self.conn.restore(path)

    @staticmethod
    def create_snapshot_xml(disks, memory_path=None):
        """Build the ``<domainsnapshot>`` XML for the given disks.

        Each disk dict needs ``target`` and ``snapshot_path``. A disk whose
        ``snapshot_path`` is ``None`` is marked ``snapshot='no'``; otherwise
        it is marked ``snapshot='external'`` with a ``<source file=...>``
        child. The ``<memory>`` element is ``snapshot='no'`` when
        *memory_path* is ``None`` and ``snapshot='external'`` with that file
        otherwise.

        Returns the value of ``ElementTree.tostring(..., encoding='utf8')``,
        which is ``bytes`` on Python 3.
        """
        assert isinstance(disks, list) and isinstance(disks[0], dict), _wrong_type('disks')
        assert (memory_path is None) or isinstance(memory_path, str), _wrong_type('memory_path')
        snap_xml = ElementTree.Element('domainsnapshot')
        disks_xml = ElementTree.SubElement(snap_xml, 'disks')
        if memory_path is None:
            ElementTree.SubElement(snap_xml, 'memory', {'snapshot': 'no'})
        else:
            ElementTree.SubElement(snap_xml, 'memory', {'snapshot': 'external', 'file': memory_path})
        for disk in disks:
            if disk['snapshot_path'] is None:
                ElementTree.SubElement(disks_xml, 'disk', {'name': disk['target'], 'snapshot': 'no'})
            else:
                disk_xml = ElementTree.SubElement(
                    disks_xml, 'disk', {'name': disk['target'], 'snapshot': 'external'})
                ElementTree.SubElement(disk_xml, 'source', {'file': disk['snapshot_path']})
        return ElementTree.tostring(snap_xml, encoding='utf8', method='xml')

    def create_vm_snapshot(self, domain, disks, memory_path=None, atomic=True, quiesce=False):
        """Create a snapshot of *domain* and return the snapshot object.

        Flags passed to ``snapshotCreateXML``: ``DISK_ONLY`` when
        *memory_path* is ``None``, otherwise ``LIVE``; plus ``ATOMIC`` and
        ``QUIESCE`` when the corresponding argument is true.
        """
        assert isinstance(domain, libvirt.virDomain), _wrong_type('domain')
        assert isinstance(disks, list) and isinstance(disks[0], dict), _wrong_type('disks')
        assert (memory_path is None) or isinstance(memory_path, str), _wrong_type('memory_path')
        assert isinstance(atomic, bool), _wrong_type('atomic')
        assert isinstance(quiesce, bool), _wrong_type('quiesce')
        if memory_path is None:
            flags = libvirt.VIR_DOMAIN_SNAPSHOT_CREATE_DISK_ONLY
        else:
            flags = libvirt.VIR_DOMAIN_SNAPSHOT_CREATE_LIVE
        if atomic:
            flags |= libvirt.VIR_DOMAIN_SNAPSHOT_CREATE_ATOMIC
        if quiesce:
            flags |= libvirt.VIR_DOMAIN_SNAPSHOT_CREATE_QUIESCE
        # NOTE(review): create_snapshot_xml returns bytes on Python 3 --
        # confirm that the installed libvirt binding accepts that here.
        snap_xml = self.create_snapshot_xml(disks, memory_path)
        return domain.snapshotCreateXML(snap_xml, flags)

    @staticmethod
    def remove_vm_snapshot(domain, disks):
        """Commit and remove the external snapshot of every snapshotted disk.

        For each disk with a non-``None`` ``snapshot_path`` this starts a
        block commit with ``VIR_DOMAIN_BLOCK_COMMIT_ACTIVE``, polls
        ``blockJobInfo`` until ``cur == end``, pivots via ``blockJobAbort``
        and then removes the snapshot file with ``ebt_system.rm``.
        """
        assert isinstance(domain, libvirt.virDomain), _wrong_type('domain')
        assert isinstance(disks, list) and isinstance(disks[0], dict), _wrong_type('disks')
        flags = libvirt.VIR_DOMAIN_BLOCK_COMMIT_ACTIVE
        for disk in disks:
            if disk['snapshot_path'] is not None:
                domain.blockCommit(disk=disk['target'], base=None, top=None, flags=flags)
                while True:
                    status = domain.blockJobInfo(disk['target'])
                    if status['cur'] == status['end']:
                        domain.blockJobAbort(
                            disk=disk['target'], flags=libvirt.VIR_DOMAIN_BLOCK_JOB_ABORT_PIVOT)
                        ebt_system.rm(disk['snapshot_path'])
                        break
                    else:
                        # NOTE(review): the listing is truncated at this point;
                        # the original else-branch is not visible (presumably a
                        # pause between polls -- confirm against the real file).
                        ...
# ... (the libvirt_helper.py listing is truncated here)
# rl_config.py
Source:rl_config.py  
import os
import numpy as np
from datetime import datetime

# Captured once at import time and shared by every output path below.
TIMESTAMP = "{0:%Y-%m-%d-Time%H-%M-%S}".format(datetime.now())


class DebugConfig:
    """
    Parameters for a short debugging experiment.

    Everything is a class attribute; the output paths are built once, when
    the class body is executed.
    """
    # Training parameters
    total_episodes = 100
    noised_episodes = 20
    max_steps = 300
    batch_size = 1024  # 256
    train_frequency = 500  # 2

    # NN architecture
    ego_feature_num = 4
    npc_num = 5
    npc_feature_num = 4
    state_size = ego_feature_num + npc_num * npc_feature_num
    action_size = 2
    lra = 2e-5
    lrc = 1e-4

    # Fixed Q target hyper parameters
    tau = 1e-3

    # exploration hyperparameters for the epsilon-greedy strategy
    explore_start = 0.75  # exploration probability at start
    explore_stop = 0.01  # minimum exploration probability
    explore_step = 40000  # 40k steps
    # (start - stop) / steps, i.e. a constant per-step decrement. The original
    # comment called this an "exponential decay rate".
    # NOTE(review): confirm how the consumer applies it.
    decay_rate = (explore_start - explore_stop) / explore_step

    # Q LEARNING hyperparameters
    gamma = 0.99  # Discounting rate
    pretrain_length = 500  # Number of experiences stored in the Memory when initialized for the first time -- INITIALLY 100k
    memory_size = 200000  # Number of experiences the Memory can keep -- INITIALLY 100k
    load_memory = False  # If True load memory, otherwise fill the memory with new data

    # ==================================================
    # output paths
    tag = 'debug'
    output_path = os.path.join('./outputs', tag, TIMESTAMP)
    memory_path = os.path.join(output_path, 'rl_replay_memory')
    # os.makedirs(memory_path, exist_ok=True)
    memory_load_path = os.path.join(memory_path, 'memory.pkl')
    memory_save_path = os.path.join(memory_path, 'memory.pkl')

    # model saving
    model_save_frequency = 2  # frequency to save the model. 0 means not to save
    model_save_frequency_no_paste = 500  # ???

    # frequency to check best models
    model_save_frequency_high_success = 10
    model_test_frequency = 10
    model_test_eps = 10  # ???

    # final model save path
    model_save_path = os.path.join(output_path, 'final_model', 'final_model.ckpt')
    # checkpoint save path
    model_ckpt_path = os.path.join(output_path, 'checkpoints')
    # best model
    best_model_path = os.path.join(output_path, 'best_models')


class hyperParameters:
    """
    Hyperparameters for the RL agent.

    Tunables are class attributes; the output paths are per-instance and are
    derived from the optional ``args`` object in ``generate_output_path``.
    """
    # Training parameters
    total_episodes = 10000
    noised_episodes = 2000
    # todo add setter for env
    max_steps = 300
    batch_size = 1024  # 256, 512, 1024
    train_frequency = 2  # 2

    # td3
    policy_delayed = 2

    # NN architecture
    ego_feature_num = 4  # 9 for abs_all, 4 for sumo and sumo_1
    npc_num = 5
    npc_feature_num = 6
    state_size = ego_feature_num + npc_num * npc_feature_num
    action_size = 2

    # Fixed Q target hyper parameters
    tau = 1e-3

    # exploration hyper-parameters for epsilon-greedy strategy
    explore_start = 0.5  # exploration probability at start
    explore_stop = 0.05  # minimum exploration probability
    explore_step = 20000  # 40k, 40000
    # decay_rate = (explore_start - explore_stop) / explore_step  # exponential decay rate for exploration prob

    # Q LEARNING hyperparameters
    gamma = 0.99  # Discounting rate
    pretrain_length = 10000  # Number of experiences stored in the Memory when initialized for the first time -- INITIALLY 100k
    memory_size = 500000  # Number of experiences the Memory can keep -- INITIALLY 100k
    load_memory = False  # If True load memory, otherwise fill the memory with new data

    # ==================================================
    # output paths
    # tag = 'CarlaEnv3'
    tag = 'CarlaEnv4'

    # model saving
    model_save_frequency = 50  # frequency to save the model. 0 means not to save
    model_save_frequency_no_paste = 500  # ???

    # frequency to check best models
    model_save_frequency_high_success = 20
    model_test_frequency = 10
    model_test_eps = 10  # ???

    # ================   Decay learning rate   ================
    lra = 2e-5  # 2e-5
    lrc = 5e-5  # 1e-4
    # todo this number is determined by downsample factor
    guessing_episode_length = 200
    # decay after certain number of episodes
    decay_episodes = 1500
    # True division: this is a float (150000.0 with the values above).
    decay_steps = guessing_episode_length / train_frequency * decay_episodes
    decay_rate = 1 / 2.15  # 2.15 = 10^(1/3)

    def __init__(self, args=None):
        """Recompute ``state_size`` on the instance and build the output paths.

        ``args`` is optional; when given it must expose ``tag`` and
        ``route_option`` (see ``generate_output_path``).
        """
        # todo need to fix api of the rl_utils
        self.state_size = self.ego_feature_num + self.npc_num * self.npc_feature_num
        self.generate_output_path(args)

    def generate_output_path(self, args=None):
        """Set the instance's output, memory, checkpoint and best-model paths.

        With a truthy ``args`` the root is
        ``./outputs/<args.route_option>/<self.tag>[/<args.tag>]/<TIMESTAMP>``
        (``args.tag`` is included only when truthy); otherwise it is
        ``./outputs/please_check/<self.tag>/<TIMESTAMP>``.
        """
        if args:
            parts = ['./outputs', args.route_option, self.tag]
            if args.tag:
                parts.append(args.tag)
        else:
            parts = ['./outputs/please_check', self.tag]
        parts.append(TIMESTAMP)
        output_path = os.path.join(*parts)

        self.output_path = output_path
        self.memory_path = os.path.join(output_path, 'rl_replay_memory')
        # os.makedirs(memory_path, exist_ok=True)
        self.memory_load_path = os.path.join(self.memory_path, 'memory.pkl')
        self.memory_save_path = os.path.join(self.memory_path, 'memory.pkl')
        # checkpoint save path
        self.model_ckpt_path = os.path.join(output_path, 'checkpoints')
        # best model
        self.best_model_path = os.path.join(output_path, 'best_models')
        # final model save path
        # NOTE(review): the listing is truncated here; whatever followed this
        # comment in the original file is not visible.
# ... (the rl_config.py listing is truncated here)
# memories.py
Source:memories.py  
import discord
from discord.ext import commands
import os
import random as rng

# Directory holding the saved images; paths are built by plain concatenation,
# so the trailing slash is required.
MEMORY_PATH = './resources/memories/'


class Memories(commands.Cog):
    """Cog with commands for posting, saving and counting stored images."""

    def __init__(self, client):
        self.client = client

    @commands.command()
    async def memory(self, ctx, selection=None):
        """Post a random stored image, or reply with the path for *selection*."""
        if not selection:
            memories = [f'{MEMORY_PATH}{filename}' for filename in os.listdir(MEMORY_PATH)]
            if not memories:
                # rng.choice raises IndexError on an empty sequence.
                await ctx.send('I have no memories saved yet.')
                return
            await ctx.send(file=discord.File(rng.choice(memories)))
        else:
            # NOTE(review): this sends the path as message text, not the image
            # itself -- confirm whether a file upload was intended. If it is
            # changed to open the file, validate `selection` first: it is
            # user-supplied and is interpolated into a filesystem path.
            await ctx.send(f'{MEMORY_PATH}{selection}.png')

    @commands.command(aliases=['addmemories', 'remember'])
    async def addmemory(self, ctx):
        """Save each supported image attachment of the invoking message.

        Files are stored as ``<count + 1>.png`` under ``MEMORY_PATH``, where
        ``count`` is the number of entries in that directory at save time.
        """
        await ctx.channel.purge(limit=1)
        # A tuple, so it can be handed to str.endswith in a single call.
        img_types = ('.png', '.jpg', '.jpeg', '.gif')
        print('Attempting to save attachment...')
        if not ctx.message.attachments:
            await ctx.send('Please supply a link or attachment to save to memories.')
            return
        for attachment in ctx.message.attachments:
            # The original tested a bare generator expression here, which is
            # always truthy, so unsupported files were saved too and the
            # rejection branch could never run.
            if attachment.filename.lower().endswith(img_types):
                save_file = f'{MEMORY_PATH}{len(os.listdir(MEMORY_PATH))+1}.png'
                await ctx.send('Attempting to save memory.')
                await attachment.save(save_file)
                await ctx.send(f'File saved as `{save_file}`.')
            else:
                await ctx.send('Image type not supported.')

    @commands.command()
    async def memories(self, ctx):
        """Report how many entries ``MEMORY_PATH`` currently contains."""
        i = len(os.listdir(MEMORY_PATH))
        await ctx.send(f'I have {i} memories saved.')


def setup(client):
    """Extension entry point: register the cog on *client*."""
    client.add_cog(Memories(client))
    print(f'Loaded {os.path.basename(__file__)} successfully')
# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub.
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
