How to use log_warning method in autotest

Best Python code snippet using autotest_python

test_thermalctld.py

Source:test_thermalctld.py Github

copy

Full Screen

1import os2import sys3import multiprocessing4from imp import load_source # TODO: Replace with importlib once we no longer need to support Python 25# TODO: Clean this up once we no longer need to support Python 26if sys.version_info.major == 3:7 from unittest import mock8else:9 import mock10import pytest11tests_path = os.path.dirname(os.path.abspath(__file__))12# Add mocked_libs path so that the file under test can load mocked modules from there13mocked_libs_path = os.path.join(tests_path, 'mocked_libs')14sys.path.insert(0, mocked_libs_path)15import swsscommon16# Check we are using the mocked package17assert len(swsscommon.__path__) == 118assert(os.path.samefile(swsscommon.__path__[0], os.path.join(mocked_libs_path, 'swsscommon')))19from sonic_py_common import daemon_base20from .mock_platform import MockChassis, MockFan, MockPsu, MockSfp, MockThermal21daemon_base.db_connect = mock.MagicMock()22# Add path to the file under test so that we can load it23modules_path = os.path.dirname(tests_path)24scripts_path = os.path.join(modules_path, 'scripts')25sys.path.insert(0, modules_path)26load_source('thermalctld', os.path.join(scripts_path, 'thermalctld'))27import thermalctld28TEMPER_INFO_TABLE_NAME = 'TEMPERATURE_INFO'29@pytest.fixture(scope='function', autouse=True)30def configure_mocks():31 thermalctld.FanStatus.log_notice = mock.MagicMock()32 thermalctld.FanStatus.log_warning = mock.MagicMock()33 thermalctld.FanUpdater.log_notice = mock.MagicMock()34 thermalctld.FanUpdater.log_warning = mock.MagicMock()35 thermalctld.TemperatureStatus.log_notice = mock.MagicMock()36 thermalctld.TemperatureStatus.log_warning = mock.MagicMock()37 thermalctld.TemperatureUpdater.log_notice = mock.MagicMock()38 thermalctld.TemperatureUpdater.log_warning = mock.MagicMock()39 yield40 thermalctld.FanStatus.log_notice.reset()41 thermalctld.FanStatus.log_warning.reset()42 thermalctld.FanUpdater.log_notice.reset()43 thermalctld.FanUpdater.log_notice.reset()44 thermalctld.TemperatureStatus.log_notice.reset()45 thermalctld.TemperatureStatus.log_warning.reset()46 thermalctld.TemperatureUpdater.log_warning.reset()47 thermalctld.TemperatureUpdater.log_warning.reset()48class TestFanStatus(object):49 """50 Test cases to cover functionality in FanStatus class51 """52 def test_check_speed_value_available(self):53 fan_status = thermalctld.FanStatus()54 ret = fan_status._check_speed_value_available(30, 32, 5, True)55 assert ret == True56 assert fan_status.log_warning.call_count == 057 ret = fan_status._check_speed_value_available(thermalctld.NOT_AVAILABLE, 32, 105, True)58 assert ret == False59 assert fan_status.log_warning.call_count == 160 fan_status.log_warning.assert_called_with('Invalid tolerance value: 105')61 # Reset62 fan_status.log_warning.reset_mock()63 ret = fan_status._check_speed_value_available(thermalctld.NOT_AVAILABLE, 32, 5, False)64 assert ret == False65 assert fan_status.log_warning.call_count == 066 ret = fan_status._check_speed_value_available(thermalctld.NOT_AVAILABLE, 32, 5, True)67 assert ret == False68 assert fan_status.log_warning.call_count == 169 fan_status.log_warning.assert_called_with('Fan speed or target_speed or tolerance became unavailable, speed=N/A, target_speed=32, tolerance=5')70 def test_set_presence(self):71 fan_status = thermalctld.FanStatus()72 ret = fan_status.set_presence(True)73 assert fan_status.presence74 assert not ret75 ret = fan_status.set_presence(False)76 assert not fan_status.presence77 assert ret78 def test_set_under_speed(self):79 fan_status = thermalctld.FanStatus()80 ret = fan_status.set_under_speed(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE)81 assert not ret82 ret = fan_status.set_under_speed(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE, 0)83 assert not ret84 ret = fan_status.set_under_speed(thermalctld.NOT_AVAILABLE, 0, 0)85 assert not ret86 ret = fan_status.set_under_speed(0, 0, 0)87 assert not ret88 ret = fan_status.set_under_speed(80, 100, 19)89 assert ret90 assert fan_status.under_speed91 assert not fan_status.is_ok()92 ret = fan_status.set_under_speed(81, 100, 19)93 assert ret94 assert not fan_status.under_speed95 assert fan_status.is_ok()96 def test_set_over_speed(self):97 fan_status = thermalctld.FanStatus()98 ret = fan_status.set_over_speed(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE)99 assert not ret100 ret = fan_status.set_over_speed(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE, 0)101 assert not ret102 ret = fan_status.set_over_speed(thermalctld.NOT_AVAILABLE, 0, 0)103 assert not ret104 ret = fan_status.set_over_speed(0, 0, 0)105 assert not ret106 ret = fan_status.set_over_speed(120, 100, 19)107 assert ret108 assert fan_status.over_speed109 assert not fan_status.is_ok()110 ret = fan_status.set_over_speed(120, 100, 21)111 assert ret112 assert not fan_status.over_speed113 assert fan_status.is_ok()114class TestFanUpdater(object):115 """116 Test cases to cover functionality in FanUpdater class117 """118 @mock.patch('thermalctld.try_get', mock.MagicMock(return_value=thermalctld.NOT_AVAILABLE))119 @mock.patch('thermalctld.update_entity_info', mock.MagicMock())120 def test_refresh_fan_drawer_status_fan_drawer_get_name_not_impl(self):121 # Test case where fan_drawer.get_name is not implemented122 fan_updater = thermalctld.FanUpdater(MockChassis(), multiprocessing.Event())123 mock_fan_drawer = mock.MagicMock()124 fan_updater._refresh_fan_drawer_status(mock_fan_drawer, 1)125 assert thermalctld.update_entity_info.call_count == 0126 # TODO: Add a test case for _refresh_fan_drawer_status with a good fan drawer127 def test_update_fan_with_exception(self):128 chassis = MockChassis()129 chassis.make_error_fan()130 fan = MockFan()131 fan.make_over_speed()132 chassis.get_all_fans().append(fan)133 fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())134 fan_updater.update()135 assert fan.get_status_led() == MockFan.STATUS_LED_COLOR_RED136 assert fan_updater.log_warning.call_count == 1137 # TODO: Clean this up once we no longer need to support Python 2138 if sys.version_info.major == 3:139 fan_updater.log_warning.assert_called_with("Failed to update fan status - Exception('Failed to get speed')")140 else:141 fan_updater.log_warning.assert_called_with("Failed to update fan status - Exception('Failed to get speed',)")142 def test_set_fan_led_exception(self):143 fan_status = thermalctld.FanStatus()144 mock_fan_drawer = mock.MagicMock()145 mock_fan = MockFan()146 mock_fan.set_status_led = mock.MagicMock(side_effect=NotImplementedError)147 fan_updater = thermalctld.FanUpdater(MockChassis(), multiprocessing.Event())148 fan_updater._set_fan_led(mock_fan_drawer, mock_fan, 'Test Fan', fan_status)149 assert fan_updater.log_warning.call_count == 1150 fan_updater.log_warning.assert_called_with('Failed to set status LED for fan Test Fan, set_status_led not implemented')151 def test_fan_absent(self):152 chassis = MockChassis()153 chassis.make_absent_fan()154 fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())155 fan_updater.update()156 fan_list = chassis.get_all_fans()157 assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_RED158 assert fan_updater.log_warning.call_count == 2159 expected_calls = [160 mock.call('Fan removed warning: FanDrawer 0 fan 1 was removed from the system, potential overheat hazard'),161 mock.call('Insufficient number of working fans warning: 1 fan is not working')162 ]163 assert fan_updater.log_warning.mock_calls == expected_calls164 fan_list[0].set_presence(True)165 fan_updater.update()166 assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_GREEN167 assert fan_updater.log_notice.call_count == 2168 expected_calls = [169 mock.call('Fan removed warning cleared: FanDrawer 0 fan 1 was inserted'),170 mock.call('Insufficient number of working fans warning cleared: all fans are back to normal')171 ]172 assert fan_updater.log_notice.mock_calls == expected_calls173 def test_fan_faulty(self):174 chassis = MockChassis()175 chassis.make_faulty_fan()176 fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())177 fan_updater.update()178 fan_list = chassis.get_all_fans()179 assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_RED180 assert fan_updater.log_warning.call_count == 2181 expected_calls = [182 mock.call('Fan fault warning: FanDrawer 0 fan 1 is broken'),183 mock.call('Insufficient number of working fans warning: 1 fan is not working')184 ]185 assert fan_updater.log_warning.mock_calls == expected_calls186 fan_list[0].set_status(True)187 fan_updater.update()188 assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_GREEN189 assert fan_updater.log_notice.call_count == 2190 expected_calls = [191 mock.call('Fan fault warning cleared: FanDrawer 0 fan 1 is back to normal'),192 mock.call('Insufficient number of working fans warning cleared: all fans are back to normal')193 ]194 assert fan_updater.log_notice.mock_calls == expected_calls195 def test_fan_under_speed(self):196 chassis = MockChassis()197 chassis.make_under_speed_fan()198 fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())199 fan_updater.update()200 fan_list = chassis.get_all_fans()201 assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_RED202 assert fan_updater.log_warning.call_count == 1203 fan_updater.log_warning.assert_called_with('Fan low speed warning: FanDrawer 0 fan 1 current speed=1, target speed=2, tolerance=0')204 fan_list[0].make_normal_speed()205 fan_updater.update()206 assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_GREEN207 assert fan_updater.log_notice.call_count == 1208 fan_updater.log_notice.assert_called_with('Fan low speed warning cleared: FanDrawer 0 fan 1 speed is back to normal')209 def test_fan_over_speed(self):210 chassis = MockChassis()211 chassis.make_over_speed_fan()212 fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())213 fan_updater.update()214 fan_list = chassis.get_all_fans()215 assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_RED216 assert fan_updater.log_warning.call_count == 1217 fan_updater.log_warning.assert_called_with('Fan high speed warning: FanDrawer 0 fan 1 target speed=1, current speed=2, tolerance=0')218 fan_list[0].make_normal_speed()219 fan_updater.update()220 assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_GREEN221 assert fan_updater.log_notice.call_count == 1222 fan_updater.log_notice.assert_called_with('Fan high speed warning cleared: FanDrawer 0 fan 1 speed is back to normal')223 def test_update_psu_fans(self):224 chassis = MockChassis()225 psu = MockPsu()226 mock_fan = MockFan()227 psu._fan_list.append(mock_fan)228 chassis._psu_list.append(psu)229 fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())230 fan_updater.update()231 assert fan_updater.log_warning.call_count == 0232 fan_updater._refresh_fan_status = mock.MagicMock(side_effect=Exception("Test message"))233 fan_updater.update()234 assert fan_updater.log_warning.call_count == 1235 # TODO: Clean this up once we no longer need to support Python 2236 if sys.version_info.major == 3:237 fan_updater.log_warning.assert_called_with("Failed to update PSU fan status - Exception('Test message')")238 else:239 fan_updater.log_warning.assert_called_with("Failed to update PSU fan status - Exception('Test message',)")240class TestThermalMonitor(object):241 """242 Test cases to cover functionality in ThermalMonitor class243 """244 def test_main(self):245 mock_chassis = MockChassis()246 thermal_monitor = thermalctld.ThermalMonitor(mock_chassis)247 thermal_monitor.fan_updater.update = mock.MagicMock()248 thermal_monitor.temperature_updater.update = mock.MagicMock()249 thermal_monitor.main()250 assert thermal_monitor.fan_updater.update.call_count == 1251 assert thermal_monitor.temperature_updater.update.call_count == 1252def test_insufficient_fan_number():253 fan_status1 = thermalctld.FanStatus()254 fan_status2 = thermalctld.FanStatus()255 fan_status1.set_presence(False)256 fan_status2.set_fault_status(False)257 assert thermalctld.FanStatus.get_bad_fan_count() == 2258 assert fan_status1.get_bad_fan_count() == 2259 assert fan_status2.get_bad_fan_count() == 2260 thermalctld.FanStatus.reset_fan_counter()261 assert thermalctld.FanStatus.get_bad_fan_count() == 0262 assert fan_status1.get_bad_fan_count() == 0263 assert fan_status2.get_bad_fan_count() == 0264 chassis = MockChassis()265 chassis.make_absent_fan()266 chassis.make_faulty_fan()267 fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())268 fan_updater.update()269 assert fan_updater.log_warning.call_count == 3270 expected_calls = [271 mock.call('Fan removed warning: FanDrawer 0 fan 1 was removed from the system, potential overheat hazard'),272 mock.call('Fan fault warning: FanDrawer 1 fan 1 is broken'),273 mock.call('Insufficient number of working fans warning: 2 fans are not working')274 ]275 assert fan_updater.log_warning.mock_calls == expected_calls276 fan_list = chassis.get_all_fans()277 fan_list[0].set_presence(True)278 fan_updater.update()279 assert fan_updater.log_notice.call_count == 1280 fan_updater.log_warning.assert_called_with('Insufficient number of working fans warning: 1 fan is not working')281 fan_list[1].set_status(True)282 fan_updater.update()283 assert fan_updater.log_notice.call_count == 3284 expected_calls = [285 mock.call('Fan removed warning cleared: FanDrawer 0 fan 1 was inserted'),286 mock.call('Fan fault warning cleared: FanDrawer 1 fan 1 is back to normal'),287 mock.call('Insufficient number of working fans warning cleared: all fans are back to normal')288 ]289 assert fan_updater.log_notice.mock_calls == expected_calls290def test_temperature_status_set_over_temper():291 temperature_status = thermalctld.TemperatureStatus()292 ret = temperature_status.set_over_temperature(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE)293 assert not ret294 ret = temperature_status.set_over_temperature(thermalctld.NOT_AVAILABLE, 0)295 assert not ret296 ret = temperature_status.set_over_temperature(0, thermalctld.NOT_AVAILABLE)297 assert not ret298 ret = temperature_status.set_over_temperature(2, 1)299 assert ret300 assert temperature_status.over_temperature301 ret = temperature_status.set_over_temperature(1, 2)302 assert ret303 assert not temperature_status.over_temperature304def test_temperstatus_set_under_temper():305 temperature_status = thermalctld.TemperatureStatus()306 ret = temperature_status.set_under_temperature(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE)307 assert not ret308 ret = temperature_status.set_under_temperature(thermalctld.NOT_AVAILABLE, 0)309 assert not ret310 ret = temperature_status.set_under_temperature(0, thermalctld.NOT_AVAILABLE)311 assert not ret312 ret = temperature_status.set_under_temperature(1, 2)313 assert ret314 assert temperature_status.under_temperature315 ret = temperature_status.set_under_temperature(2, 1)316 assert ret317 assert not temperature_status.under_temperature318def test_temperature_status_set_not_available():319 THERMAL_NAME = 'Chassis 1 Thermal 1'320 temperature_status = thermalctld.TemperatureStatus()321 temperature_status.temperature = 20.0322 temperature_status.set_temperature(THERMAL_NAME, thermalctld.NOT_AVAILABLE)323 assert temperature_status.temperature is None324 assert temperature_status.log_warning.call_count == 1325 temperature_status.log_warning.assert_called_with('Temperature of {} became unavailable'.format(THERMAL_NAME))326class TestTemperatureUpdater(object):327 """328 Test cases to cover functionality in TemperatureUpdater class329 """330 def test_deinit(self):331 chassis = MockChassis()332 temp_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())333 temp_updater.temperature_status_dict = {'key1': 'value1', 'key2': 'value2'}334 temp_updater.table._del = mock.MagicMock()335 temp_updater.deinit()336 assert temp_updater.table._del.call_count == 2337 expected_calls = [mock.call('key1'), mock.call('key2')]338 temp_updater.table._del.assert_has_calls(expected_calls, any_order=True)339 340 def test_over_temper(self):341 chassis = MockChassis()342 chassis.make_over_temper_thermal()343 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())344 temperature_updater.update()345 thermal_list = chassis.get_all_thermals()346 assert temperature_updater.log_warning.call_count == 1347 temperature_updater.log_warning.assert_called_with('High temperature warning: chassis 1 Thermal 1 current temperature 3C, high threshold 2C')348 thermal_list[0].make_normal_temper()349 temperature_updater.update()350 assert temperature_updater.log_notice.call_count == 1351 temperature_updater.log_notice.assert_called_with('High temperature warning cleared: chassis 1 Thermal 1 temperature restored to 2C, high threshold 3C')352 def test_under_temper(self):353 chassis = MockChassis()354 chassis.make_under_temper_thermal()355 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())356 temperature_updater.update()357 thermal_list = chassis.get_all_thermals()358 assert temperature_updater.log_warning.call_count == 1359 temperature_updater.log_warning.assert_called_with('Low temperature warning: chassis 1 Thermal 1 current temperature 1C, low threshold 2C')360 thermal_list[0].make_normal_temper()361 temperature_updater.update()362 assert temperature_updater.log_notice.call_count == 1363 temperature_updater.log_notice.assert_called_with('Low temperature warning cleared: chassis 1 Thermal 1 temperature restored to 2C, low threshold 1C')364 def test_update_psu_thermals(self):365 chassis = MockChassis()366 psu = MockPsu()367 mock_thermal = MockThermal()368 psu._thermal_list.append(mock_thermal)369 chassis._psu_list.append(psu)370 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())371 temperature_updater.update()372 assert temperature_updater.log_warning.call_count == 0373 mock_thermal.get_temperature = mock.MagicMock(side_effect=Exception("Test message"))374 temperature_updater.update()375 assert temperature_updater.log_warning.call_count == 1376 # TODO: Clean this up once we no longer need to support Python 2377 if sys.version_info.major == 3:378 temperature_updater.log_warning.assert_called_with("Failed to update thermal status for PSU 1 Thermal 1 - Exception('Test message')")379 else:380 temperature_updater.log_warning.assert_called_with("Failed to update thermal status for PSU 1 Thermal 1 - Exception('Test message',)")381 def test_update_sfp_thermals(self):382 chassis = MockChassis()383 sfp = MockSfp()384 mock_thermal = MockThermal()385 sfp._thermal_list.append(mock_thermal)386 chassis._sfp_list.append(sfp)387 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())388 temperature_updater.update()389 assert temperature_updater.log_warning.call_count == 0390 mock_thermal.get_temperature = mock.MagicMock(side_effect=Exception("Test message"))391 temperature_updater.update()392 assert temperature_updater.log_warning.call_count == 1393 # TODO: Clean this up once we no longer need to support Python 2394 if sys.version_info.major == 3:395 temperature_updater.log_warning.assert_called_with("Failed to update thermal status for SFP 1 Thermal 1 - Exception('Test message')")396 else:397 temperature_updater.log_warning.assert_called_with("Failed to update thermal status for SFP 1 Thermal 1 - Exception('Test message',)")398 def test_update_thermal_with_exception(self):399 chassis = MockChassis()400 chassis.make_error_thermal()401 thermal = MockThermal()402 thermal.make_over_temper()403 chassis.get_all_thermals().append(thermal)404 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())405 temperature_updater.update()406 assert temperature_updater.log_warning.call_count == 2407 # TODO: Clean this up once we no longer need to support Python 2408 if sys.version_info.major == 3:409 expected_calls = [410 mock.call("Failed to update thermal status for chassis 1 Thermal 1 - Exception('Failed to get temperature')"),411 mock.call('High temperature warning: chassis 1 Thermal 2 current temperature 3C, high threshold 2C')412 ]413 else:414 expected_calls = [415 mock.call("Failed to update thermal status for chassis 1 Thermal 1 - Exception('Failed to get temperature',)"),416 mock.call('High temperature warning: chassis 1 Thermal 2 current temperature 3C, high threshold 2C')417 ]418 assert temperature_updater.log_warning.mock_calls == expected_calls419 def test_update_module_thermals(self):420 chassis = MockChassis()421 chassis.make_module_thermal()422 chassis.set_modular_chassis(True)423 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())424 temperature_updater.update()425 assert len(temperature_updater.module_thermals) == 3426 427 chassis._module_list = []428 temperature_updater.update()429 assert len(temperature_updater.module_thermals) == 0430# Modular chassis-related tests431def test_updater_thermal_check_modular_chassis():432 chassis = MockChassis()433 assert chassis.is_modular_chassis() == False434 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())435 assert temperature_updater.chassis_table == None436 chassis.set_modular_chassis(True)437 chassis.set_my_slot(-1)438 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())439 assert temperature_updater.chassis_table == None440 my_slot = 1441 chassis.set_my_slot(my_slot)442 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())443 assert temperature_updater.chassis_table != None444 assert temperature_updater.chassis_table.table_name == '{}_{}'.format(TEMPER_INFO_TABLE_NAME, str(my_slot))445def test_updater_thermal_check_chassis_table():446 chassis = MockChassis()447 thermal1 = MockThermal()448 chassis.get_all_thermals().append(thermal1)449 chassis.set_modular_chassis(True)450 chassis.set_my_slot(1)451 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())452 temperature_updater.update()453 assert temperature_updater.chassis_table.get_size() == chassis.get_num_thermals()454 thermal2 = MockThermal()455 chassis.get_all_thermals().append(thermal2)456 temperature_updater.update()457 assert temperature_updater.chassis_table.get_size() == chassis.get_num_thermals()458def test_updater_thermal_check_min_max():459 chassis = MockChassis()460 thermal = MockThermal(1)461 chassis.get_all_thermals().append(thermal)462 chassis.set_modular_chassis(True)463 chassis.set_my_slot(1)464 temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())465 temperature_updater.update()466 slot_dict = temperature_updater.chassis_table.get(thermal.get_name())467 assert slot_dict['minimum_temperature'] == str(thermal.get_minimum_recorded())468 assert slot_dict['maximum_temperature'] == str(thermal.get_maximum_recorded())469def test_signal_handler():470 # Test SIGHUP471 daemon_thermalctld = thermalctld.ThermalControlDaemon()472 daemon_thermalctld.stop_event.set = mock.MagicMock()473 daemon_thermalctld.log_info = mock.MagicMock()474 daemon_thermalctld.log_warning = mock.MagicMock()475 daemon_thermalctld.thermal_manager.stop = mock.MagicMock()476 daemon_thermalctld.signal_handler(thermalctld.signal.SIGHUP, None)477 daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert478 assert daemon_thermalctld.log_info.call_count == 1479 daemon_thermalctld.log_info.assert_called_with("Caught signal 'SIGHUP' - ignoring...")480 assert daemon_thermalctld.log_warning.call_count == 0481 assert daemon_thermalctld.stop_event.set.call_count == 0482 assert daemon_thermalctld.thermal_manager.stop.call_count == 0483 assert thermalctld.exit_code == thermalctld.ERR_UNKNOWN484 # Test SIGINT485 daemon_thermalctld = thermalctld.ThermalControlDaemon()486 daemon_thermalctld.stop_event.set = mock.MagicMock()487 daemon_thermalctld.log_info = mock.MagicMock()488 daemon_thermalctld.log_warning = mock.MagicMock()489 daemon_thermalctld.thermal_manager.stop = mock.MagicMock()490 test_signal = thermalctld.signal.SIGINT491 daemon_thermalctld.signal_handler(test_signal, None)492 daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert493 assert daemon_thermalctld.log_info.call_count == 1494 daemon_thermalctld.log_info.assert_called_with("Caught signal 'SIGINT' - exiting...")495 assert daemon_thermalctld.log_warning.call_count == 0496 assert daemon_thermalctld.stop_event.set.call_count == 1497 assert daemon_thermalctld.thermal_manager.stop.call_count == 1498 assert thermalctld.exit_code == (128 + test_signal)499 # Test SIGTERM500 thermalctld.exit_code = thermalctld.ERR_UNKNOWN501 daemon_thermalctld = thermalctld.ThermalControlDaemon()502 daemon_thermalctld.stop_event.set = mock.MagicMock()503 daemon_thermalctld.log_info = mock.MagicMock()504 daemon_thermalctld.log_warning = mock.MagicMock()505 daemon_thermalctld.thermal_manager.stop = mock.MagicMock()506 test_signal = thermalctld.signal.SIGTERM507 daemon_thermalctld.signal_handler(test_signal, None)508 daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert509 assert daemon_thermalctld.log_info.call_count == 1510 daemon_thermalctld.log_info.assert_called_with("Caught signal 'SIGTERM' - exiting...")511 assert daemon_thermalctld.log_warning.call_count == 0512 assert daemon_thermalctld.stop_event.set.call_count == 1513 assert daemon_thermalctld.thermal_manager.stop.call_count == 1514 assert thermalctld.exit_code == (128 + test_signal)515 # Test an unhandled signal516 thermalctld.exit_code = thermalctld.ERR_UNKNOWN517 daemon_thermalctld = thermalctld.ThermalControlDaemon()518 daemon_thermalctld.stop_event.set = mock.MagicMock()519 daemon_thermalctld.log_info = mock.MagicMock()520 daemon_thermalctld.log_warning = mock.MagicMock()521 daemon_thermalctld.thermal_manager.stop = mock.MagicMock()522 daemon_thermalctld.signal_handler(thermalctld.signal.SIGUSR1, None)523 daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert524 assert daemon_thermalctld.log_warning.call_count == 1525 daemon_thermalctld.log_warning.assert_called_with("Caught unhandled signal 'SIGUSR1' - ignoring...")526 assert daemon_thermalctld.log_info.call_count == 0527 assert daemon_thermalctld.stop_event.set.call_count == 0528 assert daemon_thermalctld.thermal_manager.stop.call_count == 0529 assert thermalctld.exit_code == thermalctld.ERR_UNKNOWN530def test_daemon_run():531 daemon_thermalctld = thermalctld.ThermalControlDaemon()532 daemon_thermalctld.stop_event.wait = mock.MagicMock(return_value=True)533 daemon_thermalctld.thermal_manager.get_interval = mock.MagicMock(return_value=60)534 ret = daemon_thermalctld.run()535 daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert536 assert ret is False537 daemon_thermalctld = thermalctld.ThermalControlDaemon()538 daemon_thermalctld.stop_event.wait = mock.MagicMock(return_value=False)539 daemon_thermalctld.thermal_manager.get_interval = mock.MagicMock(return_value=60)540 ret = daemon_thermalctld.run()541 daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert542 assert ret is True543def test_try_get():544 def good_callback():545 return 'good result'546 def unimplemented_callback():547 raise NotImplementedError548 ret = thermalctld.try_get(good_callback)549 assert ret == 'good result'550 ret = thermalctld.try_get(unimplemented_callback)551 assert ret == thermalctld.NOT_AVAILABLE552 ret = thermalctld.try_get(unimplemented_callback, 'my default')553 assert ret == 'my default'554def test_update_entity_info():555 mock_table = mock.MagicMock()556 mock_fan = MockFan()557 expected_fvp = thermalctld.swsscommon.FieldValuePairs(558 [('position_in_parent', '1'),559 ('parent_name', 'Parent Name')560 ])561 thermalctld.update_entity_info(mock_table, 'Parent Name', 'Key Name', mock_fan, 1)562 assert mock_table.set.call_count == 1563 mock_table.set.assert_called_with('Key Name', expected_fvp)564@mock.patch('thermalctld.ThermalControlDaemon.run')565def test_main(mock_run):566 mock_run.return_value = False567 ret = thermalctld.main()568 assert mock_run.call_count == 1...

Full Screen

Full Screen

mysql_conn.py

Source:mysql_conn.py Github

copy

Full Screen

1import re2import json3import time4import MySQLdb5import type_check6from DBUtils.PooledDB import PooledDB7from loggingex import LOG_WARNING8from loggingex import LOG_INFO9from loggingex import LOG_ERROR_SQL10class mysql_conn():11 def __init__(self, host_name, port_num, user_name, password, db_name, charset_name = "utf8"):12 self._host = host_name13 self._port = int(port_num)14 self._user = user_name15 self._passwd = password16 self._db = db_name17 self._charset = charset_name18 self._pool = None19 self._table_info = {}20 self.re_connect()21 def __del__(self):22 self._try_close_connect()23 def re_connect(self):24 self._try_close_connect()25 26 try:27 self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)28 LOG_INFO("connect %s success" %(self._db))29 self.refresh_tables_info()30 return31 except MySQLdb.Error, e :32 if e.args[0] == 1049:33 self._create_db()34 else:35 LOG_WARNING("%s connect error %s" % (self._db, str(e)))36 return37 except Exception as e:38 LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))39 return 40 try:41 self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)42 pool = PersistentDB(creator=MySQLdb, mincached=1, maxcached=3, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)43 LOG_INFO("connect %s success" %(self._db))44 self.refresh_tables_info()45 return46 except Exception as e:47 LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))48 return49 50 if None == self._pool:51 LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))52 return53 54 def _try_close_connect(self):55 if None == self._pool:56 return57 try:58 self._pool.close()59 except MySQLdb.Error, e :60 LOG_WARNING("%s %s" % (self._db, str(e)))61 except Exception as e:62 LOG_WARNING("%s %s" % (self._db, str(e)))63 finally:64 self._pool = None65 def _create_db(self):66 conn = None67 cursor = None68 try:69 conn = MySQLdb.connect(host=self._host, port=self._port, user=self._user, passwd=self._passwd)70 cursor = conn.cursor()71 sql = """create database if not exists %s""" %(self._db)72 #LOG_INFO(sql)73 cursor.execute(sql)74 conn.select_db(self._db);75 conn.commit()76 except MySQLdb.Error, e :77 LOG_WARNING("%s execute error %s" % (sql, str(e)))78 finally:79 try:80 if cursor:81 cursor.close()82 if conn:83 conn.close()84 except:85 pass86 def select(self, table_name, fields_array, conditions, pre = "", extend = ""):87 fields_str = "," . join(fields_array)88 conds = []89 for (column_name, column_data_info) in conditions.items():90 column_type = self._get_column_type(table_name, column_name)91 column_data = column_data_info[0]92 operation = column_data_info[1]93 if isinstance(column_data, list):94 new_datas = []95 for item in column_data:96 new_data = self._conv_data(item, column_type)97 try:98 new_datas.append(new_data)99 except:100 LOG_WARNING("%s %s conv error" %(item, column_type))101 temp_str = "," . join(new_datas)102 cond = column_name + " " + operation + " (" + temp_str + ")"103 conds.append(cond)104 else:105 new_data = self._conv_data(column_data, column_type)106 try:107 cond = column_name + " " + operation + " " + new_data108 conds.append(cond)109 except:110 LOG_WARNING("%s %s conv error" %(column_data, column_type))111 conds_str = " and " . join(conds)112 sql = "select " + pre + " " + fields_str + " from " + table_name113 if len(conds_str) > 0:114 sql = sql + " where " + conds_str115 116 if len(extend) > 0:117 sql = sql + " " + extend118 119 data_info = self.execute(sql, select = True)120 return data_info121 def _get_tables_info(self):122 tables_info = {}123 tables_sql = "show tables"124 tables_name = self.execute(tables_sql, select = True)125 for table_name_item in tables_name:126 table_name = table_name_item[0]127 if 0 == len(table_name):128 continue129 columns_sql = "show columns from " + table_name 130 table_info = self.execute(columns_sql, select = True)131 table_name = table_name_item[0]132 columns_info = self._get_table_info(table_info)133 if len(columns_info):134 tables_info[table_name] = columns_info135 return tables_info136 def _get_table_info(self, table_info):137 columns_info = {}138 for item in table_info:139 column_name = item[0]140 column_type_info = item[1]141 (type, len) = self._get_column_type_info(column_type_info)142 columns_info[column_name] = {"type":type, "length":len}143 return columns_info144 def _get_column_type_info(self, type_info):145 re_str = '(\w*)\((\d*),?.*\)'146 kw = re.findall(re_str, type_info)147 if len(kw):148 if len(kw[0]) > 1:149 return (kw[0][0], kw[0][1])150 return (None, None)151 def _implode(self, array_data, escape = False):152 array_str = ""153 for item in array_data:154 if 0 != len(array_str):155 array_str += ","156 if escape:157 if type_check.IsNumber(item) or type_check.IsFloat(item):158 array_str += str(item)159 elif type_check.IsString(item):160 array_str += "'"161 array_str += item.encode("string_escape")162 array_str += "'"163 else:164 array_str += str(item)165 array_str = "(" + array_str + ")"166 return array_str167 def _get_column_type(self, table_name, column_name):168 if 0 == len(self._table_info):169 self.refresh_tables_info()170 if table_name not in self._table_info.keys():171 LOG_WARNING("table_%s info in not exist" %(table_name))172 return "None"173 if column_name not in self._table_info[table_name].keys():174 LOG_WARNING("column name %s is not in table %s" % (column_name, table_name))175 return "None"176 return self._table_info[table_name][column_name]["type"]177 178 def _conv_data(self, data, type):179 if type == "varchar" or type == "char":180 return '"%s"' % (data)181 elif type == "float" or type == "double":182 try:183 conv_data = float(data)184 return "%.8f" % (conv_data)185 except Exception as e:186 LOG_WARNING("conv %s to %s error" % (data, type))187 return "0"188 elif type == "tinyint" or type == "bigint" or type == "int":189 return "%d" % (int(data))190 191 def _implode_by_tableinfo(self, data_array, table_name, columns_name_array):192 array_str = ""193 if len(data_array) != len(columns_name_array):194 LOG_WARNING("columns name length != data array length! %s %s " % (json.dumps(data_array), json.dumps(columns_name_array)))195 return ""196 for index in range(0, len(columns_name_array)):197 item = data_array[index]198 column_name = columns_name_array[index]199 column_type = self._get_column_type(table_name, column_name)200 if 0 != len(array_str):201 array_str += ","202 203 new_data = self._conv_data(item, column_type)204 try:205 array_str += new_data206 except:207 LOG_WARNING("%s %s conv error" %(item, column_type))208 array_str = "(" + array_str + ")"209 return array_str210 211 def refresh_tables_info(self):212 self._table_info = self._get_tables_info()213 def insert_data(self, table_name, columns_name, data_array):214 if 0 == len(data_array):215 return216 columns = self._implode(columns_name)217 value_list = []218 for item in data_array:219 value_str = self._implode_by_tableinfo(item, table_name, columns_name)220 value_list.append(value_str)221 values_sql = ",".join(value_list)222 if 0 == len(values_sql):223 return224 sql = "insert into " + table_name + columns + " values" + values_sql225 #LOG_INFO(sql)226 self.execute(sql, commit = True)227 228 def update(self, table_name, data, keys_name):229 key_data = {}230 update_data = {}231 for (column_name, column_data) in data.items():232 if column_name in keys_name:233 key_data[column_name] = column_data234 else:235 update_data[column_name] = column_data236 237 update_str_list = []238 try:239 for (column_name, column_data) in update_data.items():240 column_type = self._get_column_type(table_name, column_name)241 new_data = self._conv_data(column_data, column_type)242 update_str = column_name + "=" + new_data243 update_str_list.append(update_str)244 update_info_str = "," . join(update_str_list)245 cond_str_list = []246 for (column_name, column_data) in key_data.items():247 column_type = self._get_column_type(table_name, column_name)248 new_data = self._conv_data(column_data, column_type)249 cond_str = column_name + "=" + new_data250 cond_str_list.append(cond_str)251 cond_info_str = " AND " . join(cond_str_list)252 sql = "UPDATE %s SET %s" % (table_name, update_info_str)253 if 0 != len(cond_info_str):254 sql = sql + " where " + cond_info_str255 256 self.execute(sql, commit = True)257 except Exception as e:258 LOG_WARNING("update %s %s %s error: %s" % (table_name, data, keys_name, str(e)))259 def insert_onduplicate(self, table_name, data, keys_name):260 columns_name = data.keys()261 columns = self._implode(columns_name)262 value_list = []263 data_array = data.values()264 value_str = self._implode_by_tableinfo(data_array, table_name, columns_name)265 sql = "insert into " + table_name + columns + " values" + value_str266 267 update_data = {}268 for (column_name, column_data) in data.items():269 if column_name in keys_name:270 continue271 update_data[column_name] = column_data272 update_str_list = []273 for (column_name, column_data) in update_data.items():274 column_type = self._get_column_type(table_name, column_name)275 new_data = self._conv_data(column_data, column_type)276 update_str = column_name + "=" + new_data277 update_str_list.append(update_str)278 update_info_str = "," . join(update_str_list)279 if len(update_info_str) != 0:280 sql = sql + " ON DUPLICATE KEY UPDATE " + update_info_str281 #LOG_INFO(sql)282 self.execute(sql, commit = True)283 284 def has_table(self, table_name):285 if 0 == len(self._table_info):286 self.refresh_tables_info()287 if table_name in self._table_info.keys():288 return True289 return False290 def execute(self, sql, select = False, commit=False):291 try:292 data = ()293 conn = self._pool.connection()294 cursor = conn.cursor()295 data = cursor.execute(sql)296 if select:297 data = cursor.fetchall()298 if commit:299 conn.commit()300 cursor.close()301 except Exception as e:302 LOG_WARNING("excute sql error %s" % (str(e)))303 LOG_ERROR_SQL("%s" % (sql))304 finally:305 cursor.close()306 conn.close()307 return data308if __name__ == "__main__":309 a = mysql_conn("127.0.0.1", 3306, "root", "fangliang", "stock")310 #a.insert_data("share_base_info", ["share_id", "share_name"], [["000123","2'xxx"], ["000345","'4yyy"]])311 #a.insert_onduplicate("share_base_info", {"share_id":"000123", "share_name":"4YYYYYYYYYYY"}, ["share_id"])...

Full Screen

Full Screen

bitmask-helper

Source:bitmask-helper Github

copy

Full Screen

1#!/usr/bin/env python2# -*- coding: utf-8 -*-3#4# Author: Kali Kaneko5# Copyright (C) 2015-2017 LEAP Encryption Access Project6#7# This program is free software: you can redistribute it and/or modify8# it under the terms of the GNU General Public License as published by9# the Free Software Foundation, either version 3 of the License, or10# (at your option) any later version.11#12# This program is distributed in the hope that it will be useful,13# but WITHOUT ANY WARRANTY; without even the implied warranty of14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the15# GNU General Public License for more details.16#17# You should have received a copy of the GNU General Public License18# along with this program. If not, see <http://www.gnu.org/licenses/>.19"""20bitmask-helper21==============================================================================22This is a privileged helper script for safely running certain commands as root23under OSX.24It should be run by launchd, and it exposes a Unix Domain Socket to where25the following commmands can be written by the Bitmask application:26 firewall_start [restart] GATEWAY1 GATEWAY2 ...27 firewall_stop28 openvpn_start CONFIG1 CONFIG1 ...29 openvpn_stop30 fw_email_start uid31 fw_email_stop32To load it manually:33 sudo launchctl load /Library/LaunchDaemons/se.leap.bitmask-helper34To see the loaded rules:35 sudo pfctl -s rules -a bitmask36To test the commands, you can write directly to the unix socket. Remember to37terminate the command properly:38 echo 'firewall_stop/CMD' | socat - UNIX-CONNECT:/var/run/bitmask-helper.socket39"""40import os41import socket42import signal43import subprocess44import syslog45import threading46from commands import getoutput as exec_cmd47from functools import partial48import daemon49import re50VERSION = "2"51SCRIPT = "bitmask-helper"52NAMESERVER = "10.42.0.1"53BITMASK_ANCHOR = "com.apple/250.BitmaskFirewall"54BITMASK_ANCHOR_EMAIL = "bitmask_email"55OPENVPN_USER = 'nobody'56OPENVPN_GROUP = 'nogroup'57LEAPOPENVPN = 'LEAPOPENVPN'58APP_PATH = '/Applications/Bitmask.app/'59RESOURCES_PATH = APP_PATH + 'Contents/Resources/'60FIXED_FLAGS = [61 "--setenv", "LEAPOPENVPN", "1",62 "--nobind",63 "--client",64 "--tls-client",65 "--remote-cert-tls", "server",66 "--management-signal",67 "--script-security", "1",68 "--user", "nobody",69 "--remap-usr1", "SIGTERM",70 "--group", OPENVPN_GROUP,71]72ALLOWED_FLAGS = {73 "--remote": ["IP", "NUMBER", "PROTO"],74 "--tls-cipher": ["CIPHER"],75 "--cipher": ["CIPHER"],76 "--auth": ["CIPHER"],77 "--management": ["DIR", "UNIXSOCKET"],78 "--management-client-user": ["USER"],79 "--cert": ["FILE"],80 "--key": ["FILE"],81 "--ca": ["FILE"],82 "--fragment": ["NUMBER"]83}84PARAM_FORMATS = {85 "NUMBER": lambda s: re.match("^\d+$", s),86 "PROTO": lambda s: re.match("^(tcp|udp)$", s),87 "IP": lambda s: is_valid_address(s),88 "CIPHER": lambda s: re.match("^[A-Z0-9-]+$", s),89 "USER": lambda s: re.match(90 "^[a-zA-Z0-9_\.\@][a-zA-Z0-9_\-\.\@]*\$?$", s), # IEEE Std 1003.1-200191 "FILE": lambda s: os.path.isfile(s),92 "DIR": lambda s: os.path.isdir(os.path.split(s)[0]),93 "UNIXSOCKET": lambda s: s == "unix",94 "UID": lambda s: re.match("^[a-zA-Z0-9]+$", s)95}96#97# paths (must use absolute paths, since this script is run as root)98#99PFCTL = '/sbin/pfctl'100ROUTE = '/sbin/route'101AWK = '/usr/bin/awk'102GREP = '/usr/bin/grep'103CAT = '/bin/cat'104UID = os.getuid()105SERVER_ADDRESS = '/var/run/bitmask-helper.socket'106cmd_lock = threading.Lock()107#108# COMMAND DISPATCH109#110def serve_forever():111 try:112 os.unlink(SERVER_ADDRESS)113 except OSError:114 if os.path.exists(SERVER_ADDRESS):115 raise116 syslog.syslog(syslog.LOG_WARNING, "serving forever")117 # XXX should check permissions on the socket file118 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)119 sock.bind(SERVER_ADDRESS)120 sock.listen(1)121 syslog.syslog(syslog.LOG_WARNING, "Binded to %s" % SERVER_ADDRESS)122 while True:123 connection, client_address = sock.accept()124 thread = threading.Thread(target=handle_command, args=[connection])125 thread.daemon = True126 thread.start()127def recv_until_marker(sock):128 end = '/CMD'129 total_data = []130 data = ''131 while True:132 data = sock.recv(8192)133 if end in data:134 total_data.append(data[:data.find(end)])135 break136 total_data.append(data)137 if len(total_data) > 1:138 # check if end_of_data was split139 last_pair = total_data[-2] + total_data[-1]140 if end in last_pair:141 total_data[-2] = last_pair[:last_pair.find(end)]142 total_data.pop()143 break144 return ''.join(total_data).strip()145def handle_command(sock):146 syslog.syslog(syslog.LOG_WARNING, "handle")147 received = recv_until_marker(sock)148 line = received.replace('\n', '').split(' ')149 command, args = line[0], line[1:]150 syslog.syslog(syslog.LOG_WARNING, 'command %s' % (command))151 cmd_dict = {152 'firewall_start': (firewall_start, args),153 'firewall_stop': (firewall_stop, []),154 'firewall_isup': (firewall_isup, []),155 'openvpn_start': (openvpn_start, args),156 'openvpn_stop': (openvpn_stop, []),157 'openvpn_force_stop': (openvpn_stop, ['KILL']),158 'openvpn_set_watcher': (openvpn_set_watcher, args)159 }160 cmd_call = cmd_dict.get(command, None)161 cmd_lock.acquire()162 try:163 if cmd_call:164 cmd, args = cmd_call165 if args:166 cmd = partial(cmd, *args)167 result = cmd()168 syslog.syslog(syslog.LOG_WARNING, "%s result: %s" % (169 command, str(result)))170 if result == 'YES':171 sock.sendall("%s: YES\n" % command)172 elif result == 'NO':173 sock.sendall("%s: NO\n" % command)174 else:175 sock.sendall("%s: OK\n" % command)176 else:177 syslog.syslog(178 syslog.LOG_WARNING, 'invalid command: %s' % (command,))179 sock.sendall("%s: ERROR\n" % command)180 except Exception as exc:181 syslog.syslog(182 syslog.LOG_WARNING, "error executing function %r" % (exc))183 finally:184 cmd_lock.release()185 sock.close()186#187# OPENVPN188#189openvpn_proc = None190openvpn_watcher_pid = None191def openvpn_start(*args):192 """193 Sanitize input and run openvpn as a subprocess of this long-running daemon.194 Keeps a reference to the subprocess Popen class instance.195 :param args: arguments to be passed to openvpn196 :type args: list197 """198 syslog.syslog(syslog.LOG_WARNING, "OPENVPN START")199 opts = list(args[1:])200 opts += ['--dhcp-option', 'DNS', '10.42.0.1',201 '--up', RESOURCES_PATH + 'client.up.sh',202 '--down', RESOURCES_PATH + 'client.down.sh']203 opts += ["--dev", "tun"]204 binary = [RESOURCES_PATH + 'openvpn.leap']205 if not os.path.isfile(binary[0]):206 binary = ['/usr/local/sbin/openvpn']207 cmd = binary + opts208 syslog.syslog(syslog.LOG_WARNING, 'LAUNCHING VPN: ' + ' '.join(cmd))209 global openvpn_proc210 if openvpn_proc is not None:211 syslog.syslog(syslog.LOG_WARNING, 'vpn: another process is already started')212 return213 # TODO sanitize options214 openvpn_proc = subprocess.Popen(cmd, shell=False, bufsize=-1)215 # XXX DEBUG -------------------------------------------------216 # try:217 # result = subprocess.check_output(218 # cmd, shell=False, stderr=subprocess.STDOUT)219 # except Exception as exc:220 # syslog.syslog(syslog.LOG_WARNING, exc.output)221def openvpn_stop(sig='TERM'):222 """223 Stop the openvpn that has been launched by this privileged helper.224 :param args: arguments to openvpn225 :type args: list226 """227 global openvpn_proc228 syslog.syslog(229 syslog.LOG_WARNING, "Stopping OpenVPN...")230 if openvpn_proc:231 syslog.syslog(232 syslog.LOG_WARNING, "OpenVPN Process: %s" % str(openvpn_proc.pid))233 if sig == 'KILL':234 stop_signal = signal.SIGKILL235 openvpn_proc.kill()236 elif sig == 'TERM':237 stop_signal = signal.SIGTERM238 openvpn_proc.terminate()239 retcode = openvpn_proc.wait()240 syslog.syslog(241 syslog.LOG_WARNING, "OpenVPN died. Return code: %s" % str(retcode))242 syslog.syslog(243 syslog.LOG_WARNING, "openvpn_watcher_pid: %s" % str(244 openvpn_watcher_pid))245 openvpn_proc = None246 if openvpn_watcher_pid:247 try:248 os.kill(openvpn_watcher_pid, stop_signal)249 except Exception:250 pass251def openvpn_set_watcher(pid, *args):252 global openvpn_watcher_pid253 openvpn_watcher_pid = int(pid)254 syslog.syslog(syslog.LOG_WARNING, "Watcher PID: %s" % pid)255#256# FIREWALL257#258def firewall_start(*gateways):259 """260 Bring up the firewall.261 :param gws: list of gateways, to be sanitized.262 :type gws: list263 """264 gateways = get_gateways(gateways)265 if not gateways:266 return False267 _enable_pf()268 _reset_bitmask_gateways_table(gateways)269 default_device = _get_default_device()270 _load_bitmask_anchor(default_device)271def firewall_stop():272 """273 Flush everything from anchor bitmask274 """275 cmd = '{pfctl} -a {anchor} -F all'.format(276 pfctl=PFCTL, anchor=BITMASK_ANCHOR)277 return exec_cmd(cmd)278def firewall_isup():279 """280 Return YES if anchor bitmask is loaded with rules281 """282 cmd = '{pfctl} -s rules -a {anchor} | wc -l'.format(283 pfctl=PFCTL, anchor=BITMASK_ANCHOR)284 output = exec_cmd(cmd)285 rules = output[-1]286 if int(rules) > 0:287 return 'YES'288 else:289 return 'NO'290def _enable_pf():291 exec_cmd('{pfctl} -e'.format(pfctl=PFCTL))292def _reset_bitmask_gateways_table(gateways):293 cmd = '{pfctl} -a {anchor} -t bitmask_gateways -T delete'.format(294 pfctl=PFCTL, anchor=BITMASK_ANCHOR)295 exec_cmd(cmd)296 for gateway in gateways:297 cmd = '{pfctl} -a {anchor} -t bitmask_gateways -T add {gw}'.format(298 pfctl=PFCTL, anchor=BITMASK_ANCHOR, gw=gateway)299 exec_cmd(cmd)300 syslog.syslog(syslog.LOG_WARNING, "adding gw %s" % gateway)301 # cmd = '{pfctl} -a {anchor} -t bitmask_nameservers -T delete'.format(302 # pfctl=PFCTL, anchor=BITMASK_ANCHOR)303 # output = exec_cmd(cmd)304 cmd = '{pfctl} -a {anchor} -t bitmask_gateways -T add {ns}'.format(305 pfctl=PFCTL, anchor=BITMASK_ANCHOR, ns=NAMESERVER)306 exec_cmd(cmd)307 syslog.syslog(syslog.LOG_WARNING, "adding ns %s" % NAMESERVER)308def _load_bitmask_anchor(default_device):309 cmd = ('{pfctl} -D default_device={defaultdevice} '310 '-a {anchor} -f {rulefile}').format(311 pfctl=PFCTL, defaultdevice=default_device,312 anchor=BITMASK_ANCHOR,313 rulefile=RESOURCES_PATH + 'bitmask-helper/bitmask.pf.conf')314 syslog.syslog(syslog.LOG_WARNING, "LOADING CMD: %s" % cmd)315 return exec_cmd(cmd)316def _get_default_device():317 """318 Retrieve the current default network device.319 :rtype: str320 """321 cmd_def_device = (322 '{route} -n get -net default | '323 '{grep} interface | {awk} "{{print $2}}"').format(324 route=ROUTE, grep=GREP, awk=AWK)325 iface = exec_cmd(cmd_def_device)326 iface = iface.replace("interface: ", "").strip()327 syslog.syslog(syslog.LOG_WARNING, "default device %s" % iface)328 return iface329#330# UTILITY331#332def is_valid_address(value):333 """334 Validate that the passed ip is a valid IP address.335 :param value: the value to be validated336 :type value: str337 :rtype: bool338 """339 try:340 socket.inet_aton(value)341 return True342 except Exception:343 syslog.syslog(syslog.LOG_WARNING, 'MALFORMED IP: %s!' % (value))344 return False345#346# FIREWALL347#348def get_gateways(gateways):349 """350 Filter a passed sequence of gateways, returning only the valid ones.351 :param gateways: a sequence of gateways to filter.352 :type gateways: iterable353 :rtype: iterable354 """355 syslog.syslog(syslog.LOG_WARNING, 'Filtering %s' % str(gateways))356 result = filter(is_valid_address, gateways)357 if not result:358 syslog.syslog(syslog.LOG_ERR, 'No valid gateways specified')359 return False360 else:361 return result362if __name__ == "__main__":363 with daemon.DaemonContext():364 syslog.syslog(syslog.LOG_WARNING, "Serving...")...

Full Screen

Full Screen

tails-additional-software

Source:tails-additional-software Github

copy

Full Screen

1#!/usr/bin/env python2import gettext3import glob4import os.path5import pwd6import subprocess7import sys8import syslog9_ = gettext.gettext10PERSISTENCE_DIR = "/live/persistence/TailsData_unlocked"11PACKAGES_LIST_FILE = PERSISTENCE_DIR + "/live-additional-software.conf"12ACTIVATION_FILE = "/run/live-additional-software/activated"13APT_ARCHIVES_DIR = "/var/cache/apt/archives"14APT_LISTS_DIR = "/var/lib/apt/lists"15OBSOLETE_APT_LIST_SUFFIX = "_binary-i386_Packages"16def _launch_apt_get(specific_args):17 """Launch apt-get with given args18 Launch apt-get with given arguments list, log its standard and error output19 and return its returncode"""20 apt_get_env = os.environ.copy()21 # The environnment provided in GDM PostLogin hooks doesn't contain /sbin/22 # which is required by dpkg. Let's use the default path for root in Tails.23 apt_get_env['PATH'] = "/usr/local/sbin:/usr/local/bin:/usr/sbin:" \24 "/usr/bin:/sbin:/bin"25 # We will log the output and want it in English when included in bug26 # reports27 apt_get_env['LANG'] = "C"28 args = ["apt-get", "--quiet", "--yes"]29 args.extend(specific_args)30 apt_get = subprocess.Popen(31 args,32 env=apt_get_env,33 stderr=subprocess.STDOUT,34 stdout=subprocess.PIPE)35 for line in iter(apt_get.stdout.readline, ''):36 if not line.startswith('('):37 syslog.syslog(line.rstrip())38 apt_get.wait()39 if apt_get.returncode:40 syslog.syslog(syslog.LOG_WARNING,41 "apt-get exited with returncode %i" % apt_get.returncode)42 return apt_get.returncode43def _notify(title, body):44 """Display a notification to the user of the live system45 """46 cmd = "/usr/local/sbin/tails-notify-user"47 try:48 notify_user_output = subprocess.check_output([cmd, title, body],49 stderr=subprocess.STDOUT)50 except subprocess.CalledProcessError as e:51 syslog.syslog(syslog.LOG_WARNING,52 "Warning: unable to notify the user. %s returned "53 "with exit code %s" % (cmd, e.returncode))54 syslog.syslog(syslog.LOG_WARNING,55 "%s output follows: %s." % (cmd, notify_user_output))56 syslog.syslog(syslog.LOG_WARNING,57 "The notification was: %s %s" % (title, body))58 except OSError as e:59 syslog.syslog(syslog.LOG_WARNING,60 "Warning: unable to notify the user. %s" % e)61 syslog.syslog(syslog.LOG_WARNING,62 "The notification was: %s %s" % (title, body))63def has_additional_packages_list():64 """Return true iff PACKAGES_LIST_FILE exists65 """66 return os.path.isfile(PACKAGES_LIST_FILE)67def get_additional_packages():68 """Returns the list of all the additional packages69 """70 packages = []71 if has_additional_packages_list():72 with open(PACKAGES_LIST_FILE) as f:73 for line in f:74 line = line.strip()75 if line:76 packages.append(line)77 f.closed78 return packages79def install_additional_packages():80 """The subcommand which activates and installs all additional packages81 """82 syslog.syslog("Starting to install additional software...")83 if has_additional_packages_list():84 syslog.syslog("Found additional packages list")85 elif os.path.isdir(PERSISTENCE_DIR):86 syslog.syslog(syslog.LOG_WARNING,87 "Warning: no configuration file found, creating an "88 "empty one.")89 create_additional_packages_list()90 return True91 else:92 syslog.syslog(syslog.LOG_WARNING,93 "Warning: persistence is not mounted, exiting")94 return True95 try:96 clear_obsolete_cache(OBSOLETE_APT_LIST_SUFFIX)97 except:98 syslog.syslog(syslog.LOG_WARNING,99 "Warning: failed to clear obsolete cached packages")100 packages = get_additional_packages()101 if not packages:102 syslog.syslog(syslog.LOG_WARNING,103 "Warning: no packages to install, exiting")104 return True105 set_activated()106 syslog.syslog("Will install the following packages: %s"107 % " ".join(packages))108 apt_get_returncode = _launch_apt_get(109 ["--no-remove",110 "--option", "DPkg::Options::=--force-confold",111 "install"] + packages)112 if apt_get_returncode:113 syslog.syslog(syslog.LOG_WARNING,114 "Warning: installation of %s failed"115 % " ".join(packages))116 return False117 else:118 syslog.syslog("Installation completed successfully.")119 return True120def upgrade_additional_packages():121 """The subcommand which upgrades all additional packages if they are activated122 """123 if not is_activated():124 syslog.syslog(syslog.LOG_WARNING,125 "Warning: additional packages not activated, exiting")126 return True127 syslog.syslog("Starting to upgrade additional software...")128 apt_get_returncode = _launch_apt_get(["update"])129 if apt_get_returncode:130 syslog.syslog(syslog.LOG_WARNING, "Warning: the update failed.")131 _notify(_("Your additional software"),132 _("The upgrade failed. This might be due to a network "133 "problem. Please check your network connection, try to "134 "restart Tails, or read the system log to understand better "135 "the problem."))136 return False137 # Remove outdated packages from the local package cache. This is needed as138 # we disable apt-daily.timer, which would else take care of this cleanup.139 # Note: this does not remove packages from other architectures, hence140 # the need for the clear_obsolete_cache() function.141 apt_get_returncode = _launch_apt_get(["autoclean"])142 if apt_get_returncode:143 syslog.syslog(syslog.LOG_WARNING,144 "Warning: autoclean failed, proceeding anyway.")145 if install_additional_packages():146 _notify(_("Your additional software"),147 _("The upgrade was successful."))148 return True149 else:150 _notify(_("Your additional software"),151 _("The upgrade failed. This might be due to a network "152 "problem. Please check your network connection, try to "153 "restart Tails, or read the system log to understand better "154 "the problem."))155 return False156def create_additional_packages_list():157 """Creates the additional packages list158 Creates the additional packages list file with the right permissions.159 The caller must ensure the file doesn't already exist.160 """161 assert not has_additional_packages_list(), "%s already exists" \162 % PACKAGES_LIST_FILE163 syslog.syslog("Creating additional software configuration file")164 f = open(PACKAGES_LIST_FILE, 'w')165 f.closed166 os.chmod(PACKAGES_LIST_FILE, 0o600)167 os.chown(PACKAGES_LIST_FILE,168 pwd.getpwnam('tails-persistence-setup').pw_uid,169 pwd.getpwnam('tails-persistence-setup').pw_gid)170def clear_obsolete_cache(obsolete_apt_list_suffix):171 """Delete cached packages when a package list's filename172 ends with obsolete_apt_list_suffix173 """174 if glob.glob(APT_LISTS_DIR + "/*" + obsolete_apt_list_suffix):175 syslog.syslog("Clearing cache because at least one package list "176 "has suffix '%s'." % obsolete_apt_list_suffix)177 for deb in glob.glob(APT_ARCHIVES_DIR + "/*.deb"):178 os.remove(deb)179def is_activated():180 """Check if additional software has been activated181 """182 return os.path.isfile(ACTIVATION_FILE)183def set_activated():184 """Save that additional software has been activated185 """186 syslog.syslog("Activating persistent software packages")187 activation_file_dir = os.path.dirname(ACTIVATION_FILE)188 if not os.path.exists(activation_file_dir):189 os.makedirs(activation_file_dir)190 try:191 f = open(ACTIVATION_FILE, 'w')192 finally:193 if f:194 f.close()195def print_help():196 """The subcommand which displays help197 """198 sys.stderr.write("Usage: %s <subcommand>\n" % program_name)199 sys.stderr.write("""Subcommands:200 install: activate and install additional software201 upgrade: upgrade additional software if activated\n""")202if __name__ == "__main__":203 program_name = os.path.basename(sys.argv[0])204 syslog.openlog("%s[%i]" % (program_name, os.getpid()))205 gettext.install("tails")206 if len(sys.argv) < 2:207 print_help()208 sys.exit(4)209 if sys.argv[1] == "install":210 if not install_additional_packages():211 sys.exit(1)212 elif sys.argv[1] == "upgrade":213 if not upgrade_additional_packages():214 sys.exit(2)215 else:216 print_help()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful