Best Python code snippet using autotest_python
test_thermalctld.py
Source:test_thermalctld.py  
1import os2import sys3import multiprocessing4from imp import load_source  # TODO: Replace with importlib once we no longer need to support Python 25# TODO: Clean this up once we no longer need to support Python 26if sys.version_info.major == 3:7    from unittest import mock8else:9    import mock10import pytest11tests_path = os.path.dirname(os.path.abspath(__file__))12# Add mocked_libs path so that the file under test can load mocked modules from there13mocked_libs_path = os.path.join(tests_path, 'mocked_libs')14sys.path.insert(0, mocked_libs_path)15import swsscommon16# Check we are using the mocked package17assert len(swsscommon.__path__) == 118assert(os.path.samefile(swsscommon.__path__[0], os.path.join(mocked_libs_path, 'swsscommon')))19from sonic_py_common import daemon_base20from .mock_platform import MockChassis, MockFan, MockPsu, MockSfp, MockThermal21daemon_base.db_connect = mock.MagicMock()22# Add path to the file under test so that we can load it23modules_path = os.path.dirname(tests_path)24scripts_path = os.path.join(modules_path, 'scripts')25sys.path.insert(0, modules_path)26load_source('thermalctld', os.path.join(scripts_path, 'thermalctld'))27import thermalctld28TEMPER_INFO_TABLE_NAME = 'TEMPERATURE_INFO'29@pytest.fixture(scope='function', autouse=True)30def configure_mocks():31    thermalctld.FanStatus.log_notice = mock.MagicMock()32    thermalctld.FanStatus.log_warning = mock.MagicMock()33    thermalctld.FanUpdater.log_notice = mock.MagicMock()34    thermalctld.FanUpdater.log_warning = mock.MagicMock()35    thermalctld.TemperatureStatus.log_notice = mock.MagicMock()36    thermalctld.TemperatureStatus.log_warning = mock.MagicMock()37    thermalctld.TemperatureUpdater.log_notice = mock.MagicMock()38    thermalctld.TemperatureUpdater.log_warning = mock.MagicMock()39    yield40    thermalctld.FanStatus.log_notice.reset()41    thermalctld.FanStatus.log_warning.reset()42    thermalctld.FanUpdater.log_notice.reset()43    thermalctld.FanUpdater.log_notice.reset()44    thermalctld.TemperatureStatus.log_notice.reset()45    thermalctld.TemperatureStatus.log_warning.reset()46    thermalctld.TemperatureUpdater.log_warning.reset()47    thermalctld.TemperatureUpdater.log_warning.reset()48class TestFanStatus(object):49    """50    Test cases to cover functionality in FanStatus class51    """52    def test_check_speed_value_available(self):53        fan_status = thermalctld.FanStatus()54        ret = fan_status._check_speed_value_available(30, 32, 5, True)55        assert ret == True56        assert fan_status.log_warning.call_count == 057        ret = fan_status._check_speed_value_available(thermalctld.NOT_AVAILABLE, 32, 105, True)58        assert ret == False59        assert fan_status.log_warning.call_count == 160        fan_status.log_warning.assert_called_with('Invalid tolerance value: 105')61        # Reset62        fan_status.log_warning.reset_mock()63        ret = fan_status._check_speed_value_available(thermalctld.NOT_AVAILABLE, 32, 5, False)64        assert ret == False65        assert fan_status.log_warning.call_count == 066        ret = fan_status._check_speed_value_available(thermalctld.NOT_AVAILABLE, 32, 5, True)67        assert ret == False68        assert fan_status.log_warning.call_count == 169        fan_status.log_warning.assert_called_with('Fan speed or target_speed or tolerance became unavailable, speed=N/A, target_speed=32, tolerance=5')70    def test_set_presence(self):71        fan_status = thermalctld.FanStatus()72        ret = fan_status.set_presence(True)73        assert fan_status.presence74        assert not ret75        ret = fan_status.set_presence(False)76        assert not fan_status.presence77        assert ret78    def test_set_under_speed(self):79        fan_status = thermalctld.FanStatus()80        ret = fan_status.set_under_speed(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE)81        assert not ret82        ret = fan_status.set_under_speed(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE, 0)83        assert not ret84        ret = fan_status.set_under_speed(thermalctld.NOT_AVAILABLE, 0, 0)85        assert not ret86        ret = fan_status.set_under_speed(0, 0, 0)87        assert not ret88        ret = fan_status.set_under_speed(80, 100, 19)89        assert ret90        assert fan_status.under_speed91        assert not fan_status.is_ok()92        ret = fan_status.set_under_speed(81, 100, 19)93        assert ret94        assert not fan_status.under_speed95        assert fan_status.is_ok()96    def test_set_over_speed(self):97        fan_status = thermalctld.FanStatus()98        ret = fan_status.set_over_speed(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE)99        assert not ret100        ret = fan_status.set_over_speed(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE, 0)101        assert not ret102        ret = fan_status.set_over_speed(thermalctld.NOT_AVAILABLE, 0, 0)103        assert not ret104        ret = fan_status.set_over_speed(0, 0, 0)105        assert not ret106        ret = fan_status.set_over_speed(120, 100, 19)107        assert ret108        assert fan_status.over_speed109        assert not fan_status.is_ok()110        ret = fan_status.set_over_speed(120, 100, 21)111        assert ret112        assert not fan_status.over_speed113        assert fan_status.is_ok()114class TestFanUpdater(object):115    """116    Test cases to cover functionality in FanUpdater class117    """118    @mock.patch('thermalctld.try_get', mock.MagicMock(return_value=thermalctld.NOT_AVAILABLE))119    @mock.patch('thermalctld.update_entity_info', mock.MagicMock())120    def test_refresh_fan_drawer_status_fan_drawer_get_name_not_impl(self):121        # Test case where fan_drawer.get_name is not implemented122        fan_updater = thermalctld.FanUpdater(MockChassis(), multiprocessing.Event())123        mock_fan_drawer = mock.MagicMock()124        fan_updater._refresh_fan_drawer_status(mock_fan_drawer, 1)125        assert thermalctld.update_entity_info.call_count == 0126    # TODO: Add a test case for _refresh_fan_drawer_status with a good fan drawer127    def test_update_fan_with_exception(self):128        chassis = MockChassis()129        chassis.make_error_fan()130        fan = MockFan()131        fan.make_over_speed()132        chassis.get_all_fans().append(fan)133        fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())134        fan_updater.update()135        assert fan.get_status_led() == MockFan.STATUS_LED_COLOR_RED136        assert fan_updater.log_warning.call_count == 1137        # TODO: Clean this up once we no longer need to support Python 2138        if sys.version_info.major == 3:139            fan_updater.log_warning.assert_called_with("Failed to update fan status - Exception('Failed to get speed')")140        else:141            fan_updater.log_warning.assert_called_with("Failed to update fan status - Exception('Failed to get speed',)")142    def test_set_fan_led_exception(self):143        fan_status = thermalctld.FanStatus()144        mock_fan_drawer = mock.MagicMock()145        mock_fan = MockFan()146        mock_fan.set_status_led = mock.MagicMock(side_effect=NotImplementedError)147        fan_updater = thermalctld.FanUpdater(MockChassis(), multiprocessing.Event())148        fan_updater._set_fan_led(mock_fan_drawer, mock_fan, 'Test Fan', fan_status)149        assert fan_updater.log_warning.call_count == 1150        fan_updater.log_warning.assert_called_with('Failed to set status LED for fan Test Fan, set_status_led not implemented')151    def test_fan_absent(self):152        chassis = MockChassis()153        chassis.make_absent_fan()154        fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())155        fan_updater.update()156        fan_list = chassis.get_all_fans()157        assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_RED158        assert fan_updater.log_warning.call_count == 2159        expected_calls = [160            mock.call('Fan removed warning: FanDrawer 0 fan 1 was removed from the system, potential overheat hazard'),161            mock.call('Insufficient number of working fans warning: 1 fan is not working')162        ]163        assert fan_updater.log_warning.mock_calls == expected_calls164        fan_list[0].set_presence(True)165        fan_updater.update()166        assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_GREEN167        assert fan_updater.log_notice.call_count == 2168        expected_calls = [169            mock.call('Fan removed warning cleared: FanDrawer 0 fan 1 was inserted'),170            mock.call('Insufficient number of working fans warning cleared: all fans are back to normal')171        ]172        assert fan_updater.log_notice.mock_calls == expected_calls173    def test_fan_faulty(self):174        chassis = MockChassis()175        chassis.make_faulty_fan()176        fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())177        fan_updater.update()178        fan_list = chassis.get_all_fans()179        assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_RED180        assert fan_updater.log_warning.call_count == 2181        expected_calls = [182            mock.call('Fan fault warning: FanDrawer 0 fan 1 is broken'),183            mock.call('Insufficient number of working fans warning: 1 fan is not working')184        ]185        assert fan_updater.log_warning.mock_calls == expected_calls186        fan_list[0].set_status(True)187        fan_updater.update()188        assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_GREEN189        assert fan_updater.log_notice.call_count == 2190        expected_calls = [191            mock.call('Fan fault warning cleared: FanDrawer 0 fan 1 is back to normal'),192            mock.call('Insufficient number of working fans warning cleared: all fans are back to normal')193        ]194        assert fan_updater.log_notice.mock_calls == expected_calls195    def test_fan_under_speed(self):196        chassis = MockChassis()197        chassis.make_under_speed_fan()198        fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())199        fan_updater.update()200        fan_list = chassis.get_all_fans()201        assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_RED202        assert fan_updater.log_warning.call_count == 1203        fan_updater.log_warning.assert_called_with('Fan low speed warning: FanDrawer 0 fan 1 current speed=1, target speed=2, tolerance=0')204        fan_list[0].make_normal_speed()205        fan_updater.update()206        assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_GREEN207        assert fan_updater.log_notice.call_count == 1208        fan_updater.log_notice.assert_called_with('Fan low speed warning cleared: FanDrawer 0 fan 1 speed is back to normal')209    def test_fan_over_speed(self):210        chassis = MockChassis()211        chassis.make_over_speed_fan()212        fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())213        fan_updater.update()214        fan_list = chassis.get_all_fans()215        assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_RED216        assert fan_updater.log_warning.call_count == 1217        fan_updater.log_warning.assert_called_with('Fan high speed warning: FanDrawer 0 fan 1 target speed=1, current speed=2, tolerance=0')218        fan_list[0].make_normal_speed()219        fan_updater.update()220        assert fan_list[0].get_status_led() == MockFan.STATUS_LED_COLOR_GREEN221        assert fan_updater.log_notice.call_count == 1222        fan_updater.log_notice.assert_called_with('Fan high speed warning cleared: FanDrawer 0 fan 1 speed is back to normal')223    def test_update_psu_fans(self):224        chassis = MockChassis()225        psu = MockPsu()226        mock_fan = MockFan()227        psu._fan_list.append(mock_fan)228        chassis._psu_list.append(psu)229        fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())230        fan_updater.update()231        assert fan_updater.log_warning.call_count == 0232        fan_updater._refresh_fan_status = mock.MagicMock(side_effect=Exception("Test message"))233        fan_updater.update()234        assert fan_updater.log_warning.call_count == 1235        # TODO: Clean this up once we no longer need to support Python 2236        if sys.version_info.major == 3:237            fan_updater.log_warning.assert_called_with("Failed to update PSU fan status - Exception('Test message')")238        else:239            fan_updater.log_warning.assert_called_with("Failed to update PSU fan status - Exception('Test message',)")240class TestThermalMonitor(object):241    """242    Test cases to cover functionality in ThermalMonitor class243    """244    def test_main(self):245        mock_chassis = MockChassis()246        thermal_monitor = thermalctld.ThermalMonitor(mock_chassis)247        thermal_monitor.fan_updater.update = mock.MagicMock()248        thermal_monitor.temperature_updater.update = mock.MagicMock()249        thermal_monitor.main()250        assert thermal_monitor.fan_updater.update.call_count == 1251        assert thermal_monitor.temperature_updater.update.call_count == 1252def test_insufficient_fan_number():253    fan_status1 = thermalctld.FanStatus()254    fan_status2 = thermalctld.FanStatus()255    fan_status1.set_presence(False)256    fan_status2.set_fault_status(False)257    assert thermalctld.FanStatus.get_bad_fan_count() == 2258    assert fan_status1.get_bad_fan_count() == 2259    assert fan_status2.get_bad_fan_count() == 2260    thermalctld.FanStatus.reset_fan_counter()261    assert thermalctld.FanStatus.get_bad_fan_count() == 0262    assert fan_status1.get_bad_fan_count() == 0263    assert fan_status2.get_bad_fan_count() == 0264    chassis = MockChassis()265    chassis.make_absent_fan()266    chassis.make_faulty_fan()267    fan_updater = thermalctld.FanUpdater(chassis, multiprocessing.Event())268    fan_updater.update()269    assert fan_updater.log_warning.call_count == 3270    expected_calls = [271        mock.call('Fan removed warning: FanDrawer 0 fan 1 was removed from the system, potential overheat hazard'),272        mock.call('Fan fault warning: FanDrawer 1 fan 1 is broken'),273        mock.call('Insufficient number of working fans warning: 2 fans are not working')274    ]275    assert fan_updater.log_warning.mock_calls == expected_calls276    fan_list = chassis.get_all_fans()277    fan_list[0].set_presence(True)278    fan_updater.update()279    assert fan_updater.log_notice.call_count == 1280    fan_updater.log_warning.assert_called_with('Insufficient number of working fans warning: 1 fan is not working')281    fan_list[1].set_status(True)282    fan_updater.update()283    assert fan_updater.log_notice.call_count == 3284    expected_calls = [285            mock.call('Fan removed warning cleared: FanDrawer 0 fan 1 was inserted'),286            mock.call('Fan fault warning cleared: FanDrawer 1 fan 1 is back to normal'),287        mock.call('Insufficient number of working fans warning cleared: all fans are back to normal')288    ]289    assert fan_updater.log_notice.mock_calls == expected_calls290def test_temperature_status_set_over_temper():291    temperature_status = thermalctld.TemperatureStatus()292    ret = temperature_status.set_over_temperature(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE)293    assert not ret294    ret = temperature_status.set_over_temperature(thermalctld.NOT_AVAILABLE, 0)295    assert not ret296    ret = temperature_status.set_over_temperature(0, thermalctld.NOT_AVAILABLE)297    assert not ret298    ret = temperature_status.set_over_temperature(2, 1)299    assert ret300    assert temperature_status.over_temperature301    ret = temperature_status.set_over_temperature(1, 2)302    assert ret303    assert not temperature_status.over_temperature304def test_temperstatus_set_under_temper():305    temperature_status = thermalctld.TemperatureStatus()306    ret = temperature_status.set_under_temperature(thermalctld.NOT_AVAILABLE, thermalctld.NOT_AVAILABLE)307    assert not ret308    ret = temperature_status.set_under_temperature(thermalctld.NOT_AVAILABLE, 0)309    assert not ret310    ret = temperature_status.set_under_temperature(0, thermalctld.NOT_AVAILABLE)311    assert not ret312    ret = temperature_status.set_under_temperature(1, 2)313    assert ret314    assert temperature_status.under_temperature315    ret = temperature_status.set_under_temperature(2, 1)316    assert ret317    assert not temperature_status.under_temperature318def test_temperature_status_set_not_available():319    THERMAL_NAME = 'Chassis 1 Thermal 1'320    temperature_status = thermalctld.TemperatureStatus()321    temperature_status.temperature = 20.0322    temperature_status.set_temperature(THERMAL_NAME, thermalctld.NOT_AVAILABLE)323    assert temperature_status.temperature is None324    assert temperature_status.log_warning.call_count == 1325    temperature_status.log_warning.assert_called_with('Temperature of {} became unavailable'.format(THERMAL_NAME))326class TestTemperatureUpdater(object):327    """328    Test cases to cover functionality in TemperatureUpdater class329    """330    def test_deinit(self):331        chassis = MockChassis()332        temp_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())333        temp_updater.temperature_status_dict = {'key1': 'value1', 'key2': 'value2'}334        temp_updater.table._del = mock.MagicMock()335        temp_updater.deinit()336        assert temp_updater.table._del.call_count == 2337        expected_calls = [mock.call('key1'), mock.call('key2')]338        temp_updater.table._del.assert_has_calls(expected_calls, any_order=True)339        340    def test_over_temper(self):341        chassis = MockChassis()342        chassis.make_over_temper_thermal()343        temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())344        temperature_updater.update()345        thermal_list = chassis.get_all_thermals()346        assert temperature_updater.log_warning.call_count == 1347        temperature_updater.log_warning.assert_called_with('High temperature warning: chassis 1 Thermal 1 current temperature 3C, high threshold 2C')348        thermal_list[0].make_normal_temper()349        temperature_updater.update()350        assert temperature_updater.log_notice.call_count == 1351        temperature_updater.log_notice.assert_called_with('High temperature warning cleared: chassis 1 Thermal 1 temperature restored to 2C, high threshold 3C')352    def test_under_temper(self):353        chassis = MockChassis()354        chassis.make_under_temper_thermal()355        temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())356        temperature_updater.update()357        thermal_list = chassis.get_all_thermals()358        assert temperature_updater.log_warning.call_count == 1359        temperature_updater.log_warning.assert_called_with('Low temperature warning: chassis 1 Thermal 1 current temperature 1C, low threshold 2C')360        thermal_list[0].make_normal_temper()361        temperature_updater.update()362        assert temperature_updater.log_notice.call_count == 1363        temperature_updater.log_notice.assert_called_with('Low temperature warning cleared: chassis 1 Thermal 1 temperature restored to 2C, low threshold 1C')364    def test_update_psu_thermals(self):365        chassis = MockChassis()366        psu = MockPsu()367        mock_thermal = MockThermal()368        psu._thermal_list.append(mock_thermal)369        chassis._psu_list.append(psu)370        temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())371        temperature_updater.update()372        assert temperature_updater.log_warning.call_count == 0373        mock_thermal.get_temperature = mock.MagicMock(side_effect=Exception("Test message"))374        temperature_updater.update()375        assert temperature_updater.log_warning.call_count == 1376        # TODO: Clean this up once we no longer need to support Python 2377        if sys.version_info.major == 3:378            temperature_updater.log_warning.assert_called_with("Failed to update thermal status for PSU 1 Thermal 1 - Exception('Test message')")379        else:380            temperature_updater.log_warning.assert_called_with("Failed to update thermal status for PSU 1 Thermal 1 - Exception('Test message',)")381    def test_update_sfp_thermals(self):382        chassis = MockChassis()383        sfp = MockSfp()384        mock_thermal = MockThermal()385        sfp._thermal_list.append(mock_thermal)386        chassis._sfp_list.append(sfp)387        temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())388        temperature_updater.update()389        assert temperature_updater.log_warning.call_count == 0390        mock_thermal.get_temperature = mock.MagicMock(side_effect=Exception("Test message"))391        temperature_updater.update()392        assert temperature_updater.log_warning.call_count == 1393        # TODO: Clean this up once we no longer need to support Python 2394        if sys.version_info.major == 3:395            temperature_updater.log_warning.assert_called_with("Failed to update thermal status for SFP 1 Thermal 1 - Exception('Test message')")396        else:397            temperature_updater.log_warning.assert_called_with("Failed to update thermal status for SFP 1 Thermal 1 - Exception('Test message',)")398    def test_update_thermal_with_exception(self):399        chassis = MockChassis()400        chassis.make_error_thermal()401        thermal = MockThermal()402        thermal.make_over_temper()403        chassis.get_all_thermals().append(thermal)404        temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())405        temperature_updater.update()406        assert temperature_updater.log_warning.call_count == 2407        # TODO: Clean this up once we no longer need to support Python 2408        if sys.version_info.major == 3:409            expected_calls = [410                mock.call("Failed to update thermal status for chassis 1 Thermal 1 - Exception('Failed to get temperature')"),411                mock.call('High temperature warning: chassis 1 Thermal 2 current temperature 3C, high threshold 2C')412            ]413        else:414            expected_calls = [415                mock.call("Failed to update thermal status for chassis 1 Thermal 1 - Exception('Failed to get temperature',)"),416                mock.call('High temperature warning: chassis 1 Thermal 2 current temperature 3C, high threshold 2C')417            ]418        assert temperature_updater.log_warning.mock_calls == expected_calls419    def test_update_module_thermals(self):420        chassis = MockChassis()421        chassis.make_module_thermal()422        chassis.set_modular_chassis(True)423        temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())424        temperature_updater.update()425        assert len(temperature_updater.module_thermals) == 3426        427        chassis._module_list = []428        temperature_updater.update()429        assert len(temperature_updater.module_thermals) == 0430# Modular chassis-related tests431def test_updater_thermal_check_modular_chassis():432    chassis = MockChassis()433    assert chassis.is_modular_chassis() == False434    temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())435    assert temperature_updater.chassis_table == None436    chassis.set_modular_chassis(True)437    chassis.set_my_slot(-1)438    temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())439    assert temperature_updater.chassis_table == None440    my_slot = 1441    chassis.set_my_slot(my_slot)442    temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())443    assert temperature_updater.chassis_table != None444    assert temperature_updater.chassis_table.table_name == '{}_{}'.format(TEMPER_INFO_TABLE_NAME, str(my_slot))445def test_updater_thermal_check_chassis_table():446    chassis = MockChassis()447    thermal1 = MockThermal()448    chassis.get_all_thermals().append(thermal1)449    chassis.set_modular_chassis(True)450    chassis.set_my_slot(1)451    temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())452    temperature_updater.update()453    assert temperature_updater.chassis_table.get_size() == chassis.get_num_thermals()454    thermal2 = MockThermal()455    chassis.get_all_thermals().append(thermal2)456    temperature_updater.update()457    assert temperature_updater.chassis_table.get_size() == chassis.get_num_thermals()458def test_updater_thermal_check_min_max():459    chassis = MockChassis()460    thermal = MockThermal(1)461    chassis.get_all_thermals().append(thermal)462    chassis.set_modular_chassis(True)463    chassis.set_my_slot(1)464    temperature_updater = thermalctld.TemperatureUpdater(chassis, multiprocessing.Event())465    temperature_updater.update()466    slot_dict = temperature_updater.chassis_table.get(thermal.get_name())467    assert slot_dict['minimum_temperature'] == str(thermal.get_minimum_recorded())468    assert slot_dict['maximum_temperature'] == str(thermal.get_maximum_recorded())469def test_signal_handler():470    # Test SIGHUP471    daemon_thermalctld = thermalctld.ThermalControlDaemon()472    daemon_thermalctld.stop_event.set = mock.MagicMock()473    daemon_thermalctld.log_info = mock.MagicMock()474    daemon_thermalctld.log_warning = mock.MagicMock()475    daemon_thermalctld.thermal_manager.stop = mock.MagicMock()476    daemon_thermalctld.signal_handler(thermalctld.signal.SIGHUP, None)477    daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert478    assert daemon_thermalctld.log_info.call_count == 1479    daemon_thermalctld.log_info.assert_called_with("Caught signal 'SIGHUP' - ignoring...")480    assert daemon_thermalctld.log_warning.call_count == 0481    assert daemon_thermalctld.stop_event.set.call_count == 0482    assert daemon_thermalctld.thermal_manager.stop.call_count == 0483    assert thermalctld.exit_code == thermalctld.ERR_UNKNOWN484    # Test SIGINT485    daemon_thermalctld = thermalctld.ThermalControlDaemon()486    daemon_thermalctld.stop_event.set = mock.MagicMock()487    daemon_thermalctld.log_info = mock.MagicMock()488    daemon_thermalctld.log_warning = mock.MagicMock()489    daemon_thermalctld.thermal_manager.stop = mock.MagicMock()490    test_signal = thermalctld.signal.SIGINT491    daemon_thermalctld.signal_handler(test_signal, None)492    daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert493    assert daemon_thermalctld.log_info.call_count == 1494    daemon_thermalctld.log_info.assert_called_with("Caught signal 'SIGINT' - exiting...")495    assert daemon_thermalctld.log_warning.call_count == 0496    assert daemon_thermalctld.stop_event.set.call_count == 1497    assert daemon_thermalctld.thermal_manager.stop.call_count == 1498    assert thermalctld.exit_code == (128 + test_signal)499    # Test SIGTERM500    thermalctld.exit_code = thermalctld.ERR_UNKNOWN501    daemon_thermalctld = thermalctld.ThermalControlDaemon()502    daemon_thermalctld.stop_event.set = mock.MagicMock()503    daemon_thermalctld.log_info = mock.MagicMock()504    daemon_thermalctld.log_warning = mock.MagicMock()505    daemon_thermalctld.thermal_manager.stop = mock.MagicMock()506    test_signal = thermalctld.signal.SIGTERM507    daemon_thermalctld.signal_handler(test_signal, None)508    daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert509    assert daemon_thermalctld.log_info.call_count == 1510    daemon_thermalctld.log_info.assert_called_with("Caught signal 'SIGTERM' - exiting...")511    assert daemon_thermalctld.log_warning.call_count == 0512    assert daemon_thermalctld.stop_event.set.call_count == 1513    assert daemon_thermalctld.thermal_manager.stop.call_count == 1514    assert thermalctld.exit_code == (128 + test_signal)515    # Test an unhandled signal516    thermalctld.exit_code = thermalctld.ERR_UNKNOWN517    daemon_thermalctld = thermalctld.ThermalControlDaemon()518    daemon_thermalctld.stop_event.set = mock.MagicMock()519    daemon_thermalctld.log_info = mock.MagicMock()520    daemon_thermalctld.log_warning = mock.MagicMock()521    daemon_thermalctld.thermal_manager.stop = mock.MagicMock()522    daemon_thermalctld.signal_handler(thermalctld.signal.SIGUSR1, None)523    daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert524    assert daemon_thermalctld.log_warning.call_count == 1525    daemon_thermalctld.log_warning.assert_called_with("Caught unhandled signal 'SIGUSR1' - ignoring...")526    assert daemon_thermalctld.log_info.call_count == 0527    assert daemon_thermalctld.stop_event.set.call_count == 0528    assert daemon_thermalctld.thermal_manager.stop.call_count == 0529    assert thermalctld.exit_code == thermalctld.ERR_UNKNOWN530def test_daemon_run():531    daemon_thermalctld = thermalctld.ThermalControlDaemon()532    daemon_thermalctld.stop_event.wait = mock.MagicMock(return_value=True)533    daemon_thermalctld.thermal_manager.get_interval = mock.MagicMock(return_value=60)534    ret = daemon_thermalctld.run()535    daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert536    assert ret is False537    daemon_thermalctld = thermalctld.ThermalControlDaemon()538    daemon_thermalctld.stop_event.wait = mock.MagicMock(return_value=False)539    daemon_thermalctld.thermal_manager.get_interval = mock.MagicMock(return_value=60)540    ret = daemon_thermalctld.run()541    daemon_thermalctld.deinit() # Deinit becuase the test will hang if we assert542    assert ret is True543def test_try_get():544    def good_callback():545        return 'good result'546    def unimplemented_callback():547        raise NotImplementedError548    ret = thermalctld.try_get(good_callback)549    assert ret == 'good result'550    ret = thermalctld.try_get(unimplemented_callback)551    assert ret == thermalctld.NOT_AVAILABLE552    ret = thermalctld.try_get(unimplemented_callback, 'my default')553    assert ret == 'my default'554def test_update_entity_info():555    mock_table = mock.MagicMock()556    mock_fan = MockFan()557    expected_fvp = thermalctld.swsscommon.FieldValuePairs(558        [('position_in_parent', '1'),559         ('parent_name', 'Parent Name')560         ])561    thermalctld.update_entity_info(mock_table, 'Parent Name', 'Key Name', mock_fan, 1)562    assert mock_table.set.call_count == 1563    mock_table.set.assert_called_with('Key Name', expected_fvp)564@mock.patch('thermalctld.ThermalControlDaemon.run')565def test_main(mock_run):566    mock_run.return_value = False567    ret = thermalctld.main()568    assert mock_run.call_count == 1...mysql_conn.py
Source:mysql_conn.py  
1import re2import json3import time4import MySQLdb5import type_check6from DBUtils.PooledDB import PooledDB7from loggingex import LOG_WARNING8from loggingex import LOG_INFO9from loggingex import LOG_ERROR_SQL10class mysql_conn():11    def __init__(self, host_name, port_num, user_name, password, db_name, charset_name = "utf8"):12        self._host = host_name13        self._port = int(port_num)14        self._user = user_name15        self._passwd = password16        self._db = db_name17        self._charset = charset_name18        self._pool = None19        self._table_info = {}20        self.re_connect()21    def __del__(self):22        self._try_close_connect()23    def re_connect(self):24        self._try_close_connect()25        26        try:27            self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)28            LOG_INFO("connect %s success" %(self._db))29            self.refresh_tables_info()30            return31        except MySQLdb.Error, e :32            if e.args[0] == 1049:33                self._create_db()34            else:35                LOG_WARNING("%s connect error %s" % (self._db, str(e)))36                return37        except Exception as e:38            LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))39            return 40        try:41            self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)42            pool = PersistentDB(creator=MySQLdb, mincached=1, maxcached=3, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)43            LOG_INFO("connect %s success" %(self._db))44            self.refresh_tables_info()45            return46        except Exception as e:47            LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))48            return49        50        if None == self._pool:51            LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))52            return53    54    def _try_close_connect(self):55        if None == self._pool:56            return57        try:58            self._pool.close()59        except MySQLdb.Error, e :60            LOG_WARNING("%s %s" % (self._db, str(e)))61        except Exception as e:62            LOG_WARNING("%s %s" % (self._db, str(e)))63        finally:64            self._pool = None65    def _create_db(self):66        conn = None67        cursor = None68        try:69            conn = MySQLdb.connect(host=self._host, port=self._port, user=self._user, passwd=self._passwd)70            cursor = conn.cursor()71            sql = """create database if not exists %s""" %(self._db)72            #LOG_INFO(sql)73            cursor.execute(sql)74            conn.select_db(self._db);75            conn.commit()76        except MySQLdb.Error, e :77            LOG_WARNING("%s execute error %s" % (sql, str(e)))78        finally:79            try:80                if cursor:81                    cursor.close()82                if conn:83                    conn.close()84            except:85                pass86    def select(self, table_name, fields_array, conditions, pre = "", extend = ""):87        fields_str = "," . join(fields_array)88        conds = []89        for (column_name, column_data_info) in conditions.items():90            column_type = self._get_column_type(table_name, column_name)91            column_data = column_data_info[0]92            operation = column_data_info[1]93            if isinstance(column_data, list):94                new_datas = []95                for item in column_data:96                    new_data = self._conv_data(item, column_type)97                    try:98                        new_datas.append(new_data)99                    except:100                        LOG_WARNING("%s %s conv error" %(item, column_type))101                temp_str = "," . join(new_datas)102                cond = column_name + " " + operation  + " (" + temp_str + ")"103                conds.append(cond)104            else:105                new_data = self._conv_data(column_data, column_type)106                try:107                    cond = column_name + " " + operation + " " + new_data108                    conds.append(cond)109                except:110                    LOG_WARNING("%s %s conv error" %(column_data, column_type))111        conds_str = " and " . join(conds)112        sql = "select " + pre + " " + fields_str + " from " + table_name113        if len(conds_str) > 0:114            sql = sql + " where " + conds_str115        116        if len(extend) > 0:117            sql = sql + " " + extend118        119        data_info = self.execute(sql, select = True)120        return data_info121    def _get_tables_info(self):122        tables_info = {}123        tables_sql = "show tables"124        tables_name = self.execute(tables_sql, select = True)125        for table_name_item in tables_name:126            table_name = table_name_item[0]127            if 0 == len(table_name):128                continue129            columns_sql = "show columns from " + table_name 130            table_info = self.execute(columns_sql, select = True)131            table_name = table_name_item[0]132            columns_info = self._get_table_info(table_info)133            if len(columns_info):134                tables_info[table_name] = columns_info135        return tables_info136    def _get_table_info(self, table_info):137        columns_info = {}138        for item in table_info:139            column_name = item[0]140            column_type_info = item[1]141            (type, len) = self._get_column_type_info(column_type_info)142            columns_info[column_name] = {"type":type, "length":len}143        return columns_info144    def _get_column_type_info(self, type_info):145        re_str = '(\w*)\((\d*),?.*\)'146        kw = re.findall(re_str, type_info)147        if len(kw):148            if len(kw[0]) > 1:149                return (kw[0][0], kw[0][1])150        return (None, None)151    def _implode(self, array_data, escape = False):152        array_str = ""153        for item in array_data:154            if 0 != len(array_str):155                array_str += ","156            if escape:157                if type_check.IsNumber(item) or type_check.IsFloat(item):158                    array_str += str(item)159                elif type_check.IsString(item):160                    array_str += "'"161                    array_str += item.encode("string_escape")162                    array_str += "'"163            else:164                array_str += str(item)165        array_str = "(" + array_str + ")"166        return array_str167    def _get_column_type(self, table_name, column_name):168        if 0 == len(self._table_info):169            self.refresh_tables_info()170        if table_name not in self._table_info.keys():171            LOG_WARNING("table_%s info in not exist" %(table_name))172            return "None"173        if column_name not in self._table_info[table_name].keys():174            LOG_WARNING("column name %s is not in table %s" % (column_name, table_name))175            return "None"176        return self._table_info[table_name][column_name]["type"]177    178    def _conv_data(self, data, type):179        if type == "varchar" or type == "char":180            return '"%s"' % (data)181        elif type == "float" or type == "double":182            try:183                conv_data = float(data)184                return "%.8f"  % (conv_data)185            except Exception as e:186                LOG_WARNING("conv %s to %s error" % (data, type))187                return "0"188        elif type == "tinyint" or type == "bigint" or type == "int":189            return "%d" % (int(data))190    191    def _implode_by_tableinfo(self, data_array, table_name, columns_name_array):192        array_str = ""193        if len(data_array) != len(columns_name_array):194            LOG_WARNING("columns name length != data array length! %s %s " % (json.dumps(data_array), json.dumps(columns_name_array)))195            return ""196        for index in range(0, len(columns_name_array)):197            item = data_array[index]198            column_name = columns_name_array[index]199            column_type = self._get_column_type(table_name, column_name)200            if 0 != len(array_str):201                array_str += ","202            203            new_data = self._conv_data(item, column_type)204            try:205                array_str += new_data206            except:207                LOG_WARNING("%s %s conv error" %(item, column_type))208        array_str = "(" + array_str + ")"209        return array_str210 211    def refresh_tables_info(self):212        self._table_info = self._get_tables_info()213    def insert_data(self, table_name, columns_name, data_array):214        if 0 == len(data_array):215            return216        columns = self._implode(columns_name)217        value_list = []218        for item in data_array:219            value_str = self._implode_by_tableinfo(item, table_name, columns_name)220            value_list.append(value_str)221        values_sql = ",".join(value_list)222        if 0 == len(values_sql):223            return224        sql = "insert into " + table_name + columns + " values" + values_sql225        #LOG_INFO(sql)226        self.execute(sql, commit = True)227    228    def update(self, table_name, data, keys_name):229        key_data = {}230        update_data = {}231        for (column_name, column_data) in data.items():232            if column_name in keys_name:233                key_data[column_name] = column_data234            else:235                update_data[column_name] = column_data236        237        update_str_list = []238        try:239            for (column_name, column_data) in update_data.items():240                column_type = self._get_column_type(table_name, column_name)241                new_data = self._conv_data(column_data, column_type)242                update_str = column_name + "=" + new_data243                update_str_list.append(update_str)244            update_info_str = "," . join(update_str_list)245            cond_str_list = []246            for (column_name, column_data) in key_data.items():247                column_type = self._get_column_type(table_name, column_name)248                new_data = self._conv_data(column_data, column_type)249                cond_str = column_name + "=" + new_data250                cond_str_list.append(cond_str)251            cond_info_str = " AND " . join(cond_str_list)252            sql = "UPDATE %s SET %s" % (table_name, update_info_str)253            if 0 != len(cond_info_str):254                sql = sql + " where " + cond_info_str255        256            self.execute(sql, commit = True)257        except Exception as e:258            LOG_WARNING("update %s %s %s error: %s" % (table_name, data, keys_name, str(e)))259    def insert_onduplicate(self, table_name, data, keys_name):260        columns_name = data.keys()261        columns = self._implode(columns_name)262        value_list = []263        data_array = data.values()264        value_str = self._implode_by_tableinfo(data_array, table_name, columns_name)265        sql = "insert into " + table_name + columns + " values" + value_str266        267        update_data = {}268        for (column_name, column_data) in data.items():269            if column_name in keys_name:270                continue271            update_data[column_name] = column_data272        update_str_list = []273        for (column_name, column_data) in update_data.items():274            column_type = self._get_column_type(table_name, column_name)275            new_data = self._conv_data(column_data, column_type)276            update_str = column_name + "=" + new_data277            update_str_list.append(update_str)278        update_info_str = "," . join(update_str_list)279        if len(update_info_str) != 0:280            sql = sql + " ON DUPLICATE KEY UPDATE " + update_info_str281        #LOG_INFO(sql)282        self.execute(sql, commit = True)283    284    def has_table(self, table_name):285        if 0 == len(self._table_info):286            self.refresh_tables_info()287        if table_name in self._table_info.keys():288            return True289        return False290    def execute(self, sql, select = False, commit=False):291        try:292            data = ()293            conn = self._pool.connection()294            cursor = conn.cursor()295            data = cursor.execute(sql)296            if select:297                data = cursor.fetchall()298            if commit:299                conn.commit()300            cursor.close()301        except Exception as e:302            LOG_WARNING("excute sql error %s" % (str(e)))303            LOG_ERROR_SQL("%s" % (sql))304        finally:305            cursor.close()306            conn.close()307        return data308if __name__ == "__main__":309    a = mysql_conn("127.0.0.1", 3306, "root", "fangliang", "stock")310    #a.insert_data("share_base_info", ["share_id", "share_name"], [["000123","2'xxx"], ["000345","'4yyy"]])311    #a.insert_onduplicate("share_base_info", {"share_id":"000123", "share_name":"4YYYYYYYYYYY"}, ["share_id"])...bitmask-helper
Source:bitmask-helper  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3#4# Author: Kali Kaneko5# Copyright (C) 2015-2017 LEAP Encryption Access Project6#7# This program is free software: you can redistribute it and/or modify8# it under the terms of the GNU General Public License as published by9# the Free Software Foundation, either version 3 of the License, or10# (at your option) any later version.11#12# This program is distributed in the hope that it will be useful,13# but WITHOUT ANY WARRANTY; without even the implied warranty of14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the15# GNU General Public License for more details.16#17# You should have received a copy of the GNU General Public License18# along with this program.  If not, see <http://www.gnu.org/licenses/>.19"""20bitmask-helper21==============================================================================22This is a privileged helper script for safely running certain commands as root23under OSX.24It should be run by launchd, and it exposes a Unix Domain Socket to where25the following commmands can be written by the Bitmask application:26  firewall_start [restart] GATEWAY1 GATEWAY2 ...27  firewall_stop28  openvpn_start CONFIG1 CONFIG1 ...29  openvpn_stop30  fw_email_start uid31  fw_email_stop32To load it manually:33  sudo launchctl load /Library/LaunchDaemons/se.leap.bitmask-helper34To see the loaded rules:35  sudo pfctl -s rules -a bitmask36To test the commands, you can write directly to the unix socket. Remember to37terminate the command properly:38 echo 'firewall_stop/CMD' | socat - UNIX-CONNECT:/var/run/bitmask-helper.socket39"""40import os41import socket42import signal43import subprocess44import syslog45import threading46from commands import getoutput as exec_cmd47from functools import partial48import daemon49import re50VERSION = "2"51SCRIPT = "bitmask-helper"52NAMESERVER = "10.42.0.1"53BITMASK_ANCHOR = "com.apple/250.BitmaskFirewall"54BITMASK_ANCHOR_EMAIL = "bitmask_email"55OPENVPN_USER = 'nobody'56OPENVPN_GROUP = 'nogroup'57LEAPOPENVPN = 'LEAPOPENVPN'58APP_PATH = '/Applications/Bitmask.app/'59RESOURCES_PATH = APP_PATH + 'Contents/Resources/'60FIXED_FLAGS = [61    "--setenv", "LEAPOPENVPN", "1",62    "--nobind",63    "--client",64    "--tls-client",65    "--remote-cert-tls", "server",66    "--management-signal",67    "--script-security", "1",68    "--user", "nobody",69    "--remap-usr1", "SIGTERM",70    "--group", OPENVPN_GROUP,71]72ALLOWED_FLAGS = {73    "--remote": ["IP", "NUMBER", "PROTO"],74    "--tls-cipher": ["CIPHER"],75    "--cipher": ["CIPHER"],76    "--auth": ["CIPHER"],77    "--management": ["DIR", "UNIXSOCKET"],78    "--management-client-user": ["USER"],79    "--cert": ["FILE"],80    "--key": ["FILE"],81    "--ca": ["FILE"],82    "--fragment": ["NUMBER"]83}84PARAM_FORMATS = {85    "NUMBER": lambda s: re.match("^\d+$", s),86    "PROTO": lambda s: re.match("^(tcp|udp)$", s),87    "IP": lambda s: is_valid_address(s),88    "CIPHER": lambda s: re.match("^[A-Z0-9-]+$", s),89    "USER": lambda s: re.match(90        "^[a-zA-Z0-9_\.\@][a-zA-Z0-9_\-\.\@]*\$?$", s),  # IEEE Std 1003.1-200191    "FILE": lambda s: os.path.isfile(s),92    "DIR": lambda s: os.path.isdir(os.path.split(s)[0]),93    "UNIXSOCKET": lambda s: s == "unix",94    "UID": lambda s: re.match("^[a-zA-Z0-9]+$", s)95}96#97# paths (must use absolute paths, since this script is run as root)98#99PFCTL = '/sbin/pfctl'100ROUTE = '/sbin/route'101AWK = '/usr/bin/awk'102GREP = '/usr/bin/grep'103CAT = '/bin/cat'104UID = os.getuid()105SERVER_ADDRESS = '/var/run/bitmask-helper.socket'106cmd_lock = threading.Lock()107#108# COMMAND DISPATCH109#110def serve_forever():111    try:112        os.unlink(SERVER_ADDRESS)113    except OSError:114        if os.path.exists(SERVER_ADDRESS):115            raise116    syslog.syslog(syslog.LOG_WARNING, "serving forever")117    # XXX should check permissions on the socket file118    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)119    sock.bind(SERVER_ADDRESS)120    sock.listen(1)121    syslog.syslog(syslog.LOG_WARNING, "Binded to %s" % SERVER_ADDRESS)122    while True:123        connection, client_address = sock.accept()124        thread = threading.Thread(target=handle_command, args=[connection])125        thread.daemon = True126        thread.start()127def recv_until_marker(sock):128    end = '/CMD'129    total_data = []130    data = ''131    while True:132        data = sock.recv(8192)133        if end in data:134            total_data.append(data[:data.find(end)])135            break136        total_data.append(data)137        if len(total_data) > 1:138            # check if end_of_data was split139            last_pair = total_data[-2] + total_data[-1]140            if end in last_pair:141                total_data[-2] = last_pair[:last_pair.find(end)]142                total_data.pop()143                break144    return ''.join(total_data).strip()145def handle_command(sock):146    syslog.syslog(syslog.LOG_WARNING, "handle")147    received = recv_until_marker(sock)148    line = received.replace('\n', '').split(' ')149    command, args = line[0], line[1:]150    syslog.syslog(syslog.LOG_WARNING, 'command %s' % (command))151    cmd_dict = {152        'firewall_start': (firewall_start, args),153        'firewall_stop': (firewall_stop, []),154        'firewall_isup': (firewall_isup, []),155        'openvpn_start': (openvpn_start, args),156        'openvpn_stop': (openvpn_stop, []),157        'openvpn_force_stop': (openvpn_stop, ['KILL']),158        'openvpn_set_watcher': (openvpn_set_watcher, args)159    }160    cmd_call = cmd_dict.get(command, None)161    cmd_lock.acquire()162    try:163        if cmd_call:164            cmd, args = cmd_call165            if args:166                cmd = partial(cmd, *args)167            result = cmd()168            syslog.syslog(syslog.LOG_WARNING, "%s result: %s" % (169                command, str(result)))170            if result == 'YES':171                sock.sendall("%s: YES\n" % command)172            elif result == 'NO':173                sock.sendall("%s: NO\n" % command)174            else:175                sock.sendall("%s: OK\n" % command)176        else:177            syslog.syslog(178                syslog.LOG_WARNING, 'invalid command: %s' % (command,))179            sock.sendall("%s: ERROR\n" % command)180    except Exception as exc:181        syslog.syslog(182            syslog.LOG_WARNING, "error executing function %r" % (exc))183    finally:184        cmd_lock.release()185        sock.close()186#187# OPENVPN188#189openvpn_proc = None190openvpn_watcher_pid = None191def openvpn_start(*args):192    """193    Sanitize input and run openvpn as a subprocess of this long-running daemon.194    Keeps a reference to the subprocess Popen class instance.195    :param args: arguments to be passed to openvpn196    :type args: list197    """198    syslog.syslog(syslog.LOG_WARNING, "OPENVPN START")199    opts = list(args[1:])200    opts += ['--dhcp-option', 'DNS', '10.42.0.1',201             '--up', RESOURCES_PATH + 'client.up.sh',202             '--down', RESOURCES_PATH + 'client.down.sh']203    opts += ["--dev", "tun"]204    binary = [RESOURCES_PATH + 'openvpn.leap']205    if not os.path.isfile(binary[0]):206        binary = ['/usr/local/sbin/openvpn']207    cmd = binary + opts208    syslog.syslog(syslog.LOG_WARNING, 'LAUNCHING VPN: ' + ' '.join(cmd))209    global openvpn_proc210    if openvpn_proc is not None:211        syslog.syslog(syslog.LOG_WARNING, 'vpn: another process is already started')212        return213    # TODO sanitize options214    openvpn_proc = subprocess.Popen(cmd, shell=False, bufsize=-1)215    # XXX DEBUG -------------------------------------------------216    # try:217    #  result = subprocess.check_output(218    #  cmd, shell=False, stderr=subprocess.STDOUT)219    # except Exception as exc:220    #  syslog.syslog(syslog.LOG_WARNING, exc.output)221def openvpn_stop(sig='TERM'):222    """223    Stop the openvpn that has been launched by this privileged helper.224    :param args: arguments to openvpn225    :type args: list226    """227    global openvpn_proc228    syslog.syslog(229        syslog.LOG_WARNING, "Stopping OpenVPN...")230    if openvpn_proc:231        syslog.syslog(232            syslog.LOG_WARNING, "OpenVPN Process: %s" % str(openvpn_proc.pid))233        if sig == 'KILL':234            stop_signal = signal.SIGKILL235            openvpn_proc.kill()236        elif sig == 'TERM':237            stop_signal = signal.SIGTERM238            openvpn_proc.terminate()239        retcode = openvpn_proc.wait()240        syslog.syslog(241            syslog.LOG_WARNING, "OpenVPN died. Return code: %s" % str(retcode))242        syslog.syslog(243            syslog.LOG_WARNING, "openvpn_watcher_pid: %s" % str(244                openvpn_watcher_pid))245        openvpn_proc = None246        if openvpn_watcher_pid:247            try:248                os.kill(openvpn_watcher_pid, stop_signal)249            except Exception:250                pass251def openvpn_set_watcher(pid, *args):252    global openvpn_watcher_pid253    openvpn_watcher_pid = int(pid)254    syslog.syslog(syslog.LOG_WARNING, "Watcher PID: %s" % pid)255#256# FIREWALL257#258def firewall_start(*gateways):259    """260    Bring up the firewall.261    :param gws: list of gateways, to be sanitized.262    :type gws: list263    """264    gateways = get_gateways(gateways)265    if not gateways:266        return False267    _enable_pf()268    _reset_bitmask_gateways_table(gateways)269    default_device = _get_default_device()270    _load_bitmask_anchor(default_device)271def firewall_stop():272    """273    Flush everything from anchor bitmask274    """275    cmd = '{pfctl} -a {anchor} -F all'.format(276        pfctl=PFCTL, anchor=BITMASK_ANCHOR)277    return exec_cmd(cmd)278def firewall_isup():279    """280    Return YES if anchor bitmask is loaded with rules281    """282    cmd = '{pfctl} -s rules -a {anchor} | wc -l'.format(283        pfctl=PFCTL, anchor=BITMASK_ANCHOR)284    output = exec_cmd(cmd)285    rules = output[-1]286    if int(rules) > 0:287        return 'YES'288    else:289        return 'NO'290def _enable_pf():291    exec_cmd('{pfctl} -e'.format(pfctl=PFCTL))292def _reset_bitmask_gateways_table(gateways):293    cmd = '{pfctl} -a {anchor} -t bitmask_gateways -T delete'.format(294        pfctl=PFCTL, anchor=BITMASK_ANCHOR)295    exec_cmd(cmd)296    for gateway in gateways:297        cmd = '{pfctl} -a {anchor} -t bitmask_gateways -T add {gw}'.format(298            pfctl=PFCTL, anchor=BITMASK_ANCHOR, gw=gateway)299        exec_cmd(cmd)300        syslog.syslog(syslog.LOG_WARNING, "adding gw %s" % gateway)301    # cmd = '{pfctl} -a {anchor} -t bitmask_nameservers -T delete'.format(302    #    pfctl=PFCTL, anchor=BITMASK_ANCHOR)303    # output = exec_cmd(cmd)304    cmd = '{pfctl} -a {anchor} -t bitmask_gateways -T add {ns}'.format(305        pfctl=PFCTL, anchor=BITMASK_ANCHOR, ns=NAMESERVER)306    exec_cmd(cmd)307    syslog.syslog(syslog.LOG_WARNING, "adding ns %s" % NAMESERVER)308def _load_bitmask_anchor(default_device):309    cmd = ('{pfctl} -D default_device={defaultdevice} '310           '-a {anchor} -f {rulefile}').format(311        pfctl=PFCTL, defaultdevice=default_device,312        anchor=BITMASK_ANCHOR,313        rulefile=RESOURCES_PATH + 'bitmask-helper/bitmask.pf.conf')314    syslog.syslog(syslog.LOG_WARNING, "LOADING CMD: %s" % cmd)315    return exec_cmd(cmd)316def _get_default_device():317    """318    Retrieve the current default network device.319    :rtype: str320    """321    cmd_def_device = (322        '{route} -n get -net default | '323        '{grep} interface | {awk} "{{print $2}}"').format(324            route=ROUTE, grep=GREP, awk=AWK)325    iface = exec_cmd(cmd_def_device)326    iface = iface.replace("interface: ", "").strip()327    syslog.syslog(syslog.LOG_WARNING, "default device %s" % iface)328    return iface329#330# UTILITY331#332def is_valid_address(value):333    """334    Validate that the passed ip is a valid IP address.335    :param value: the value to be validated336    :type value: str337    :rtype: bool338    """339    try:340        socket.inet_aton(value)341        return True342    except Exception:343        syslog.syslog(syslog.LOG_WARNING, 'MALFORMED IP: %s!' % (value))344        return False345#346# FIREWALL347#348def get_gateways(gateways):349    """350    Filter a passed sequence of gateways, returning only the valid ones.351    :param gateways: a sequence of gateways to filter.352    :type gateways: iterable353    :rtype: iterable354    """355    syslog.syslog(syslog.LOG_WARNING, 'Filtering %s' % str(gateways))356    result = filter(is_valid_address, gateways)357    if not result:358        syslog.syslog(syslog.LOG_ERR, 'No valid gateways specified')359        return False360    else:361        return result362if __name__ == "__main__":363    with daemon.DaemonContext():364        syslog.syslog(syslog.LOG_WARNING, "Serving...")...tails-additional-software
Source:tails-additional-software  
1#!/usr/bin/env python2import gettext3import glob4import os.path5import pwd6import subprocess7import sys8import syslog9_ = gettext.gettext10PERSISTENCE_DIR = "/live/persistence/TailsData_unlocked"11PACKAGES_LIST_FILE = PERSISTENCE_DIR + "/live-additional-software.conf"12ACTIVATION_FILE = "/run/live-additional-software/activated"13APT_ARCHIVES_DIR = "/var/cache/apt/archives"14APT_LISTS_DIR = "/var/lib/apt/lists"15OBSOLETE_APT_LIST_SUFFIX = "_binary-i386_Packages"16def _launch_apt_get(specific_args):17    """Launch apt-get with given args18    Launch apt-get with given arguments list, log its standard and error output19    and return its returncode"""20    apt_get_env = os.environ.copy()21    # The environnment provided in GDM PostLogin hooks doesn't contain /sbin/22    # which is required by dpkg. Let's use the default path for root in Tails.23    apt_get_env['PATH'] = "/usr/local/sbin:/usr/local/bin:/usr/sbin:" \24                          "/usr/bin:/sbin:/bin"25    # We will log the output and want it in English when included in bug26    # reports27    apt_get_env['LANG'] = "C"28    args = ["apt-get", "--quiet", "--yes"]29    args.extend(specific_args)30    apt_get = subprocess.Popen(31        args,32        env=apt_get_env,33        stderr=subprocess.STDOUT,34        stdout=subprocess.PIPE)35    for line in iter(apt_get.stdout.readline, ''):36        if not line.startswith('('):37            syslog.syslog(line.rstrip())38    apt_get.wait()39    if apt_get.returncode:40        syslog.syslog(syslog.LOG_WARNING,41                      "apt-get exited with returncode %i" % apt_get.returncode)42    return apt_get.returncode43def _notify(title, body):44    """Display a notification to the user of the live system45    """46    cmd = "/usr/local/sbin/tails-notify-user"47    try:48        notify_user_output = subprocess.check_output([cmd, title, body],49                                                     stderr=subprocess.STDOUT)50    except subprocess.CalledProcessError as e:51        syslog.syslog(syslog.LOG_WARNING,52                      "Warning: unable to notify the user. %s returned "53                      "with exit code %s" % (cmd, e.returncode))54        syslog.syslog(syslog.LOG_WARNING,55                      "%s output follows: %s." % (cmd, notify_user_output))56        syslog.syslog(syslog.LOG_WARNING,57                      "The notification was: %s %s" % (title, body))58    except OSError as e:59        syslog.syslog(syslog.LOG_WARNING,60                      "Warning: unable to notify the user. %s" % e)61        syslog.syslog(syslog.LOG_WARNING,62                      "The notification was: %s %s" % (title, body))63def has_additional_packages_list():64    """Return true iff PACKAGES_LIST_FILE exists65    """66    return os.path.isfile(PACKAGES_LIST_FILE)67def get_additional_packages():68    """Returns the list of all the additional packages69    """70    packages = []71    if has_additional_packages_list():72        with open(PACKAGES_LIST_FILE) as f:73            for line in f:74                line = line.strip()75                if line:76                    packages.append(line)77        f.closed78    return packages79def install_additional_packages():80    """The subcommand which activates and installs all additional packages81    """82    syslog.syslog("Starting to install additional software...")83    if has_additional_packages_list():84        syslog.syslog("Found additional packages list")85    elif os.path.isdir(PERSISTENCE_DIR):86        syslog.syslog(syslog.LOG_WARNING,87                      "Warning: no configuration file found, creating an "88                      "empty one.")89        create_additional_packages_list()90        return True91    else:92        syslog.syslog(syslog.LOG_WARNING,93                      "Warning: persistence is not mounted, exiting")94        return True95    try:96        clear_obsolete_cache(OBSOLETE_APT_LIST_SUFFIX)97    except:98        syslog.syslog(syslog.LOG_WARNING,99                      "Warning: failed to clear obsolete cached packages")100    packages = get_additional_packages()101    if not packages:102        syslog.syslog(syslog.LOG_WARNING,103                      "Warning: no packages to install, exiting")104        return True105    set_activated()106    syslog.syslog("Will install the following packages: %s"107                  % " ".join(packages))108    apt_get_returncode = _launch_apt_get(109        ["--no-remove",110         "--option", "DPkg::Options::=--force-confold",111         "install"] + packages)112    if apt_get_returncode:113        syslog.syslog(syslog.LOG_WARNING,114                      "Warning: installation of %s failed"115                      % " ".join(packages))116        return False117    else:118        syslog.syslog("Installation completed successfully.")119        return True120def upgrade_additional_packages():121    """The subcommand which upgrades all additional packages if they are activated122    """123    if not is_activated():124        syslog.syslog(syslog.LOG_WARNING,125                      "Warning: additional packages not activated, exiting")126        return True127    syslog.syslog("Starting to upgrade additional software...")128    apt_get_returncode = _launch_apt_get(["update"])129    if apt_get_returncode:130        syslog.syslog(syslog.LOG_WARNING, "Warning: the update failed.")131        _notify(_("Your additional software"),132                _("The upgrade failed. This might be due to a network "133                  "problem. Please check your network connection, try to "134                  "restart Tails, or read the system log to understand better "135                  "the problem."))136        return False137    # Remove outdated packages from the local package cache. This is needed as138    # we disable apt-daily.timer, which would else take care of this cleanup.139    # Note: this does not remove packages from other architectures, hence140    # the need for the clear_obsolete_cache() function.141    apt_get_returncode = _launch_apt_get(["autoclean"])142    if apt_get_returncode:143        syslog.syslog(syslog.LOG_WARNING,144                      "Warning: autoclean failed, proceeding anyway.")145    if install_additional_packages():146        _notify(_("Your additional software"),147                _("The upgrade was successful."))148        return True149    else:150        _notify(_("Your additional software"),151                _("The upgrade failed. This might be due to a network "152                  "problem. Please check your network connection, try to "153                  "restart Tails, or read the system log to understand better "154                  "the problem."))155        return False156def create_additional_packages_list():157    """Creates the additional packages list158    Creates the additional packages list file with the right permissions.159    The caller must ensure the file doesn't already exist.160    """161    assert not has_additional_packages_list(), "%s already exists" \162        % PACKAGES_LIST_FILE163    syslog.syslog("Creating additional software configuration file")164    f = open(PACKAGES_LIST_FILE, 'w')165    f.closed166    os.chmod(PACKAGES_LIST_FILE, 0o600)167    os.chown(PACKAGES_LIST_FILE,168             pwd.getpwnam('tails-persistence-setup').pw_uid,169             pwd.getpwnam('tails-persistence-setup').pw_gid)170def clear_obsolete_cache(obsolete_apt_list_suffix):171    """Delete cached packages when a package list's filename172    ends with obsolete_apt_list_suffix173    """174    if glob.glob(APT_LISTS_DIR + "/*" + obsolete_apt_list_suffix):175        syslog.syslog("Clearing cache because at least one package list "176                      "has suffix '%s'." % obsolete_apt_list_suffix)177        for deb in glob.glob(APT_ARCHIVES_DIR + "/*.deb"):178            os.remove(deb)179def is_activated():180    """Check if additional software has been activated181    """182    return os.path.isfile(ACTIVATION_FILE)183def set_activated():184    """Save that additional software has been activated185    """186    syslog.syslog("Activating persistent software packages")187    activation_file_dir = os.path.dirname(ACTIVATION_FILE)188    if not os.path.exists(activation_file_dir):189        os.makedirs(activation_file_dir)190    try:191        f = open(ACTIVATION_FILE, 'w')192    finally:193        if f:194            f.close()195def print_help():196    """The subcommand which displays help197    """198    sys.stderr.write("Usage: %s <subcommand>\n" % program_name)199    sys.stderr.write("""Subcommands:200    install: activate and install additional software201    upgrade: upgrade additional software if activated\n""")202if __name__ == "__main__":203    program_name = os.path.basename(sys.argv[0])204    syslog.openlog("%s[%i]" % (program_name, os.getpid()))205    gettext.install("tails")206    if len(sys.argv) < 2:207        print_help()208        sys.exit(4)209    if sys.argv[1] == "install":210        if not install_additional_packages():211            sys.exit(1)212    elif sys.argv[1] == "upgrade":213        if not upgrade_additional_packages():214            sys.exit(2)215    else:216        print_help()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
