Best Python code snippet using lisa_python
host_plugin.py
Source: host_plugin.py
...66    ECHO_PATH = '/host/echo'67    FACT_PATH = '/host/fact'68    PING_PATH = "/host/ping"69    SETUP_MOUNTABLE_PRIMARY_STORAGE_HEARTBEAT = "/host/mountableprimarystorageheartbeat"70    def _get_libvirt_version(self):71        ret = shell.call('libvirtd --version')72        return ret.split()[-1]73    def _get_qemu_version(self):74        ret = shell.call('%s -version' % ecsagent.get_qemu_path())75        words = ret.split()76        for w in words:77            if w == 'version':78                return words[words.index(w)+1].strip()79        raise ecsagent.KvmError('cannot get qemu version[%s]' % ret)80    @ecsagent.replyerror81    def connect(self, req):82        cmd = jsonobject.loads(req[http.REQUEST_BODY])83        self.host_uuid = cmd.hostUuid84        self.config[ecsagent.HOST_UUID] = self.host_uuid85        self.config[ecsagent.SEND_COMMAND_URL] = cmd.sendCommandUrl86        logger.debug(http.path_msg(self.CONNECT_PATH, 'host[uuid: %s] connected' % cmd.hostUuid))87        rsp = ConnectResponse()88        rsp.libvirtVersion = self.libvirt_version89        rsp.qemuVersion = self.qemu_version90        vm_plugin.cleanup_stale_vnc_iptable_chains()91        return jsonobject.dumps(rsp)92    93    @ecsagent.replyerror94    def ping(self, req):95        rsp = PingResponse()96        rsp.hostUuid = self.host_uuid97        return jsonobject.dumps(rsp)98    99    @ecsagent.replyerror100    def echo(self, req):101        logger.debug('get echoed')102        return ''103    @ecsagent.replyerror104    def fact(self, req):105        rsp = HostFactResponse()106        qemu_img_version = shell.call("qemu-img --version| grep 'qemu-img version' | cut -d ' ' -f 3")107        qemu_img_version = qemu_img_version.strip('\t\r\n ,')108        rsp.qemuImgVersion = qemu_img_version109        rsp.libvirtVersion = self.libvirt_version110        cmd = shell.ShellCmd('cat /proc/cpuinfo | grep vmx')111        cmd(False)112        if cmd.return_code == 0:113            rsp.hvmCpuFlag 
= 'vmx'114        if not rsp.hvmCpuFlag:115            cmd = shell.ShellCmd('cat /proc/cpuinfo | grep svm')116            cmd(False)117            if cmd.return_code == 0:118                rsp.hvmCpuFlag = 'svm'119        return jsonobject.dumps(rsp)120        121    @ecsagent.replyerror122    def capacity(self, req):123        rsp = HostCapacityResponse()124        rsp.cpuNum = linux.get_cpu_num()125        rsp.cpuSpeed = linux.get_cpu_speed()126        (used_cpu, used_memory) = vm_plugin.get_cpu_memory_used_by_running_vms()127        rsp.usedCpu = used_cpu128        rsp.totalMemory = _get_total_memory()129        rsp.usedMemory = used_memory130        ret = jsonobject.dumps(rsp)131        logger.debug('get host capacity: %s' % ret)132        return ret133    134    def _heartbeat_func(self, heartbeat_file):135        class Heartbeat(object):136            def __init__(self):137                self.current = None138        139        hb = Heartbeat()140        hb.current = time.time()141        with open(heartbeat_file, 'w') as fd:142            fd.write(jsonobject.dumps(hb))143        return True144    145    @ecsagent.replyerror146    def setup_heartbeat_file(self, req):147        cmd = jsonobject.loads(req[http.REQUEST_BODY])148        rsp = SetupMountablePrimaryStorageHeartbeatResponse()149        150        for hb in cmd.heartbeatFilePaths:151            hb_dir = os.path.dirname(hb)152            mount_path = os.path.dirname(hb_dir)153            if not linux.is_mounted(mount_path):154                rsp.error = '%s is not mounted, setup heartbeat file[%s] failed' % (mount_path, hb)155                rsp.success = False156                return jsonobject.dumps(rsp)157            158        for hb in cmd.heartbeatFilePaths:159            t = self.heartbeat_timer.get(hb, None)160            if t:161                t.cancel()162            163            hb_dir = os.path.dirname(hb)164            if not os.path.exists(hb_dir):165                
os.makedirs(hb_dir, 0755)166                167            t = thread.timer(cmd.heartbeatInterval, self._heartbeat_func, args=[hb], stop_on_exception=False)168            t.start()169            self.heartbeat_timer[hb] = t170            logger.debug('create heartbeat file at[%s]' % hb)171            172        return jsonobject.dumps(rsp)173        174    def start(self):175        self.host_uuid = None176        177        http_server = ecsagent.get_http_server()178        http_server.register_sync_uri(self.CONNECT_PATH, self.connect)179        http_server.register_async_uri(self.PING_PATH, self.ping)180        http_server.register_sync_uri(self.CAPACITY_PATH, self.capacity)181        http_server.register_sync_uri(self.ECHO_PATH, self.echo)182        http_server.register_async_uri(self.SETUP_MOUNTABLE_PRIMARY_STORAGE_HEARTBEAT, self.setup_heartbeat_file)183        http_server.register_async_uri(self.FACT_PATH, self.fact)184        self.heartbeat_timer = {}185        self.libvirt_version = self._get_libvirt_version()186        self.qemu_version = self._get_qemu_version()187        filepath = r'/etc/libvirt/qemu/networks/autostart/default.xml'188        if os.path.exists(filepath):189            os.unlink(filepath)190    def stop(self):191        pass192    def configure(self, config):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
