How to use watchdog method in localstack

Best Python code snippet using localstack_python

test_watchdog.py

Source:test_watchdog.py Github

copy

Full Screen

1import ctypes2import patroni.watchdog.linux as linuxwd3import sys4import unittest5import os6from mock import patch, Mock, PropertyMock7from patroni.watchdog import Watchdog, WatchdogError8from patroni.watchdog.base import NullWatchdog9from patroni.watchdog.linux import LinuxWatchdogDevice10class MockDevice(object):11 def __init__(self, fd, filename, flag):12 self.fd = fd13 self.filename = filename14 self.flag = flag15 self.timeout = 6016 self.open = True17 self.writes = []18mock_devices = [None]19def mock_open(filename, flag):20 fd = len(mock_devices)21 mock_devices.append(MockDevice(fd, filename, flag))22 return fd23def mock_ioctl(fd, op, arg=None, mutate_flag=False):24 assert 0 < fd < len(mock_devices)25 dev = mock_devices[fd]26 sys.stderr.write("Ioctl %d %d %r\n" % (fd, op, arg))27 if op == linuxwd.WDIOC_GETSUPPORT:28 sys.stderr.write("Get support\n")29 assert(mutate_flag is True)30 arg.options = sum(map(linuxwd.WDIOF.get, ['SETTIMEOUT', 'KEEPALIVEPING']))31 arg.identity = (ctypes.c_ubyte*32)(*map(ord, 'Mock Watchdog'))32 elif op == linuxwd.WDIOC_GETTIMEOUT:33 arg.value = dev.timeout34 elif op == linuxwd.WDIOC_SETTIMEOUT:35 sys.stderr.write("Set timeout called with %s\n" % arg.value)36 assert 0 < arg.value < 6553537 dev.timeout = arg.value - 138 else:39 raise Exception("Unknown op %d", op)40 return 041def mock_write(fd, string):42 assert 0 < fd < len(mock_devices)43 assert len(string) == 144 assert mock_devices[fd].open45 mock_devices[fd].writes.append(string)46def mock_close(fd):47 assert 0 < fd < len(mock_devices)48 assert mock_devices[fd].open49 mock_devices[fd].open = False50@unittest.skipIf(os.name == 'nt', "Windows not supported")51@patch('os.open', mock_open)52@patch('os.write', mock_write)53@patch('os.close', mock_close)54@patch('fcntl.ioctl', mock_ioctl)55class TestWatchdog(unittest.TestCase):56 def setUp(self):57 mock_devices[:] = [None]58 @patch('platform.system', Mock(return_value='Linux'))59 @patch.object(LinuxWatchdogDevice, 'can_be_disabled', PropertyMock(return_value=True))60 def test_unsafe_timeout_disable_watchdog_and_exit(self):61 watchdog = Watchdog({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'required', 'safety_margin': -1}})62 self.assertEqual(watchdog.activate(), False)63 self.assertEqual(watchdog.is_running, False)64 @patch('platform.system', Mock(return_value='Linux'))65 @patch.object(LinuxWatchdogDevice, 'get_timeout', Mock(return_value=16))66 def test_timeout_does_not_ensure_safe_termination(self):67 Watchdog({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'auto', 'safety_margin': -1}}).activate()68 self.assertEqual(len(mock_devices), 2)69 @patch('platform.system', Mock(return_value='Linux'))70 @patch.object(Watchdog, 'is_running', PropertyMock(return_value=False))71 def test_watchdog_not_activated(self):72 self.assertFalse(Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'required'}}).activate())73 @patch('platform.system', Mock(return_value='Linux'))74 @patch.object(LinuxWatchdogDevice, 'is_running', PropertyMock(return_value=False))75 def test_watchdog_activate(self):76 with patch.object(LinuxWatchdogDevice, 'open', Mock(side_effect=WatchdogError(''))):77 self.assertTrue(Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'auto'}}).activate())78 self.assertFalse(Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'required'}}).activate())79 @patch('platform.system', Mock(return_value='Linux'))80 def test_basic_operation(self):81 watchdog = Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'required'}})82 watchdog.activate()83 self.assertEqual(len(mock_devices), 2)84 device = mock_devices[-1]85 self.assertTrue(device.open)86 self.assertEqual(device.timeout, 24)87 watchdog.keepalive()88 self.assertEqual(len(device.writes), 1)89 watchdog.disable()90 self.assertFalse(device.open)91 self.assertEqual(device.writes[-1], b'V')92 def test_invalid_timings(self):93 watchdog = Watchdog({'ttl': 30, 'loop_wait': 20, 'watchdog': {'mode': 'automatic', 'safety_margin': -1}})94 watchdog.activate()95 self.assertEqual(len(mock_devices), 1)96 self.assertFalse(watchdog.is_running)97 def test_parse_mode(self):98 with patch('patroni.watchdog.base.logger.warning', new_callable=Mock()) as warning_mock:99 watchdog = Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'bad'}})100 self.assertEqual(watchdog.config.mode, 'off')101 warning_mock.assert_called_once()102 @patch('platform.system', Mock(return_value='Unknown'))103 def test_unsupported_platform(self):104 self.assertRaises(SystemExit, Watchdog, {'ttl': 30, 'loop_wait': 10,105 'watchdog': {'mode': 'required', 'driver': 'bad'}})106 def test_exceptions(self):107 wd = Watchdog({'ttl': 30, 'loop_wait': 10, 'watchdog': {'mode': 'bad'}})108 wd.impl.close = wd.impl.keepalive = Mock(side_effect=WatchdogError(''))109 self.assertTrue(wd.activate())110 self.assertIsNone(wd.keepalive())111 self.assertIsNone(wd.disable())112 @patch('platform.system', Mock(return_value='Linux'))113 def test_config_reload(self):114 watchdog = Watchdog({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'required'}})115 self.assertTrue(watchdog.activate())116 self.assertTrue(watchdog.is_running)117 watchdog.reload_config({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'off'}})118 self.assertFalse(watchdog.is_running)119 watchdog.reload_config({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'required'}})120 self.assertFalse(watchdog.is_running)121 watchdog.keepalive()122 self.assertTrue(watchdog.is_running)123 watchdog.disable()124 watchdog.reload_config({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'required', 'driver': 'unknown'}})125 self.assertFalse(watchdog.is_healthy)126 self.assertFalse(watchdog.activate())127 watchdog.reload_config({'ttl': 30, 'loop_wait': 15, 'watchdog': {'mode': 'required'}})128 self.assertFalse(watchdog.is_running)129 watchdog.keepalive()130 self.assertTrue(watchdog.is_running)131 watchdog.reload_config({'ttl': 60, 'loop_wait': 15, 'watchdog': {'mode': 'required'}})132 watchdog.keepalive()133class TestNullWatchdog(unittest.TestCase):134 def test_basics(self):135 watchdog = NullWatchdog()136 self.assertTrue(watchdog.can_be_disabled)137 self.assertRaises(WatchdogError, watchdog.set_timeout, 1)138 self.assertEqual(watchdog.describe(), 'NullWatchdog')139 self.assertIsInstance(NullWatchdog.from_config({}), NullWatchdog)140@unittest.skipIf(os.name == 'nt', "Windows not supported")141class TestLinuxWatchdogDevice(unittest.TestCase):142 def setUp(self):143 self.impl = LinuxWatchdogDevice.from_config({})144 @patch('os.open', Mock(return_value=3))145 @patch('os.write', Mock(side_effect=OSError))146 @patch('fcntl.ioctl', Mock(return_value=0))147 def test_basics(self):148 self.impl.open()149 try:150 if self.impl.get_support().has_foo:151 self.assertFail()152 except Exception as e:153 self.assertTrue(isinstance(e, AttributeError))154 self.assertRaises(WatchdogError, self.impl.close)155 self.assertRaises(WatchdogError, self.impl.keepalive)156 self.assertRaises(WatchdogError, self.impl.set_timeout, -1)157 @patch('os.open', Mock(return_value=3))158 @patch('fcntl.ioctl', Mock(side_effect=OSError))159 def test__ioctl(self):160 self.assertRaises(WatchdogError, self.impl.get_support)161 self.impl.open()162 self.assertRaises(WatchdogError, self.impl.get_support)163 def test_is_healthy(self):164 self.assertFalse(self.impl.is_healthy)165 @patch('os.open', Mock(return_value=3))166 @patch('fcntl.ioctl', Mock(side_effect=OSError))167 def test_error_handling(self):168 self.impl.open()169 self.assertRaises(WatchdogError, self.impl.get_timeout)170 self.assertRaises(WatchdogError, self.impl.set_timeout, 10)171 # We still try to output a reasonable string even if getting info errors172 self.assertEqual(self.impl.describe(), "Linux watchdog device")173 @patch('os.open', Mock(side_effect=OSError))174 def test_open(self):...

Full Screen

Full Screen

watchdog.py

Source:watchdog.py Github

copy

Full Screen

...197 """198 Checks if dev is Mellanox type 2 watchdog199 """200 return os.path.exists("{}/{}/timeleft".format(WD_SYSFS_PATH, dev))201def get_watchdog():202 """203 Return WatchdogType1 or WatchdogType2 based on system204 """205 watchdog_main_device_name = None206 for device in os.listdir("/dev/"):207 if device.startswith("watchdog") and is_mlnx_wd_main(device):208 watchdog_main_device_name = device209 if watchdog_main_device_name is None:210 return None211 watchdog_device_path = "/dev/{}".format(watchdog_main_device_name)212 watchdog = None213 if is_wd_type2(watchdog_main_device_name):214 watchdog = WatchdogType2(watchdog_device_path)215 else:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful