Best Python code snippet using lisa_python
xdbus.py
Source:xdbus.py  
...28            self.dbusMonitor.start()29            self.oe.dbg_log('xdbus::start_service', 'exit_function', self.oe.LOGDEBUG)30        except Exception as e:31            self.oe.dbg_log('xdbus::start_service', 'ERROR: (' + repr(e) + ')', self.oe.LOGERROR)32    def stop_service(self):33        try:34            self.oe.dbg_log('xdbus::stop_service', 'enter_function', self.oe.LOGDEBUG)35            if hasattr(self, 'dbusMonitor'):36                self.dbusMonitor.stop()37                del self.dbusMonitor38            self.oe.dbg_log('xdbus::stop_service', 'exit_function', self.oe.LOGDEBUG)39        except Exception as e:40            self.oe.dbg_log('xdbus::stop_service', 'ERROR: (' + repr(e) + ')')41    def exit(self):42        pass43    def restart(self):44        try:45            self.oe.dbg_log('xdbus::restart', 'enter_function', self.oe.LOGDEBUG)46            self.stop_service()47            self.start_service()48            self.oe.dbg_log('xdbus::restart', 'exit_function', self.oe.LOGDEBUG)49        except Exception as e:50            self.oe.dbg_log('xdbus::restart', 'ERROR: (' + repr(e) + ')')51class dbusMonitor(threading.Thread):52    def __init__(self, oeMain):53        try:54            oeMain.dbg_log('xdbus::dbusMonitor::__init__', 'enter_function', oeMain.LOGDEBUG)55            self.monitors = []56            self.oe = oeMain57            self.dbusSystemBus = oeMain.dbusSystemBus58            dbus.mainloop.glib.threads_init()59            DBusGMainLoop(set_as_default=True)60            self.mainLoop = GLib.MainLoop()...dynamic.py
Source:dynamic.py  
...13        return s14    def current_services(self):15        return list(self.services)16    def after_task(self, task, service):17        self.provider.stop_service(service)18        # self.services.discard(service)19class SingleEngine(EngineBase):20    def __init__(self, provider, **kwargs):21        super(SingleEngine, self).__init__(provider, **kwargs)22        self.conf = self.provider.configurations()[0]23        self.service = None24    def before_eval(self):25        self.service = self.provider.start_service(1, self.conf)26    def after_eval(self):27        self.provider.stop_service(self.service)28        self.service = None29    def which_service(self, task):30        return self.service31    def current_services(self):32        return [self.service]33class LimitEngine(EngineBase):34    def __init__(self, provider, n, **kwargs):35        super(LimitEngine, self).__init__(provider, **kwargs)36        self.conf = self.provider.configurations()[0]37        self.n = n38        self.services = []39        self.num_unscheduled = -140        self.le_lock = Semaphore()41    def before_eval(self):42        self.num_unscheduled = len(self.dag)43    def after_eval(self):44        for s in self.services:45            self.provider.stop_service(s)46        self.services = []47    def which_service(self, task):48        self.num_unscheduled -= 149        for s in self.services:50            if len(s.tasks) == 0:51                return s52        if len(self.services) < self.n:53            s = self.provider.start_service(len(self.services) + 1, self.conf)54            self.services.append(s)55        else:56            s = min(self.services, key=lambda x: len(x.tasks))57        return s58    def after_task(self, task, service):59        self.le_lock.acquire()60        if self.num_unscheduled == 0 and len(service.tasks) == 0:61            self.provider.stop_service(service)62            for s in self.services:63                if len(s.tasks) == 0:64                
    self.provider.stop_service(s)65        self.le_lock.release()66    def current_services(self):...test_webmgr.py
Source:test_webmgr.py  
1# Copyright (c) 2017-2019 David Steele <dsteele@gmail.com>2#3# SPDX-License-Identifier: GPL-2.0-or-later4# License-Filename: LICENSE5import pytest6from mock import patch, call7from comitup import webmgr8def test_webmgr_callback_target():9    assert webmgr.callback_target() == webmgr.state_callback10@pytest.fixture()11def websvc_fxt(request):12    web_svc = webmgr.web_service13    def fin():14        webmgr.web_service = web_svc15    request.addfinalizer(fin)16@pytest.mark.parametrize("state, action, fn_fact, arg_fact", (17    ('HOTSPOT',    'start', lambda: webmgr.stop_service,18                            lambda: webmgr.web_service),        # noqa19    ('HOTSPOT',     'pass', lambda: webmgr.start_service,20                            lambda: webmgr.COMITUP_SERVICE),21    ('CONNECTING', 'start', lambda: webmgr.stop_service,22                            lambda: webmgr.COMITUP_SERVICE),23    ('CONNECTED',  'start', lambda: webmgr.start_service,24                            lambda: webmgr.web_service),25))26@pytest.mark.parametrize("svc", ("", "foo"))27@patch('comitup.webmgr.start_service')28@patch('comitup.webmgr.stop_service')29def test_webmgr_callback(stop_svc, start_svc, svc, state, action,30                         fn_fact, arg_fact, websvc_fxt):31    webmgr.web_service = svc32    webmgr.state_callback(state, action)33    if arg_fact():34        assert fn_fact().called35        assert fn_fact().called_with(call(arg_fact()))36    else:37        assert not fn_fact().called38    if svc:39        assert fn_fact().called40others = [(x, y) for x in ('HOTSPOT', 'CONNECTING', 'CONNECTED')41                 for y in ('fail', 'timeout')]                      # noqa42@pytest.mark.parametrize("state, action", [43        ('CONNECTING', 'pass'),44        ('CONNECTED', 'pass'),45    ] + others46)47@patch('comitup.webmgr.start_service')48@patch('comitup.webmgr.stop_service')49def test_webmgr_no_callback(stop_svc, start_svc, state, action, websvc_fxt):50    webmgr.web_service = 
'foo'51    webmgr.state_callback(state, action)52    assert not stop_svc.called...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
