Best Python code snippet using lisa_python
test_core_api.py
Source:test_core_api.py  
1import mock2import unittest3import types4import balancer.core.api as api5from openstack.common import exception6from balancer import exception as exc7import balancer.db.models as models8class TestDecorators(unittest.TestCase):9    def setUp(self):10        self.func = mock.MagicMock(__name__='Test func',11                return_value=mock.MagicMock(spec=types.FunctionType))12    def test_asynchronous_1(self):13        wrapped = api.asynchronous(self.func)14        wrapped(async=False)15        self.assertEquals(self.func.call_args_list, [mock.call()])16    @mock.patch("eventlet.spawn")17    def test_asynchronous_2(self, mock_event):18        wrapped = api.asynchronous(self.func)19        wrapped(async=True)20        self.assertTrue(mock_event.called)21class TestBalancer(unittest.TestCase):22    patch_balancer = mock.patch("balancer.loadbalancers.vserver.Balancer")23    patch_schedule = mock.patch("balancer.core.scheduler.schedule")24    patch_reschedule = mock.patch("balancer.core.scheduler.reschedule")25    patch_logger = mock.patch("logging.getLogger")26    def setUp(self):27        self.conf = mock.MagicMock()28        value = mock.MagicMock29        self.dict_list = [{'id': 1, 'name': 'name', 'extra': {30            'stragearg': value, 'anotherarg': value}, },31            {'id': 2, 'name': 'name0', 'extra': {32                'stragearg': value, 'anotherarg': value}, }]33        self.dictionary = {'id': 1, 'name': 'name', 'extra': {34            'stragearg': value, 'anotherarg': value}, }35        self.tenant_id = 136        self.lb_id = 137        self.lb_node = self.dictionary38        self.lb_nodes = self.dict_list39        self.lb_node_id = 140        self.lb_body_0 = {'bubble': "bubble"}41        self.lb_body = {'algorithm': "bubble"}42        self.dict_list_0 = {'nodes': [{'id': 1, 'name': 'name',43            'extra': {'stragearg': value, 'anotherarg': value}}],44            'healthMonitor': [{'id': 2, 'name': 'name0', 'extra': {45                'stragearg': value, 'anotherarg': value}}],46            'virtualIps': [{'id': 333, 'name': 'name0', 'extra': {47                'stragearg': value, 'anotherarg': value}}]}48    @mock.patch("balancer.db.api.unpack_extra")49    @mock.patch("balancer.db.api.loadbalancer_get_all_by_project")50    def test_lb_get_index(self, mock_api, mock_extra):51        mock_api.return_value = [{'id': 1, 'virtualIps': 'foo'}, {'id': 2}]52        mock_extra.return_value = 'foo'53        resp = api.lb_get_index(self.conf, self.tenant_id)54        self.assertTrue(mock_api.called)55        self.assertTrue(mock_extra.called)56        self.assertTrue(mock_extra.call_count == 2)57        self.assertEqual(resp, ['foo', 'foo'])58        mock_api.assert_called_once_with(self.conf, self.tenant_id)59        mock_extra.assert_any_call({'id': 1, 'virtualIps': 'foo'})60        mock_extra.assert_any_call({'id': 2})61    @mock.patch("balancer.db.api.unpack_extra")62    @mock.patch("balancer.db.api.loadbalancer_get_all_by_vm_id")63    def test_lb_find_for_vm(self, mock_api, mock_extra):64        vm_id = mock.Mock()65        mock_api.return_value = ['foo']66        mock_extra.return_value = 'foo'67        resp = api.lb_find_for_vm(self.conf, 'fake_tenant', vm_id)68        mock_api.assert_called_once_with(self.conf, 'fake_tenant', vm_id)69        mock_extra.assert_called_once_with('foo')70        self.assertTrue(mock_api.called)71        self.assertEqual(resp, ['foo'])72    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")73    @mock.patch("balancer.db.api.unpack_extra")74    @mock.patch("balancer.db.api.loadbalancer_get")75    @mock.patch("balancer.db.api.virtualserver_get_all_by_sf_id")76    @mock.patch("balancer.db.api.server_get_all_by_sf_id")77    @mock.patch("balancer.db.api.probe_get_all_by_sf_id")78    @mock.patch("balancer.db.api.sticky_get_all_by_sf_id")79    def test_lb_show_details(self, *mocks):80        mocks[5].return_value = {"virtualIps": 1, "nodes": 2,81                "healthMonitor": 3, "sessionPersistence": 4}82        mocks[6].return_value = mock.MagicMock(spec=models.ServerFarm)83        api.lb_show_details(self.conf, 'fake_tenant', self.lb_id)84        for mok in mocks:85            self.assertTrue(mok.called, "This mock %s didn't call"86                    % mok._mock_name)87    @mock.patch("balancer.db.api.loadbalancer_get")88    @mock.patch("balancer.db.api.unpack_extra")89    def test_lb_get_data_0(self, mock_api, mock_bal):90        api.lb_get_data(self.conf, 'fake_tenant', self.lb_id)91        self.assertTrue(mock_api.called)92    @mock.patch("balancer.db.api.loadbalancer_get")93    @mock.patch("balancer.db.api.unpack_extra")94    def test_lb_get_data_1(self, mock_api, mock_bal):95        mock_api.return_value = {"id": 1, "virtualIps": "cranch"}96        res = api.lb_get_data(self.conf, 'fake_tenant', self.lb_id)97        self.assertTrue(mock_api.called)98        self.assertEquals(res, {"id": 1})99    @mock.patch("balancer.db.api.probe_create")100    @mock.patch("balancer.db.api.probe_pack_extra")101    @mock.patch("balancer.db.api.server_create")102    @mock.patch("balancer.db.api.server_pack_extra")103    @mock.patch("balancer.db.api.virtualserver_create")104    @mock.patch("balancer.db.api.virtualserver_pack_extra")105    @mock.patch("balancer.db.api.serverfarm_create")106    @mock.patch("balancer.db.api.predictor_create")107    @mock.patch("balancer.db.api.loadbalancer_update")108    @mock.patch("balancer.db.api.loadbalancer_create")109    @mock.patch("balancer.db.api.loadbalancer_pack_extra")110    @patch_schedule111    @mock.patch("balancer.core.commands.create_loadbalancer")112    @mock.patch("balancer.drivers.get_device_driver")113    def test_create_lb_0(self, *mocks):114        """No exception"""115        mocks[2].return_value = {'id': 1}116        mocks[4].return_value = {'id': 2, 'algorithm': 'test',117                                 'protocol': 'test'}118        api.create_lb(self.conf, self.dict_list_0)119        for mok in mocks:120            self.assertTrue(mok.called, "Mock %s didn't call"121                    % mok._mock_name)122        mocks[5].assert_called_with(self.conf, 2,123                                    {'status': 'ACTIVE', 'deployed': True})124    @mock.patch("balancer.db.api.probe_create")125    @mock.patch("balancer.db.api.probe_pack_extra")126    @mock.patch("balancer.db.api.server_create")127    @mock.patch("balancer.db.api.server_pack_extra")128    @mock.patch("balancer.db.api.virtualserver_create")129    @mock.patch("balancer.db.api.virtualserver_pack_extra")130    @mock.patch("balancer.db.api.loadbalancer_update")131    @mock.patch("balancer.db.api.loadbalancer_create")132    @mock.patch("balancer.db.api.loadbalancer_pack_extra")133    @mock.patch("balancer.db.api.serverfarm_create")134    @mock.patch("balancer.db.api.predictor_create")135    @patch_schedule136    @mock.patch("balancer.core.commands.create_loadbalancer")137    @mock.patch("balancer.drivers.get_device_driver")138    def test_create_lb_1(self, *mocks):139        """Exception"""140        mocks[6].return_value = {'id': 2, 'algorithm': 'test',141                                 'protocol': 'test'}142        mocks[1].side_effect = exception.Invalid143        mocks[2].return_value = {'id': 1}144        mocks[4].return_value = mock.MagicMock()145        self.assertRaises(exception.Invalid, api.create_lb,146                          self.conf, self.dict_list_0)147        mocks[7].assert_called_with(self.conf, 2,148                                    {'status': 'ERROR', 'deployed': False})149    @patch_reschedule150    @mock.patch("balancer.core.commands.delete_loadbalancer")151    @mock.patch("balancer.core.commands.create_loadbalancer")152    @mock.patch("balancer.db.api.loadbalancer_get")153    @mock.patch("balancer.db.api.loadbalancer_update")154    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")155    @mock.patch("balancer.db.api.predictor_get_by_sf_id")156    @mock.patch("balancer.db.api.predictor_update")157    @mock.patch("balancer.db.api.virtualserver_get_all_by_sf_id")158    @mock.patch("balancer.db.api.virtualserver_update")159    @mock.patch("balancer.db.api.server_get_all_by_sf_id")160    @mock.patch("balancer.db.api.probe_get_all_by_sf_id")161    @mock.patch("balancer.db.api.sticky_get_all_by_sf_id")162    @mock.patch("balancer.drivers.get_device_driver")163    def test_update_lb(self,164                       mock_get_device_driver,165                       mock_sticky_get_all_by_sf_id,166                       mock_probe_get_all_by_sf_id,167                       mock_server_get_all_by_sf_id,168                       mock_virtualserver_update,169                       mock_virtualserver_get_all_by_sf_id,170                       mock_predictor_update,171                       mock_predictor_get_by_sf_id,172                       mock_serverfarm_get_all_by_lb_id,173                       mock_loadbalancer_update,174                       mock_loadbalancer_get,175                       mock_create_loadbalancer,176                       mock_delete_loadbalancer,177                       mock_reschedule):178        lb_body = {'algorithm': 'FAKE_ALGO1', 'protocol': 'FAKE_PROTO1'}179        mock_loadbalancer_get.return_value = {180            'id': self.lb_id,181            'device_id': 'fakedeviceid',182            'name': 'fakename',183            'algorithm': 'FAKE_ALGO0',184            'protocol': 'FAKE_PROTO0',185        }186        mock_loadbalancer_update.return_value = lb_ref = {187            'id': self.lb_id,188            'device_id': 'fakedeviceid',189            'name': 'fakename',190            'algorithm': 'FAKE_ALGO1',191            'protocol': 'FAKE_PROTO1',192        }193        mock_reschedule.return_value = {'id': 'fakedeviceid'}194        sf_ref = {'id': 'fakesfid'}195        mock_serverfarm_get_all_by_lb_id.return_value = [sf_ref]196        predictor_ref = {'id': 'fakepredid'}197        mock_predictor_get_by_sf_id.return_value = predictor_ref198        vip_ref = {'id': 'fakevipid', 'extra': {'protocol': 'FAKE_PROTO0'}}199        mock_virtualserver_get_all_by_sf_id.return_value = [vip_ref]200        mock_servers = mock_server_get_all_by_sf_id.return_value201        mock_probes = mock_probe_get_all_by_sf_id.return_value202        mock_stickies = mock_sticky_get_all_by_sf_id.return_value203        mock_device_driver = mock_get_device_driver.return_value204        api.update_lb(self.conf, 'faketenantid', self.lb_id, lb_body,205                      async=False)206        mock_loadbalancer_get.assert_called_once_with(self.conf, self.lb_id,207                                                      tenant_id='faketenantid')208        mock_serverfarm_get_all_by_lb_id.assert_called_once_with(self.conf,209                                                                 self.lb_id)210        mock_predictor_get_by_sf_id.assert_called_once_with(self.conf,211                                                            sf_ref['id'])212        mock_predictor_update.assert_called_once_with(self.conf,213            predictor_ref['id'], {'type': 'FAKE_ALGO1'})214        mock_virtualserver_get_all_by_sf_id.assert_called_once_with(self.conf,215                                                            sf_ref['id'])216        mock_virtualserver_update.assert_called_once_with(self.conf,217            vip_ref['id'], {'id': 'fakevipid',218                            'extra': {'protocol': 'FAKE_PROTO1'}})219        for mock_func in [mock_server_get_all_by_sf_id,220                          mock_probe_get_all_by_sf_id,221                          mock_sticky_get_all_by_sf_id]:222            mock_func.assert_called_once_with(self.conf, sf_ref['id'])223        mock_get_device_driver.assert_called_once_with(self.conf,224                                                       lb_ref['device_id'])225        mock_loadbalancer_update.assert_has_calls([226            mock.call(self.conf, self.lb_id, lb_ref),227            mock.call(self.conf, self.lb_id, {'status': 'ACTIVE'}),228        ])229        # reschedule returns another device230        mock_reschedule.return_value = {'id': 'anotherdeviceid'}231        mock_loadbalancer_update.reset_mock()232        mock_loadbalancer_get.return_value['algorithm'] = 'FAKE_ALGO0'233        api.update_lb(self.conf, 'faketenantid', self.lb_id, lb_body,234                      async=False)235        mock_loadbalancer_update.assert_has_calls([236            mock.call(self.conf, self.lb_id, lb_ref),237            mock.call(self.conf, self.lb_id, {'device_id': 'anotherdeviceid'}),238            mock.call(self.conf, self.lb_id, {'status': 'ACTIVE'}),239        ])240    @mock.patch("balancer.drivers.get_device_driver")241    @mock.patch("balancer.db.api.sticky_get_all_by_sf_id")242    @mock.patch("balancer.db.api.probe_get_all_by_sf_id")243    @mock.patch("balancer.db.api.server_get_all_by_sf_id")244    @mock.patch("balancer.db.api.virtualserver_get_all_by_sf_id")245    @mock.patch("balancer.db.api.predictor_update")246    @mock.patch("balancer.db.api.predictor_get_by_sf_id")247    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")248    @mock.patch("balancer.db.api.loadbalancer_update")249    @mock.patch("balancer.db.api.loadbalancer_get")250    def test_update_lb_nothing(self,251                               mock_loadbalancer_get,252                               mock_loadbalancer_update,253                               *mock_funcs):254        lb_body = {'name': 'fakenewname'}255        mock_loadbalancer_get.return_value = {256            'id': self.lb_id,257            'device_id': 'fakedeviceid',258            'name': 'fakename',259            'algorithm': 'FAKE_ALGO0',260            'protocol': 'FAKE_PROTO0',261        }262        mock_loadbalancer_update.return_value = lb_ref = {263            'id': self.lb_id,264            'device_id': 'fakedeviceid',265            'name': 'fakenewname',266            'algorithm': 'FAKE_ALGO0',267            'protocol': 'FAKE_PROTO0',268        }269        sf_ref = {'id': 'fakesfid'}270        api.update_lb(self.conf, 'faketenantid', self.lb_id, lb_body,271                      async=False)272        mock_loadbalancer_get.assert_called_once_with(self.conf, self.lb_id,273                                                      tenant_id='faketenantid')274        for mock_func in mock_funcs:275            mock_func.assert_has_calls([])276    @patch_reschedule277    @mock.patch("balancer.core.commands.delete_loadbalancer")278    @mock.patch("balancer.core.commands.create_loadbalancer")279    @mock.patch("balancer.db.api.loadbalancer_get")280    @mock.patch("balancer.db.api.loadbalancer_update")281    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")282    @mock.patch("balancer.db.api.predictor_get_by_sf_id")283    @mock.patch("balancer.db.api.predictor_update")284    @mock.patch("balancer.db.api.virtualserver_get_all_by_sf_id")285    @mock.patch("balancer.db.api.server_get_all_by_sf_id")286    @mock.patch("balancer.db.api.probe_get_all_by_sf_id")287    @mock.patch("balancer.db.api.sticky_get_all_by_sf_id")288    @mock.patch("balancer.drivers.get_device_driver")289    def test_update_lb_error(self,290                             mock_get_device_driver,291                             mock_sticky_get_all_by_sf_id,292                             mock_probe_get_all_by_sf_id,293                             mock_server_get_all_by_sf_id,294                             mock_virtualserver_get_all_by_sf_id,295                             mock_predictor_update,296                             mock_predictor_get_by_sf_id,297                             mock_serverfarm_get_all_by_lb_id,298                             mock_loadbalancer_update,299                             mock_loadbalancer_get,300                             mock_create_loadbalancer,301                             mock_delete_loadbalancer,302                             mock_reschedule):303        lb_body = {'algorithm': 'FAKE_ALGO1'}304        mock_loadbalancer_get.return_value = {305            'id': self.lb_id,306            'device_id': 'fakedeviceid',307            'name': 'fakename',308            'algorithm': 'FAKE_ALGO0',309            'protocol': 'FAKE_PROTO0',310        }311        mock_loadbalancer_update.return_value = lb_ref = {312            'id': self.lb_id,313            'device_id': 'fakedeviceid',314            'name': 'fakename',315            'algorithm': 'FAKE_ALGO1',316            'protocol': 'FAKE_PROTO0',317        }318        mock_reschedule.return_value = {'id': 'fakedeviceid'}319        sf_ref = {'id': 'fakesfid'}320        mock_serverfarm_get_all_by_lb_id.return_value = [sf_ref]321        predictor_ref = {'id': 'fakepredid'}322        mock_predictor_get_by_sf_id.return_value = predictor_ref323        # assume core.commands.delete_loadbalancer raises error324        mock_delete_loadbalancer.side_effect = exception.Invalid325        self.assertRaises(exception.Invalid, api.update_lb, self.conf,326                          'faketenantid', self.lb_id, lb_body, async=False)327        mock_loadbalancer_update.assert_has_calls([328            mock.call(self.conf, self.lb_id, lb_ref),329            mock.call(self.conf, self.lb_id, {'status': 'ERROR'}), ])330        # assume core.commands.create_loadbalancer raises error331        mock_delete_loadbalancer.side_effect = None332        mock_loadbalancer_update.reset_mock()333        mock_loadbalancer_get.return_value['algorithm'] = 'FAKE_ALGO0'334        mock_create_loadbalancer.side_effect = exception.Invalid335        self.assertRaises(exception.Invalid, api.update_lb, self.conf,336                          'faketenantid', self.lb_id, lb_body, async=False)337        mock_loadbalancer_update.assert_has_calls([338            mock.call(self.conf, self.lb_id, lb_ref),339            mock.call(self.conf, self.lb_id, {'status': 'ERROR'}), ])340    @mock.patch("balancer.db.api.virtualserver_destroy_by_sf_id")341    @mock.patch("balancer.db.api.predictor_destroy_by_sf_id")342    @mock.patch("balancer.db.api.serverfarm_destroy")343    @mock.patch("balancer.db.api.loadbalancer_destroy")344    @mock.patch("balancer.db.api.server_destroy_by_sf_id")345    @mock.patch("balancer.db.api.sticky_destroy_by_sf_id")346    @mock.patch("balancer.db.api.probe_destroy_by_sf_id")347    @mock.patch("balancer.db.api.sticky_get_all_by_sf_id")348    @mock.patch("balancer.db.api.probe_get_all_by_sf_id")349    @mock.patch("balancer.db.api.server_get_all_by_sf_id")350    @mock.patch("balancer.db.api.virtualserver_get_all_by_sf_id")351    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")352    @mock.patch("balancer.db.api.loadbalancer_get")353    @mock.patch("balancer.drivers.get_device_driver")354    @mock.patch("balancer.core.commands.delete_loadbalancer")355    def test_delete_lb(self, *mocks):356        mocks[2].return_value = mock.MagicMock()357        api.delete_lb(self.conf, 'fake_tenant', self.lb_id)358        for m in mocks:359            self.assertTrue(m.called, "Mock %s wasn't called"360                    % m._mock_name)361    @mock.patch("balancer.db.api.unpack_extra")362    @mock.patch("balancer.db.api.server_create")363    @mock.patch("balancer.db.api.server_pack_extra")364    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")365    @mock.patch("balancer.db.api.loadbalancer_get")366    @mock.patch("balancer.drivers.get_device_driver")367    @mock.patch("balancer.core.commands.add_node_to_loadbalancer")368    def test_lb_add_nodes(self, *mocks):369        mocks[2].return_value = {'device_id': 1}370        mocks[3].return_value = [{'id': 1}]371        api.lb_add_nodes(self.conf, 'fake_tenant', self.lb_id, self.lb_nodes)372        for mok in mocks:373            self.assertTrue(mok.called, "This mock didn't call %s"374                    % mok._mock_name)375    @mock.patch("balancer.db.api.unpack_extra")376    @mock.patch("balancer.db.api.server_get_all_by_sf_id")377    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")378    def test_lb_show_nodes(self, mock_serverfarm, mock_server, mock_unpack):379        mock_serverfarm.return_value = self.dict_list380        api.lb_show_nodes(self.conf, 'fake_tenant', 1)381        self.assertTrue(mock_serverfarm.called)382        self.assertTrue(mock_server.called)383    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")384    @mock.patch("balancer.db.api.loadbalancer_get")385    @mock.patch("balancer.drivers.get_device_driver")386    @mock.patch("balancer.core.commands.remove_node_from_loadbalancer")387    @mock.patch("balancer.db.api.server_get")388    @mock.patch("balancer.db.api.server_destroy")389    def test_lb_delete_node(self, *mocks):390        mocks[5].return_value = self.dict_list391        api.lb_delete_node(self.conf,392                'fake_tenant', self.lb_id, self.lb_node_id)393        for mock in mocks:394            self.assertTrue(mock.called)395    @mock.patch("balancer.db.api.serverfarm_get")396    @mock.patch("balancer.db.api.loadbalancer_get")397    @mock.patch("balancer.db.api.server_update")398    @mock.patch("balancer.drivers.get_device_driver")399    @mock.patch("balancer.core.commands.activate_rserver")400    @mock.patch("balancer.db.api.server_get")401    def test_lb_change_node_status_0(self, *mocks):402        """Activate server called"""403        lb_node_status = "inservice"404        api.lb_change_node_status(self.conf,405                'fake_tenant', self.lb_id, self.lb_node_id, lb_node_status)406        for mock in mocks:407            self.assertTrue(mock.called)408    @mock.patch("balancer.db.api.serverfarm_get")409    @mock.patch("balancer.db.api.loadbalancer_get")410    @mock.patch("balancer.db.api.server_update")411    @mock.patch("balancer.drivers.get_device_driver")412    @mock.patch("balancer.core.commands.suspend_rserver")413    @mock.patch("balancer.db.api.server_get")414    def test_lb_change_node_status_1(self, *mocks):415        """Suspend server called"""416        lb_node_status = ""417        api.lb_change_node_status(self.conf,418                'fake_tenant', self.lb_id, self.lb_node_id, lb_node_status)419        for mock in mocks:420            self.assertTrue(mock.called)421    @mock.patch("balancer.db.api.serverfarm_get")422    @mock.patch("balancer.db.api.loadbalancer_get")423    @mock.patch("balancer.db.api.server_update")424    @mock.patch("balancer.drivers.get_device_driver")425    @mock.patch("balancer.core.commands.activate_rserver")426    @mock.patch("balancer.core.commands.suspend_rserver")427    @mock.patch("balancer.db.api.server_get")428    def test_lb_change_node_status_2(self, *mocks):429        """return ok"""430        mocks[0].return_value = {'sf_id': 1, 'state': 'status'}431        api.lb_change_node_status(self.conf,432                'fake_tenant', self.lb_id, self.lb_node_id, 'status')433        self.assertFalse(mocks[1].called)434        self.assertFalse(mocks[2].called)435    @mock.patch("balancer.drivers.get_device_driver")436    @mock.patch("balancer.db.api.server_create")437    @mock.patch("balancer.db.api.server_update")438    @mock.patch("balancer.db.api.server_get")439    @mock.patch("balancer.db.api.server_destroy")440    @mock.patch("balancer.db.api.loadbalancer_get")441    @mock.patch("balancer.db.api.serverfarm_get")442    @mock.patch("balancer.core.commands.delete_rserver_from_server_farm")443    @mock.patch("balancer.core.commands.add_rserver_to_server_farm")444    def test_lb_update_node_0(self, mock_com0, mock_com1, *mocks):445        """"""446        api.lb_update_node(self.conf,447                'fake_tenant', self.lb_id, self.lb_node_id, self.lb_node)448        self.assertTrue(mock_com0.called)449        self.assertTrue(mock_com1.called)450    @mock.patch("balancer.drivers.get_device_driver")451    @mock.patch("balancer.db.api.server_update")452    @mock.patch("balancer.db.api.server_get")453    @mock.patch("balancer.db.api.loadbalancer_get")454    @mock.patch("balancer.db.api.serverfarm_get")455    @mock.patch("balancer.core.commands.delete_rserver_from_server_farm")456    @mock.patch("balancer.core.commands.add_rserver_to_server_farm")457    @mock.patch("balancer.db.api.unpack_extra")458    def test_lb_update_node(self, mock_extra, mock_command1, mock_command2,459                            mock_sf, mock_lb, mock_get, mock_update,460                            mock_driver):461        """"""462        mock_extra.return_value = self.dictionary463        mock_lb.return_value.__getitem__.return_value = 2464        resp = api.lb_update_node(self.conf,465                'fake_tenant', self.lb_id, self.lb_node_id, self.lb_node)466        self.assertEqual(resp, self.dictionary)467        mock_update.assert_called_once_with(self.conf,468                                            mock_get.return_value['id'],469                                            mock_get.return_value)470        mock_extra.assert_called_once_with(mock_update.return_value)471        mock_get.assert_called_once_with(self.conf,472                self.lb_node_id, tenant_id='fake_tenant')473        mock_sf.assert_called_once_with(self.conf,474                                        mock_get.return_value['sf_id'])475        mock_lb.assert_called_once_with(self.conf, self.lb_id)476        mock_driver.assert_called_once_with(self.conf, 2)477        with mock_driver.return_value.request_context() as ctx:478            mock_command1.assert_called_once_with(ctx, mock_sf.return_value,479                                                  mock_update.return_value)480            mock_command2.assert_called_once_with(ctx, mock_sf.return_value,481                                                  mock_get.return_value)482    @mock.patch("balancer.db.api.unpack_extra")483    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")484    @mock.patch("balancer.db.api.probe_get_all_by_sf_id")485    def test_lb_show_probes_0(self, db_api0, db_api1, db_api2):486        db_api0.return_value = self.dict_list487        db_api1.return_value.__getitem__.return_value.\488                             __getitem__.return_value = 2489        db_api2.return_value = {'probe': 'foo'}490        resp = api.lb_show_probes(self.conf, 'fake_tenant', self.lb_id)491        self.assertTrue(db_api1.called)492        self.assertTrue(db_api2.called)493        self.assertTrue(db_api2.call_count == 2)494        self.assertEqual(resp, {'healthMonitoring': [{'probe': 'foo'},495                                                     {'probe': 'foo'}]})496        db_api0.assert_called_once_with(self.conf, 2)497        db_api1.assert_called_once_with(self.conf,498                self.lb_id, tenant_id='fake_tenant')499        db_api2.assert_any_call(self.dict_list[0])500        db_api2.assert_any_call(self.dict_list[1])501    @mock.patch("balancer.db.api.unpack_extra")502    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")503    @mock.patch("balancer.db.api.probe_get_all_by_sf_id")504    def test_lb_show_probes_1(self, db_api0, db_api1, db_api2):505        db_api1.return_value = []506        with self.assertRaises(exc.ServerFarmNotFound):507            api.lb_show_probes(self.conf, 'fake_tenant', self.lb_id)508            self.assertFalse(db_api0.called)509            self.assertFalse(db_api2.called)510    @mock.patch("balancer.db.api.loadbalancer_get")511    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")512    @mock.patch("balancer.db.api.probe_pack_extra")513    @mock.patch("balancer.db.api.probe_create")514    @mock.patch("balancer.drivers.get_device_driver")515    @mock.patch("balancer.core.commands.add_probe_to_loadbalancer")516    @mock.patch("balancer.db.api.unpack_extra")517    def test_lb_add_probe_0(self, mock_unpack, mock_command, mock_driver,518                            mock_create, mock_pack, mock_sf, mock_lb):519        """lb_probe['type']!=None"""520        lb_probe = {'type': 'Gvido'}521        mock_sf.return_value.__getitem__.return_value = {'id': 'foo'}522        mock_unpack.return_value = self.dictionary523        resp = api.lb_add_probe(self.conf, 'fake_tenant', self.lb_id, lb_probe)524        self.assertEqual(resp, self.dictionary)525        mock_unpack.assert_called_once_with(mock_create.return_value)526        mock_pack.assert_called_once_with(lb_probe)527        mock_create.assert_called_once_with(self.conf, mock_pack.return_value)528        mock_sf.assert_called_once_with(self.conf, mock_lb.return_value['id'])529        mock_lb.assert_called_once_with(self.conf,530                self.lb_id, tenant_id='fake_tenant')531        with mock_driver.return_value.request_context() as ctx:532            mock_command.assert_called_once_with(ctx, {'id': 'foo'},533                                                 mock_create.return_value)534    @mock.patch("balancer.db.api.loadbalancer_get")535    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")536    @mock.patch("balancer.db.api.probe_pack_extra")537    @mock.patch("balancer.db.api.probe_create")538    @mock.patch("balancer.drivers.get_device_driver")539    @mock.patch("balancer.core.commands.add_probe_to_loadbalancer")540    @mock.patch("balancer.db.api.unpack_extra")541    def test_lb_add_probe_1(self, *mocks):542        """lb_probe['type']=None"""543        lb_probe = {'type': None}544        resp = api.lb_add_probe(self.conf, 'fake_tenant', self.lb_id, lb_probe)545        self.assertEqual(resp, None)546        for mock in mocks:547            self.assertFalse(mock.called)548    @mock.patch("balancer.db.api.loadbalancer_get")549    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")550    def test_lb_add_probe_2(self, mock_sf, mock_lb):551        """Exception"""552        lb_probe = {'type': 'Gvido'}553        mock_sf.return_value = []554        with self.assertRaises(exc.ServerFarmNotFound):555            api.lb_add_probe(self.conf, 'fake_tenant', self.lb_id, lb_probe)556    @mock.patch("balancer.db.api.loadbalancer_get")557    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")558    @mock.patch("balancer.db.api.probe_pack_extra")559    @mock.patch("balancer.db.api.probe_create")560    @mock.patch("balancer.drivers.get_device_driver")561    @mock.patch("balancer.core.commands.add_probe_to_loadbalancer")562    @mock.patch("balancer.db.api.unpack_extra")563    def test_lb_add_probe_2(self, *mocks):564        """lb_probe['type']!=None"""565        lb_probe = {'type': 'Gvido'}566        mocks[5].side_effect = IndexError567        with self.assertRaises(exc.ServerFarmNotFound):568            api.lb_add_probe(self.conf, 'fake_tenant', self.lb_id, lb_probe)569   #     for mok in mocks:570   #         self.assertTrue(mok.called)571    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")572    @mock.patch("balancer.db.api.loadbalancer_get")573    @mock.patch("balancer.drivers.get_device_driver")574    @mock.patch("balancer.db.api.probe_get")575    @mock.patch("balancer.db.api.probe_destroy")576    @mock.patch("balancer.core.commands.remove_probe_from_server_farm")577    def test_lb_delete_probe(self, *mocks):578        mocks[5].return_value = self.dict_list579        api.lb_delete_probe(self.conf, 'fake_tenant', self.lb_id, self.lb_id)580        for mok in mocks:581            self.assertTrue(mok.called)582    @mock.patch("balancer.db.api.unpack_extra", autospec=True)583    @mock.patch("balancer.core.commands.create_vip", autospec=True)584    @mock.patch("balancer.drivers.get_device_driver", autospec=True)585    @mock.patch("balancer.db.api.virtualserver_create", autospec=True)586    @mock.patch("balancer.db.api.virtualserver_pack_extra", autospec=True)587    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id", autospec=True)588    @mock.patch("balancer.db.api.loadbalancer_get", autospec=True)589    def test_lb_add_vip(self,590                        mock_loadbalancer_get,591                        mock_serverfarm_get_all_by_lb_id,592                        mock_virtualserver_pack_extra,593                        mock_virtualserver_create,594                        mock_get_device_driver,595                        mock_create_vip,596                        mock_unpack_extra):597        # Mock598        lb_ref = {599            'id': 'fakelbid',600            'device_id': 'fakedeviceid',601            'protocol': 'HTTP',602        }603        mock_loadbalancer_get.return_value = lb_ref604        sf_ref = mock.MagicMock()605        sf_ref.__getitem__.return_value = 'fakesfid'606        mock_serverfarm_get_all_by_lb_id.return_value = [sf_ref]607        mock_virtualserver_pack_extra.return_value = {}608        mock_virtualserver_create.return_value = vip_ref = mock.Mock()609        ctx = mock.MagicMock()610        ctx.__enter__.return_value = enter_ctx = mock.Mock()611        mock_get_device_driver.return_value = \612            mock.Mock(request_context=mock.Mock(return_value=ctx))613        # Call614        api.lb_add_vip(self.conf, 'fake_tenant', 'fakelbid', 'fakevipdict')615        # Assert616        mock_loadbalancer_get.assert_called_once_with(self.conf,617                'fakelbid', tenant_id='fake_tenant')618        mock_serverfarm_get_all_by_lb_id.assert_called_once_with(self.conf,619                                                                 'fakelbid')620        mock_virtualserver_pack_extra.assert_called_once_with('fakevipdict')621        mock_virtualserver_create.assert_called_once_with(self.conf, {622            'lb_id': 'fakelbid',623            'sf_id': 'fakesfid',624            'extra': {625                'protocol': 'HTTP',626            },627        })628        mock_get_device_driver.assert_called_once_with(self.conf,629                                                       'fakedeviceid')630        mock_create_vip.assert_called_once_with(enter_ctx, vip_ref, sf_ref)631        mock_unpack_extra.assert_called_once_with(vip_ref)632    @mock.patch("balancer.db.api.unpack_extra", autospec=True)633    @mock.patch("balancer.core.commands.create_vip", autospec=True)634    @mock.patch("balancer.drivers.get_device_driver", autospec=True)635    @mock.patch("balancer.db.api.virtualserver_create", autospec=True)636    @mock.patch("balancer.db.api.virtualserver_pack_extra", autospec=True)637    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id", autospec=True)638    @mock.patch("balancer.db.api.loadbalancer_get", autospec=True)639    def test_lb_add_vip_failed(self,640                               mock_loadbalancer_get,641                               mock_serverfarm_get_all_by_lb_id,642                               mock_virtualserver_pack_extra,643                               mock_virtualserver_create,644                               mock_get_device_driver,645                               mock_create_vip,646                               mock_unpack_extra):647        mock_serverfarm_get_all_by_lb_id.return_value = []648        with self.assertRaises(exc.ServerFarmNotFound):649            api.lb_add_vip(self.conf, 'fake_tenant', 'fakelbid', 'fakevipdict')650        self.assertTrue(mock_loadbalancer_get.called)651        self.assertTrue(mock_serverfarm_get_all_by_lb_id.called)652        self.assertFalse(mock_virtualserver_pack_extra.called)653        self.assertFalse(mock_virtualserver_create.called)654        self.assertFalse(mock_get_device_driver.called)655        self.assertFalse(mock_create_vip.called)656        self.assertFalse(mock_unpack_extra.called)657    @mock.patch("balancer.core.commands.delete_vip", autospec=True)658    @mock.patch("balancer.drivers.get_device_driver", autospec=True)659    @mock.patch("balancer.db.api.virtualserver_destroy", autospec=True)660    @mock.patch("balancer.db.api.virtualserver_get", autospec=True)661    @mock.patch("balancer.db.api.loadbalancer_get", autospec=True)662    def test_lb_delete_vip(self,663                           mock_loadbalancer_get,664                           mock_virtualserver_get,665                           mock_virtualserver_destroy,666                           mock_get_device_driver,667                           mock_delete_vip):668        # Mock669        mock_loadbalancer_get.return_value = lb_ref = mock.MagicMock()670        lb_ref.__getitem__.return_value = 'fakedeviceid'671        mock_virtualserver_get.return_value = vip_ref = mock.Mock()672        ctx = mock.MagicMock()673        ctx.__enter__.return_value = enter_ctx = mock.Mock()674        mock_get_device_driver.return_value = \675            mock.Mock(request_context=mock.Mock(return_value=ctx))676        # Call677        api.lb_delete_vip(self.conf, 'fake_tenant', 'fakelbid', 'fakevipid')678        # Assert679        mock_loadbalancer_get.assert_called_once_with(self.conf,680                'fakelbid', tenant_id='fake_tenant')681        mock_virtualserver_get.assert_called_once_with(self.conf, 'fakevipid')682        mock_virtualserver_destroy.assert_called_once_with(self.conf,683                                                           'fakevipid')684        mock_get_device_driver.assert_called_once_with(self.conf,685                                                       'fakedeviceid')686        mock_delete_vip.assert_called_once_with(enter_ctx, vip_ref)687    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")688    @mock.patch("balancer.db.api.sticky_get_all_by_sf_id")689    @mock.patch("balancer.db.api.unpack_extra")690    def test_lb_show_sticky0(self, db_api0, db_api1, db_api2):691        db_api0.return_value = {'sticky': 'foo'}692        db_api1.return_value = self.dict_list693        db_api2.return_value.__getitem__.return_value.\694                             __getitem__.return_value = 2695        resp = api.lb_show_sticky(self.conf, 'fake_tenant', self.lb_id)696        self.assertEqual(resp, {"sessionPersistence": [{'sticky': 'foo'},697                                                       {'sticky': 'foo'}]})698        self.assertTrue(db_api0.called)699        self.assertTrue(db_api0.call_count == 2)700        self.assertTrue(db_api1.called)701        self.assertTrue(db_api2.called)702        db_api0.assert_any_call(self.dict_list[0])703        db_api0.assert_any_call(self.dict_list[1])704        db_api1.assert_called_once_with(self.conf, 2)705        db_api2.assert_called_once_with(self.conf,706                self.lb_id, tenant_id='fake_tenant')707    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")708    @mock.patch("balancer.db.api.sticky_get_all_by_sf_id")709    @mock.patch("balancer.db.api.unpack_extra")710    def test_lb_show_sticky1(self, db_api0, db_api1, db_api2):711        db_api2.return_value = []712        with self.assertRaises(exc.ServerFarmNotFound):713            api.lb_show_sticky(self.conf, 'fake_tenant', self.lb_id)714            self.assertFalse(db_api0.called)715            self.assertFalse(db_api1.called)716    @mock.patch("balancer.db.api.unpack_extra")717    @mock.patch("balancer.db.api.sticky_create")718    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")719    @mock.patch("balancer.db.api.loadbalancer_get")720    @mock.patch("balancer.db.api.sticky_pack_extra")721    @mock.patch("balancer.drivers.get_device_driver")722    @mock.patch("balancer.core.commands.add_sticky_to_loadbalancer")723    def test_lb_add_sticky0(self, *mocks):724        mocks[4].return_value = self.dict_list725        sticky = mock.MagicMock()726        api.lb_add_sticky(self.conf, 'fake_tenant', self.lb_id, sticky)727        for mok in mocks:728            self.assertTrue(mok.called)729    @mock.patch("balancer.db.api.unpack_extra")730    @mock.patch("balancer.db.api.sticky_create")731    @mock.patch("balancer.db.api.serverfarm_get_all_by_lb_id")732    @mock.patch("balancer.db.api.loadbalancer_get")733    @mock.patch("balancer.db.api.sticky_pack_extra")734    @mock.patch("balancer.drivers.get_device_driver")735    @mock.patch("balancer.core.commands.add_sticky_to_loadbalancer")736    def test_lb_add_sticky1(self, *mocks):737        sticky = {'type': None}738        resp = api.lb_add_sticky(self.conf, 'fake_tenant', self.lb_id, sticky)739        self.assertEqual(resp, None)740        for mock in mocks:741            self.assertFalse(mock.called)742    @mock.patch("balancer.db.api.loadbalancer_get")743    @mock.patch("balancer.db.api.sticky_get")744    @mock.patch("balancer.db.api.sticky_destroy")745    @mock.patch("balancer.drivers.get_device_driver")746    @mock.patch("balancer.core.commands.remove_sticky_from_loadbalancer")747    def test_lb_delete_sticky(self, mock_command, mock_driver, mock_destroy,748                              mock_get, mock_bal):749        mock_bal.return_value = {'id': 2, 'device_id': 2}750        resp = api.lb_delete_sticky(self.conf, 'fake_tenant', self.lb_id, 1)751        self.assertEqual(resp, 1)752        mock_bal.assert_called_once_with(self.conf,753                self.lb_id, tenant_id='fake_tenant')754        mock_get.assert_called_once_with(self.conf, 1)755        mock_destroy.assert_called_once_with(self.conf, 1)756        mock_driver.assert_called_once_with(self.conf, 2)757        with mock_driver.return_value.request_context() as ctx:758            mock_command.assert_called_once_with(ctx, mock_bal.return_value,759                                                 mock_get.return_value)760class TestDevice(unittest.TestCase):761    def setUp(self):762        self.conf = mock.MagicMock(register_group=mock.MagicMock)763        self.dict_list = ({'id': 1}, {'id': 2},)764    @mock.patch("balancer.db.api.device_get_all")765    @mock.patch("balancer.db.api.unpack_extra")766    def test_device_get_index(self, mock_f1, mock_f2):767        mock_f1.side_effect = [{'device': 1}, {'device': 2}]768        mock_f2.return_value = [[{'id': 1}], [{'id': 2}]]769        resp = api.device_get_index(self.conf)770        self.assertEqual(resp, [{'device': 1}, {'device': 2}])771        self.assertTrue(mock_f1.called)772        self.assertTrue(mock_f2.called)773        mock_f1.assert_any_call([{'id': 1}])774        mock_f1.assert_any_call([{'id': 2}])775        mock_f2.assert_called_once_with(self.conf)776    @mock.patch("balancer.db.api.device_pack_extra")777    @mock.patch("balancer.db.api.device_create")778    def test_device_create(self,  mock_f1, mock_f2):779        mock_f1.return_value = {'id': 1}780        resp = api.device_create(self.conf)781        self.assertEqual(resp['id'], 1)782        self.assertTrue(mock_f1.called, "device_create not called")783        self.assertTrue(mock_f2.called, "device_pack_extra not called")784        mock_f1.assert_caleld_once_with(self.conf, mock_f2.return_value)785        mock_f2.assert_called_once_with({})786    def test_device_info(self):787        params = {'query_params': 2}788        res = api.device_info(params)789        self.assertEquals(res, None, "Alyarma!")790    @mock.patch('balancer.db.api.device_get_all')791    @mock.patch('balancer.drivers.get_device_driver')792    def test_device_show_algorithms_0(self, mock_driver, mock_db_api):793        """capabilities = None"""794        mock_driver.get_capabilities = None795        mock_db_api.return_value = self.dict_list796        resp = api.device_show_algorithms(self.conf)797        self.assertEqual(resp, [])798    @mock.patch('balancer.db.api.device_get_all')799    @mock.patch('balancer.drivers.get_device_driver')800    def test_device_show_algorithms_1(self, mock_driver, mock_db_api):801        """capabilities is not empty, not None"""802        mock_db_api.return_value = self.dict_list803        mock_driver.return_value = drv = mock.MagicMock()804        drv.get_capabilities.return_value = {"algorithms": ["CRYSIS"]}805        resp = api.device_show_algorithms(self.conf)806        self.assertEqual(resp, ["CRYSIS"])807    @mock.patch('balancer.db.api.device_get_all')808    @mock.patch('balancer.drivers.get_device_driver')809    def test_device_show_algorithms_2(self, mock_driver, mock_db_api):810        """capabilities is empty"""811        mock_db_api.return_value = self.dict_list812        mock_driver.return_value = drv = mock.MagicMock()813        drv.get_capabilities.return_value = {}814        resp = api.device_show_algorithms(self.conf)815        self.assertEqual(resp, [])816    @mock.patch('balancer.db.api.device_get_all')817    @mock.patch('balancer.drivers.get_device_driver')818    def test_show_protocols_0(self, mock_driver, mock_db_api):819        """capabilities = None"""820        mock_driver.get_capabilities = None821        mock_db_api.return_value = self.dict_list822        resp = api.device_show_protocols(self.conf)823        self.assertEqual(resp, [])824    @mock.patch('balancer.db.api.device_get_all')825    @mock.patch('balancer.drivers.get_device_driver')826    def test_show_protocols_1(self, mock_driver, mock_db_api):827        """capabilities"""828        mock_db_api.return_value = self.dict_list829        mock_driver.return_value = drv = mock.MagicMock()830        drv.get_capabilities.return_value = {"protocols": ["CRYSIS"]}831        resp = api.device_show_protocols(self.conf)832        self.assertEqual(resp, ["CRYSIS"])833    @mock.patch('balancer.db.api.device_get_all')834    @mock.patch('balancer.drivers.get_device_driver')835    def test_show_protocols_2(self, mock_driver, mock_db_api):836        """capabilities"""837        mock_db_api.return_value = self.dict_list838        mock_driver.return_value = drv = mock.MagicMock()839        drv.get_capabilities.return_value = {}840        resp = api.device_show_protocols(self.conf)...api.py
Source:api.py  
...97        probes.append(db_api.probe_create(conf, probe))98    device_ref = scheduler.schedule(conf, lb_ref)99    db_api.loadbalancer_update(conf, lb_ref['id'],100                               {'device_id': device_ref['id']})101    device_driver = drivers.get_device_driver(conf, device_ref['id'])102    with device_driver.request_context() as ctx:103        try:104            commands.create_loadbalancer(ctx, sf_ref, vips, servers, probes,105                                         [])106        except Exception:107            with utils.save_and_reraise_exception():108                db_api.loadbalancer_update(conf, lb_ref['id'],109                                           {'status': lb_status.ERROR,110                                            'deployed': False})111    db_api.loadbalancer_update(conf, lb_ref['id'],112                               {'status': lb_status.ACTIVE,113                                'deployed': True})114    return lb_ref['id']115@asynchronous116def update_lb(conf, tenant_id, lb_id, lb_body):117    lb_ref = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)118    old_lb_ref = copy.deepcopy(lb_ref)119    db_api.pack_update(lb_ref, lb_body)120    lb_ref = db_api.loadbalancer_update(conf, lb_id, lb_ref)121    if (lb_ref['algorithm'] == old_lb_ref['algorithm'] and122        lb_ref['protocol'] == old_lb_ref['protocol']):123        LOG.debug("In LB %r algorithm and protocol have not changed, "124                     "nothing to do on the device %r.",125                     lb_ref['id'], lb_ref['device_id'])126        return127    sf_ref = db_api.serverfarm_get_all_by_lb_id(conf, lb_ref['id'])[0]128    if lb_ref['algorithm'] != old_lb_ref['algorithm']:129        predictor_ref = db_api.predictor_get_by_sf_id(conf, sf_ref['id'])130        db_api.predictor_update(conf, predictor_ref['id'],131                                {'type': lb_ref['algorithm']})132    vips = db_api.virtualserver_get_all_by_sf_id(conf, sf_ref['id'])133    if lb_ref['protocol'] != old_lb_ref['protocol']:134        vip_update_values = {'protocol': lb_ref['protocol']}135        for vip in vips:136            db_api.pack_update(vip, vip_update_values)137            db_api.virtualserver_update(conf, vip['id'], vip)138    servers = db_api.server_get_all_by_sf_id(conf, sf_ref['id'])139    probes = db_api.probe_get_all_by_sf_id(conf, sf_ref['id'])140    stickies = db_api.sticky_get_all_by_sf_id(conf, sf_ref['id'])141    device_ref = scheduler.reschedule(conf, lb_ref)142    if device_ref['id'] != lb_ref['device_id']:143        from_driver = drivers.get_device_driver(conf, lb_ref['device_id'])144        to_driver = drivers.get_device_driver(conf, device_ref['id'])145        lb_ref = db_api.loadbalancer_update(conf, lb_ref['id'],146                                            {'device_id': device_ref['id']})147    else:148        from_driver = drivers.get_device_driver(conf, device_ref['id'])149        to_driver = from_driver150    with from_driver.request_context() as ctx:151        try:152            commands.delete_loadbalancer(ctx, sf_ref, vips, servers, probes,153                                         stickies)154        except Exception:155            with utils.save_and_reraise_exception():156                db_api.loadbalancer_update(conf, lb_ref['id'],157                                           {'status': lb_status.ERROR})158    with to_driver.request_context() as ctx:159        try:160            commands.create_loadbalancer(ctx, sf_ref, vips, servers, probes,161                                         stickies)162        except Exception:163            with utils.save_and_reraise_exception():164                db_api.loadbalancer_update(conf, lb_ref['id'],165                                           {'status': lb_status.ERROR})166    db_api.loadbalancer_update(conf, lb_ref['id'],167                               {'status': lb_status.ACTIVE})168def delete_lb(conf, tenant_id, lb_id):169    lb_ref = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)170    sf_ref = db_api.serverfarm_get_all_by_lb_id(conf, lb_ref['id'])[0]171    vips = db_api.virtualserver_get_all_by_sf_id(conf, sf_ref['id'])172    servers = db_api.server_get_all_by_sf_id(conf, sf_ref['id'])173    probes = db_api.probe_get_all_by_sf_id(conf, sf_ref['id'])174    stickies = db_api.sticky_get_all_by_sf_id(conf, sf_ref['id'])175    device_driver = drivers.get_device_driver(conf, lb_ref['device_id'])176    with device_driver.request_context() as ctx:177        commands.delete_loadbalancer(ctx, sf_ref, vips, servers, probes,178                                     stickies)179    db_api.probe_destroy_by_sf_id(conf, sf_ref['id'])180    db_api.sticky_destroy_by_sf_id(conf, sf_ref['id'])181    db_api.server_destroy_by_sf_id(conf, sf_ref['id'])182    db_api.virtualserver_destroy_by_sf_id(conf, sf_ref['id'])183    db_api.predictor_destroy_by_sf_id(conf, sf_ref['id'])184    db_api.serverfarm_destroy(conf, sf_ref['id'])185    db_api.loadbalancer_destroy(conf, lb_ref['id'])186def lb_add_nodes(conf, tenant_id, lb_id, nodes):187    nodes_list = []188    lb = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)189    sf = db_api.serverfarm_get_all_by_lb_id(conf, lb_id)[0]190    for node in nodes:191        values = db_api.server_pack_extra(node)192        values['sf_id'] = sf['id']193        if not values['status']:194            values['status'] = 'INSERVICE'195        rs_ref = db_api.server_create(conf, values)196        device_driver = drivers.get_device_driver(conf, lb['device_id'])197        with device_driver.request_context() as ctx:198            commands.add_node_to_loadbalancer(ctx, sf, rs_ref)199        nodes_list.append(db_api.unpack_extra(rs_ref))200    return nodes_list201def lb_show_nodes(conf, tenant_id, lb_id):202    node_list = []203    sf = db_api.serverfarm_get_all_by_lb_id(conf,204            lb_id, tenant_id=tenant_id)[0]205    node_list = map(db_api.unpack_extra,206                    db_api.server_get_all_by_sf_id(conf, sf['id']))207    return node_list208def lb_delete_node(conf, tenant_id, lb_id, lb_node_id):209    lb = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)210    sf = db_api.serverfarm_get_all_by_lb_id(conf, lb_id)[0]211    rs = db_api.server_get(conf, lb_node_id)212    db_api.server_destroy(conf, lb_node_id)213    device_driver = drivers.get_device_driver(conf, lb['device_id'])214    with device_driver.request_context() as ctx:215        commands.remove_node_from_loadbalancer(ctx, sf, rs)216    return lb_node_id217def lb_change_node_status(conf, tenant_id, lb_id, lb_node_id, lb_node_status):218    lb = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)219    rs = db_api.server_get(conf, lb_node_id)220    sf = db_api.serverfarm_get(conf, rs['sf_id'])221    if rs['state'] == lb_node_status:222        return "OK"223    rs['state'] = lb_node_status224    rsname = rs['name']225    if rs['parent_id'] != "":226        rs['name'] = rs['parent_id']227    LOG.debug("Changing RServer status to: %s" % lb_node_status)228    device_driver = drivers.get_device_driver(conf, lb['device_id'])229    with device_driver.request_context() as ctx:230        if lb_node_status == "inservice":231            commands.activate_rserver(ctx, sf, rs)232        else:233            commands.suspend_rserver(ctx, sf, rs)234    rs['name'] = rsname235    db_api.server_update(conf, rs['id'], rs)236    return db_api.unpack_extra(rs)237def lb_update_node(conf, tenant_id, lb_id, lb_node_id, lb_node):238    rs = db_api.server_get(conf, lb_node_id, tenant_id=tenant_id)239    lb = db_api.loadbalancer_get(conf, lb_id)240    device_driver = drivers.get_device_driver(conf, lb['device_id'])241    sf = db_api.serverfarm_get(conf, rs['sf_id'])242    with device_driver.request_context() as ctx:243        commands.delete_rserver_from_server_farm(ctx, sf, rs)244        db_api.pack_update(rs, lb_node)245        new_rs = db_api.server_update(conf, rs['id'], rs)246        commands.add_rserver_to_server_farm(ctx, sf, new_rs)247    return db_api.unpack_extra(new_rs)248def lb_show_probes(conf, tenant_id, lb_id):249    try:250        sf_ref = db_api.serverfarm_get_all_by_lb_id(conf, lb_id,251                tenant_id=tenant_id)[0]252    except IndexError:253        raise exc.ServerFarmNotFound254    probes = db_api.probe_get_all_by_sf_id(conf, sf_ref['id'])255    list = []256    dict = {"healthMonitoring": {}}257    for probe in probes:258        list.append(db_api.unpack_extra(probe))259    dict['healthMonitoring'] = list260    return dict261def lb_add_probe(conf, tenant_id, lb_id, probe_dict):262    LOG.debug("Got new probe description %s" % probe_dict)263    # NOTE(akscram): historically strange validation, wrong place for it.264    if probe_dict['type'] is None:265        return266    lb_ref = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)267    # NOTE(akscram): server farms are really only create problems than268    #                they solve multiply use of the virtual IPs.269    try:270        sf_ref = db_api.serverfarm_get_all_by_lb_id(conf, lb_ref['id'])[0]271    except IndexError:272        raise exc.ServerFarmNotFound273    values = db_api.probe_pack_extra(probe_dict)274    values['lb_id'] = lb_ref['id']275    values['sf_id'] = sf_ref['id']276    probe_ref = db_api.probe_create(conf, values)277    device_driver = drivers.get_device_driver(conf, lb_ref['device_id'])278    with device_driver.request_context() as ctx:279        commands.add_probe_to_loadbalancer(ctx, sf_ref, probe_ref)280    return db_api.unpack_extra(probe_ref)281def lb_delete_probe(conf, tenant_id, lb_id, probe_id):282    lb = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)283    sf = db_api.serverfarm_get_all_by_lb_id(conf, lb_id)[0]284    probe = db_api.probe_get(conf, probe_id)285    db_api.probe_destroy(conf, probe_id)286    device_driver = drivers.get_device_driver(conf, lb['device_id'])287    with device_driver.request_context() as ctx:288        commands.remove_probe_from_server_farm(ctx, sf, probe)289    return probe_id290def lb_show_sticky(conf, tenant_id, lb_id):291    try:292        sf_ref = db_api.serverfarm_get_all_by_lb_id(conf, lb_id,293                tenant_id=tenant_id)[0]294    except IndexError:295        raise  exc.ServerFarmNotFound296    stickies = db_api.sticky_get_all_by_sf_id(conf, sf_ref['id'])297    list = []298    dict = {"sessionPersistence": {}}299    for sticky in stickies:300        list.append(db_api.unpack_extra(sticky))301    dict['sessionPersistence'] = list302    return dict303def lb_add_sticky(conf, tenant_id, lb_id, st):304    LOG.debug("Got new sticky description %s" % st)305    if st['type'] is None:306        return307    lb = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)308    sf = db_api.serverfarm_get_all_by_lb_id(conf, lb_id)[0]309    values = db_api.sticky_pack_extra(st)310    values['sf_id'] = sf['id']311    sticky_ref = db_api.sticky_create(conf, values)312    device_driver = drivers.get_device_driver(conf, lb['device_id'])313    with device_driver.request_context() as ctx:314        commands.add_sticky_to_loadbalancer(ctx, lb, sticky_ref)315    return db_api.unpack_extra(sticky_ref)316def lb_delete_sticky(conf, tenant_id, lb_id, sticky_id):317    lb = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)318    sticky = db_api.sticky_get(conf, sticky_id)319    device_driver = drivers.get_device_driver(conf, lb['device_id'])320    with device_driver.request_context() as ctx:321        commands.remove_sticky_from_loadbalancer(ctx, lb, sticky)322    db_api.sticky_destroy(conf, sticky_id)323    return sticky_id324def lb_add_vip(conf, tenant_id, lb_id, vip_dict):325    LOG.debug("Called lb_add_vip(), conf: %r, lb_id: %s, vip_dict: %r",326                 conf, lb_id, vip_dict)327    lb_ref = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)328    # NOTE(akscram): server farms are really only create problems than329    #                they solve multiply use of the virtual IPs.330    try:331        sf_ref = db_api.serverfarm_get_all_by_lb_id(conf, lb_ref['id'])[0]332    except IndexError:333        raise exc.ServerFarmNotFound334    values = db_api.virtualserver_pack_extra(vip_dict)335    values['lb_id'] = lb_ref['id']336    values['sf_id'] = sf_ref['id']337    # XXX(akscram): Set default protocol from LoadBalancer to338    #               VirtualServer if it is not present.339    if not values.get('extra'):340        values['extra'] = {'protocol': lb_ref['protocol']}341    elif 'protocol' not in values['extra']:342        values['extra']['protocol'] = lb_ref['protocol']343    vip_ref = db_api.virtualserver_create(conf, values)344    device_driver = drivers.get_device_driver(conf, lb_ref['device_id'])345    with device_driver.request_context() as ctx:346        commands.create_vip(ctx, vip_ref, sf_ref)347    return db_api.unpack_extra(vip_ref)348def lb_delete_vip(conf, tenant_id, lb_id, vip_id):349    LOG.debug("Called lb_delete_vip(), conf: %r, lb_id: %s, vip_id: %s",350                 conf, lb_id, vip_id)351    lb_ref = db_api.loadbalancer_get(conf, lb_id, tenant_id=tenant_id)352    vip_ref = db_api.virtualserver_get(conf, vip_id)353    device_driver = drivers.get_device_driver(conf, lb_ref['device_id'])354    with device_driver.request_context() as ctx:355        commands.delete_vip(ctx, vip_ref)356    db_api.virtualserver_destroy(conf, vip_id)357def device_get_index(conf):358    devices = db_api.device_get_all(conf)359    devices = [db_api.unpack_extra(dev) for dev in devices]360    return devices361def device_create(conf, **params):362    device_dict = db_api.device_pack_extra(params)363    device = db_api.device_create(conf, device_dict)364    return device365def device_info(params):366    query = params['query_params']367    LOG.debug("DeviceInfoWorker start with Params: %s Query: %s",368                                                            params, query)369    return370def device_show_algorithms(conf):371    devices = db_api.device_get_all(conf)372    algorithms = []373    for device in devices:374        try:375            device_driver = drivers.get_device_driver(conf, device['id'])376            capabilities = device_driver.get_capabilities()377            if capabilities is not None:378                algorithms += [a for a in capabilities.get('algorithms', [])379                               if a not in algorithms]380        except Exception:381            LOG.warn('Failed to get supported algorithms of device %s',382                     device['name'], exc_info=True)383    return algorithms384def device_show_protocols(conf):385    devices = db_api.device_get_all(conf)386    protocols = []387    for device in devices:388        try:389            device_driver = drivers.get_device_driver(conf, device['id'])390            capabilities = device_driver.get_capabilities()391            if capabilities is not None:392                protocols += [a for a in capabilities.get('protocols', [])393                              if a not in protocols]394        except Exception:395            LOG.warn('Failed to get supported protocols of device %s',396                     device['name'], exc_info=True)397    return protocols398def device_delete(conf, device_id):399    try:400        lb_refs = db_api.loadbalancer_get_all_by_device_id(conf, device_id)401    except exc.LoadBalancerNotFound:402        db_api.device_destroy(conf, device_id)403        drivers.delete_device_driver(conf, device_id)...virsh_nodedev_detach_reattach.py
Source:virsh_nodedev_detach_reattach.py  
...23            driver = os.readlink(driver_path)24        except (OSError, UnicodeError):25            return None26        return driver27    def get_device_driver(device_address):28        """29        Get the driver of device.30        :param device_address: The address of device, such as pci_0000_19_00_031        :return: The driver of device, such as ixgbe, igb32        """33        driver_strings = get_driver_readlink(device_address).strip().split('/')34        driver = driver_strings[-1]35        return driver36    def detach_reattach_nodedev(device_address, params, options=""):37        """38        Do the detach and reattach.39        Step1.Do detach.40        Step2.Check the result of detach.41        Step3.Do reattach.42        Step4.Check the result of reattach43        """44        # Libvirt acl polkit related params45        uri = params.get("virsh_uri")46        unprivileged_user = params.get('unprivileged_user')47        readonly = (params.get('nodedev_detach_readonly', 'no') == 'yes')48        if unprivileged_user:49            if unprivileged_user.count('EXAMPLE'):50                unprivileged_user = 'testacl'51        # Do the detach52        logging.debug('Node device name is %s.', device_address)53        CmdResult = virsh.nodedev_detach(device_address, options,54                                         unprivileged_user=unprivileged_user,55                                         uri=uri, readonly=readonly)56        # Check the exit_status.57        libvirt.check_exit_status(CmdResult)58        # Check the driver.59        driver = get_driver_readlink(device_address)60        logging.debug('Driver after detach is %s.', driver)61        if libvirt_version.version_compare(1, 1, 1):62            device_driver_name = 'vfio-pci'63        else:64            device_driver_name = 'pci-stub'65        if (driver is None) or (not driver.endswith(device_driver_name)):66            test.fail("Driver for %s is not %s "67                      "after nodedev-detach" % (device_address, device_driver_name))68        # Do the reattach.69        CmdResult = virsh.nodedev_reattach(device_address, options)70        # Check the exit_status.71        libvirt.check_exit_status(CmdResult)72        # Check the driver.73        driver = get_driver_readlink(device_address)74        if libvirt_version.version_compare(1, 1, 1):75            device_driver_name = 'vfio-pci'76        else:77            device_driver_name = 'pci-stub'78        if driver and driver.endswith(device_driver_name):79            test.fail("Driver for %s is not %s "80                      "after nodedev-detach" % (device_address, device_driver_name))81    def pci_device_address():82        """83        Get the address of pci device84        """85        net_list = virsh.nodedev_list(tree='', cap='net')86        net_lists = net_list.stdout.strip().splitlines()87        route_cmd = " route | grep default"88        route_default = process.run(route_cmd, shell=True).stdout_text.strip().split(' ')89        ip_default = route_default[-1]90        for default_net_name in net_lists:91            if default_net_name.find(ip_default):92                default_net_address = nodedev_xml.NodedevXML.new_from_dumpxml(default_net_name).parent93                default_net_driver = get_device_driver(default_net_address)94                break95        for net_device_name in net_lists:96            if net_device_name.find(ip_default) == -1:97                net_device_address = nodedev_xml.NodedevXML.new_from_dumpxml(net_device_name).parent98                if 'pci' in net_device_address:99                    net_device_driver = get_device_driver(net_device_address)100                    if net_device_driver != default_net_driver:101                        return net_device_address102    def check_kernel_option():103        """104        Check the kernel option if the kernel cmdline include  "iommu=on" option105        """106        check_cmd = "egrep '(intel|amd)_iommu=on' /proc/cmdline"107        try:108            check_result = process.run(check_cmd, shell=True)109        except Exception:110            test.cancel("Operation not supported: neither VFIO nor KVM device assignment"111                        "is currently supported on this system")112        else:113            logging.debug('IOMMU is enabled')114    #Check kernel iommu option115    check_kernel_option()116    # Init variables117    device_address = params.get('nodedev_device', 'ENTER.YOUR.PCI.DEVICE.TO.DETACH')118    if device_address.find('ENTER.YOUR.PCI.DEVICE.TO.DETACH') != -1:119        replace_address = pci_device_address()120        if replace_address:121            device_address = device_address.replace('ENTER.YOUR.PCI.DEVICE.TO.DETACH', replace_address)122        else:123            test.cancel('Param device_address is not configured.')124    device_opt = params.get('nodedev_device_opt', '')125    status_error = ('yes' == params.get('status_error', 'no'))126    with_driver = params.get('with_driver', 'yes') == 'yes'127    # check variables.128    if not libvirt_version.version_compare(1, 1, 1):129        if params.get('setup_libvirt_polkit') == 'yes':130            test.cancel("API acl test not supported in current"131                        " libvirt version.")132    # check the device driver and delete the driver133    if not with_driver:134        device_driver = get_device_driver(device_address)135        remove_cmd = "modprobe -r %s" % device_driver136        remove_opt = process.system(remove_cmd, shell=True)137        if remove_opt != 0:138            test.fail("Fail to remove the device driver : %s" % device_driver)139    # Do nodedev_detach_reattach140    try:141        detach_reattach_nodedev(device_address, params, device_opt)142    except exceptions.TestFail as e:143        # Do nodedev detach and reattach failed.144        if status_error:145            return146        else:147            test.fail("Test failed in positive case."148                      "error: %s" % e)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
