Best Python code snippet using localstack_python
test_watchdog.py
Source:test_watchdog.py  
"""Unit tests for the Patroni watchdog wrappers.

The ``os.open``/``os.write``/``os.close``/``fcntl.ioctl`` calls are replaced
with in-memory fakes operating on ``mock_devices`` so that no real watchdog
device is touched.
"""
import ctypes
import os
import sys
import unittest

from mock import patch, Mock, PropertyMock

import patroni.watchdog.linux as linuxwd
from patroni.watchdog import Watchdog, WatchdogError
from patroni.watchdog.base import NullWatchdog
from patroni.watchdog.linux import LinuxWatchdogDevice


class MockDevice(object):
    """In-memory record of one fake watchdog device opened via ``mock_open``."""

    def __init__(self, fd, filename, flag):
        self.fd = fd
        self.filename = filename
        self.flag = flag
        self.timeout = 60   # value reported by WDIOC_GETTIMEOUT until changed
        self.open = True    # flipped to False by mock_close
        self.writes = []    # every payload passed to mock_write, in order


# Index in this list doubles as the fake file descriptor.  Slot 0 is a
# placeholder so that valid descriptors are always > 0.
mock_devices = [None]


def mock_open(filename, flag):
    """Replacement for ``os.open``: register a new MockDevice and return its fd."""
    fd = len(mock_devices)
    mock_devices.append(MockDevice(fd, filename, flag))
    return fd


def mock_ioctl(fd, op, arg=None, mutate_flag=False):
    """Replacement for ``fcntl.ioctl`` that emulates the watchdog ioctls used here.

    Supports ``WDIOC_GETSUPPORT``, ``WDIOC_GETTIMEOUT`` and ``WDIOC_SETTIMEOUT``;
    any other operation raises ``Exception``.  Always returns 0 on success.
    """
    assert 0 < fd < len(mock_devices)
    dev = mock_devices[fd]
    sys.stderr.write("Ioctl %d %d %r\n" % (fd, op, arg))
    if op == linuxwd.WDIOC_GETSUPPORT:
        sys.stderr.write("Get support\n")
        assert mutate_flag is True
        arg.options = sum(map(linuxwd.WDIOF.get, ['SETTIMEOUT', 'KEEPALIVEPING']))
        arg.identity = (ctypes.c_ubyte * 32)(*map(ord, 'Mock Watchdog'))
    elif op == linuxwd.WDIOC_GETTIMEOUT:
        arg.value = dev.timeout
    elif op == linuxwd.WDIOC_SETTIMEOUT:
        sys.stderr.write("Set timeout called with %s\n" % arg.value)
        assert 0 < arg.value < 65535
        # The mock stores one less than the requested value, so the timeout
        # read back afterwards differs from the one that was set.
        dev.timeout = arg.value - 1
    else:
        # Fixed: the original passed ``op`` as a second Exception argument
        # instead of interpolating it into the message.
        raise Exception("Unknown op %d" % op)
    return 0


def mock_write(fd, string):
    """Replacement for ``os.write``: record a single-byte write on an open device."""
    assert 0 < fd < len(mock_devices)
    assert len(string) == 1
    assert mock_devices[fd].open
    mock_devices[fd].writes.append(string)


def mock_close(fd):
    """Replacement for ``os.close``: mark an open mock device as closed."""
    assert 0 < fd < len(mock_devices)
    assert mock_devices[fd].open
    mock_devices[fd].open = False


@unittest.skipIf(os.name == 'nt', "Windows not supported")
@patch('os.open', mock_open)
@patch('os.write', mock_write)
@patch('os.close', mock_close)
@patch('fcntl.ioctl', mock_ioctl)
class TestWatchdog(unittest.TestCase):
    """Tests for the high-level ``Watchdog`` wrapper running against mock devices."""

    def setUp(self):
        # Reset the shared device table in place so the patched functions,
        # which reference the module-level list, see the cleared state.
        mock_devices[:] = [None]

    @patch('platform.system', Mock(return_value='Linux'))
    @patch.object(LinuxWatchdogDevice, 'can_be_disabled', PropertyMock(return_value=True))
    def test_unsafe_timeout_disable_watchdog_and_exit(self):
        watchdog = Watchdog({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'required', 'safety_margin': -1}})
        self.assertEqual(watchdog.activate(), False)
        self.assertEqual(watchdog.is_running, False)

    @patch('platform.system', Mock(return_value='Linux'))
    @patch.object(LinuxWatchdogDevice, 'get_timeout', Mock(return_value=16))
    def test_timeout_does_not_ensure_safe_termination(self):
        Watchdog({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'auto', 'safety_margin': -1}}).activate()
        self.assertEqual(len(mock_devices), 2)

    @patch('platform.system', Mock(return_value='Linux'))
    @patch.object(Watchdog, 'is_running', PropertyMock(return_value=False))
    def test_watchdog_not_activated(self):
        self.assertFalse(Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'required'}}).activate())

    @patch('platform.system', Mock(return_value='Linux'))
    @patch.object(LinuxWatchdogDevice, 'is_running', PropertyMock(return_value=False))
    def test_watchdog_activate(self):
        with patch.object(LinuxWatchdogDevice, 'open', Mock(side_effect=WatchdogError(''))):
            self.assertTrue(Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'auto'}}).activate())
        self.assertFalse(Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'required'}}).activate())

    @patch('platform.system', Mock(return_value='Linux'))
    def test_basic_operation(self):
        watchdog = Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'required'}})
        watchdog.activate()

        self.assertEqual(len(mock_devices), 2)
        device = mock_devices[-1]
        self.assertTrue(device.open)

        self.assertEqual(device.timeout, 24)

        watchdog.keepalive()
        self.assertEqual(len(device.writes), 1)

        watchdog.disable()
        self.assertFalse(device.open)
        # The final byte written before close must be b'V'
        # (presumably the Linux watchdog "magic close" character -- confirm).
        self.assertEqual(device.writes[-1], b'V')

    def test_invalid_timings(self):
        watchdog = Watchdog({'ttl': 30, 'loop_wait': 20, 'watchdog': {'mode': 'automatic', 'safety_margin': -1}})
        watchdog.activate()
        # No device may have been opened: only the placeholder slot remains.
        self.assertEqual(len(mock_devices), 1)
        self.assertFalse(watchdog.is_running)

    def test_parse_mode(self):
        # ``new_callable`` expects a factory; pass the Mock class itself rather
        # than an instance (the original ``Mock()`` only worked because calling
        # a Mock instance returns another Mock).
        with patch('patroni.watchdog.base.logger.warning', new_callable=Mock) as warning_mock:
            watchdog = Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'bad'}})
            self.assertEqual(watchdog.config.mode, 'off')
            warning_mock.assert_called_once()

    @patch('platform.system', Mock(return_value='Unknown'))
    def test_unsupported_platform(self):
        self.assertRaises(SystemExit, Watchdog, {'ttl': 30, 'loop_wait': 10,
                                                 'watchdog': {'mode': 'required', 'driver': 'bad'}})

    def test_exceptions(self):
        wd = Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'bad'}})
        wd.impl.close = wd.impl.keepalive = Mock(side_effect=WatchdogError(''))
        self.assertTrue(wd.activate())
        # WatchdogError raised by the implementation must not propagate.
        self.assertIsNone(wd.keepalive())
        self.assertIsNone(wd.disable())

    @patch('platform.system', Mock(return_value='Linux'))
    def test_config_reload(self):
        watchdog = Watchdog({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'required'}})
        self.assertTrue(watchdog.activate())
        self.assertTrue(watchdog.is_running)

        watchdog.reload_config({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'off'}})
        self.assertFalse(watchdog.is_running)

        watchdog.reload_config({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'required'}})
        self.assertFalse(watchdog.is_running)
        watchdog.keepalive()
        self.assertTrue(watchdog.is_running)

        watchdog.disable()
        watchdog.reload_config({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'required', 'driver': 'unknown'}})
        self.assertFalse(watchdog.is_healthy)

        self.assertFalse(watchdog.activate())
        watchdog.reload_config({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'required'}})
        self.assertFalse(watchdog.is_running)
        watchdog.keepalive()
        self.assertTrue(watchdog.is_running)

        watchdog.reload_config({'ttl': 60, 'loop_wait': 15, 'watchdog': {'mode': 'required'}})
        watchdog.keepalive()


class TestNullWatchdog(unittest.TestCase):
    """Tests for the no-op ``NullWatchdog`` implementation."""

    def test_basics(self):
        watchdog = NullWatchdog()
        self.assertTrue(watchdog.can_be_disabled)
        self.assertRaises(WatchdogError, watchdog.set_timeout, 1)
        self.assertEqual(watchdog.describe(), 'NullWatchdog')
        self.assertIsInstance(NullWatchdog.from_config({}), NullWatchdog)


@unittest.skipIf(os.name == 'nt', "Windows not supported")
class TestLinuxWatchdogDevice(unittest.TestCase):
    """Tests for ``LinuxWatchdogDevice`` with the OS calls replaced by Mocks."""

    def setUp(self):
        self.impl = LinuxWatchdogDevice.from_config({})

    @patch('os.open', Mock(return_value=3))
    @patch('os.write', Mock(side_effect=OSError))
    @patch('fcntl.ioctl', Mock(return_value=0))
    def test_basics(self):
        self.impl.open()
        # Fixed: the original called the non-existent ``self.assertFail()``
        # inside the ``try``; the resulting AttributeError was swallowed by the
        # ``except`` branch, so a truthy ``has_foo`` could never fail the test.
        try:
            has_foo = self.impl.get_support().has_foo
        except Exception as e:
            # Only AttributeError is an acceptable failure mode here.
            self.assertIsInstance(e, AttributeError)
        else:
            self.assertFalse(has_foo)
        self.assertRaises(WatchdogError, self.impl.close)
        self.assertRaises(WatchdogError, self.impl.keepalive)
        self.assertRaises(WatchdogError, self.impl.set_timeout, -1)

    @patch('os.open', Mock(return_value=3))
    @patch('fcntl.ioctl', Mock(side_effect=OSError))
    def test__ioctl(self):
        # get_support must raise WatchdogError both before and after open().
        self.assertRaises(WatchdogError, self.impl.get_support)
        self.impl.open()
        self.assertRaises(WatchdogError, self.impl.get_support)

    def test_is_healthy(self):
        self.assertFalse(self.impl.is_healthy)

    @patch('os.open', Mock(return_value=3))
    @patch('fcntl.ioctl', Mock(side_effect=OSError))
    def test_error_handling(self):
        self.impl.open()
        self.assertRaises(WatchdogError, self.impl.get_timeout)
        self.assertRaises(WatchdogError, self.impl.set_timeout, 10)
        # We still try to output a reasonable string even if getting info errors
        self.assertEqual(self.impl.describe(), "Linux watchdog device")

    @patch('os.open', Mock(side_effect=OSError))
    def test_open(self):
        # NOTE(review): the body of this test is cut off in this excerpt;
        # restore it from the original test_watchdog.py before running.
        ...

# watchdog.py
Source:watchdog.py  
...197    """198    Checks if dev is Mellanox type 2 watchdog199    """200    return os.path.exists("{}/{}/timeleft".format(WD_SYSFS_PATH, dev))201def get_watchdog():202    """203    Return WatchdogType1 or WatchdogType2 based on system204    """205    watchdog_main_device_name = None206    for device in os.listdir("/dev/"):207        if device.startswith("watchdog") and is_mlnx_wd_main(device):208            watchdog_main_device_name = device209    if watchdog_main_device_name is None:210        return None211    watchdog_device_path = "/dev/{}".format(watchdog_main_device_name)212    watchdog = None213    if is_wd_type2(watchdog_main_device_name):214        watchdog = WatchdogType2(watchdog_device_path)215    else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
