How to use show_subnet method in tempest

Best Python code snippet using tempest_python

test_allowed_address_pairs.py

Source:test_allowed_address_pairs.py Github

copy

Full Screen

1# Copyright 2015 Rackspace2#3# Licensed under the Apache License, Version 2.0 (the "License"); you may4# not use this file except in compliance with the License. You may obtain5# a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the12# License for the specific language governing permissions and limitations13# under the License.14import copy15import mock16from neutronclient.common import exceptions as neutron_exceptions17from novaclient.client import exceptions as nova_exceptions18from oslo_config import cfg19from oslo_config import fixture as oslo_fixture20from oslo_utils import uuidutils21from octavia.common import clients22from octavia.common import constants23from octavia.common import data_models24from octavia.network import base as network_base25from octavia.network import data_models as network_models26from octavia.network.drivers.neutron import allowed_address_pairs27from octavia.network.drivers.neutron import base as neutron_base28from octavia.tests.common import constants as t_constants29from octavia.tests.common import data_model_helpers as dmh30from octavia.tests.unit import base31class TestAllowedAddressPairsDriver(base.TestCase):32 k_session = None33 driver = None34 SUBNET_ID_1 = "5"35 SUBNET_ID_2 = "8"36 FIXED_IP_ID_1 = "6"37 FIXED_IP_ID_2 = "8"38 NETWORK_ID_1 = "7"39 NETWORK_ID_2 = "10"40 IP_ADDRESS_1 = "10.0.0.2"41 IP_ADDRESS_2 = "12.0.0.2"42 AMPHORA_ID = "1"43 LB_ID = "2"44 COMPUTE_ID = "3"45 ACTIVE = "ACTIVE"46 LB_NET_IP = "10.0.0.2"47 LB_NET_PORT_ID = "6"48 HA_PORT_ID = "8"49 HA_IP = "12.0.0.2"50 PORT_ID = uuidutils.generate_uuid()51 DEVICE_ID = uuidutils.generate_uuid()52 def setUp(self):53 super(TestAllowedAddressPairsDriver, self).setUp()54 with mock.patch('octavia.common.clients.neutron_client.Client',55 autospec=True) as neutron_client:56 with mock.patch('octavia.common.clients.nova_client.Client',57 autospec=True):58 client = neutron_client(clients.NEUTRON_VERSION)59 client.list_extensions.return_value = {60 'extensions': [61 {'alias': allowed_address_pairs.AAP_EXT_ALIAS},62 {'alias': neutron_base.SEC_GRP_EXT_ALIAS}63 ]64 }65 self.k_session = mock.patch(66 'keystoneauth1.session.Session').start()67 self.driver = allowed_address_pairs.AllowedAddressPairsDriver()68 @mock.patch('octavia.network.drivers.neutron.base.BaseNeutronDriver.'69 '_check_extension_enabled', return_value=False)70 def test_check_aap_loaded(self, mock_check_ext):71 self.assertRaises(network_base.NetworkException,72 self.driver._check_aap_loaded)73 def test_get_interfaces_to_unplug(self):74 if1 = network_models.Interface()75 if1.network_id = 'if1-net'76 if1.port_id = 'if1-port'77 if1.fixed_ips = [network_models.FixedIP(ip_address='10.0.0.1')]78 if2 = network_models.Interface()79 if2.network_id = 'if2-net'80 if2.port_id = 'if2-port'81 if2.fixed_ips = [network_models.FixedIP(ip_address='11.0.0.1')]82 interfaces = [if1, if2]83 unpluggers = self.driver._get_interfaces_to_unplug(84 interfaces, 'if1-net')85 self.assertEqual([if1], unpluggers)86 unpluggers = self.driver._get_interfaces_to_unplug(87 interfaces, 'if1-net', ip_address='10.0.0.1')88 self.assertEqual([if1], unpluggers)89 unpluggers = self.driver._get_interfaces_to_unplug(90 interfaces, 'if1-net', ip_address='11.0.0.1')91 self.assertEqual([], unpluggers)92 unpluggers = self.driver._get_interfaces_to_unplug(93 interfaces, 'if3-net')94 self.assertEqual([], unpluggers)95 def test_deallocate_vip(self):96 lb = dmh.generate_load_balancer_tree()97 lb.vip.load_balancer = lb98 vip = lb.vip99 sec_grp_id = 'lb-sec-grp1'100 show_port = self.driver.neutron_client.show_port101 show_port.return_value = {'port': {102 'device_owner': allowed_address_pairs.OCTAVIA_OWNER}}103 delete_port = self.driver.neutron_client.delete_port104 delete_sec_grp = self.driver.neutron_client.delete_security_group105 list_security_groups = self.driver.neutron_client.list_security_groups106 security_groups = {107 'security_groups': [108 {'id': sec_grp_id}109 ]110 }111 list_security_groups.return_value = security_groups112 self.driver.deallocate_vip(vip)113 calls = [mock.call(vip.port_id)]114 for amp in lb.amphorae:115 calls.append(mock.call(amp.vrrp_port_id))116 delete_port.assert_has_calls(calls, any_order=True)117 delete_sec_grp.assert_called_once_with(sec_grp_id)118 def test_deallocate_vip_no_sec_group(self):119 lb = dmh.generate_load_balancer_tree()120 lb.vip.load_balancer = lb121 vip = lb.vip122 show_port = self.driver.neutron_client.show_port123 show_port.return_value = {'port': {124 'device_owner': allowed_address_pairs.OCTAVIA_OWNER}}125 delete_port = self.driver.neutron_client.delete_port126 delete_sec_grp = self.driver.neutron_client.delete_security_group127 list_security_groups = self.driver.neutron_client.list_security_groups128 security_groups = {129 'security_groups': []130 }131 list_security_groups.return_value = security_groups132 self.driver.deallocate_vip(vip)133 delete_port.assert_called_with(vip.port_id)134 delete_sec_grp.assert_not_called()135 def test_deallocate_vip_when_delete_port_fails(self):136 lb = dmh.generate_load_balancer_tree()137 vip = data_models.Vip(port_id='1')138 vip.load_balancer = lb139 show_port = self.driver.neutron_client.show_port140 show_port.return_value = {'port': {141 'device_owner': allowed_address_pairs.OCTAVIA_OWNER}}142 delete_port = self.driver.neutron_client.delete_port143 delete_port.side_effect = [None, None, TypeError]144 self.assertRaises(network_base.DeallocateVIPException,145 self.driver.deallocate_vip, vip)146 def test_deallocate_vip_when_port_not_found(self):147 lb = dmh.generate_load_balancer_tree()148 vip = data_models.Vip(port_id='1')149 vip.load_balancer = lb150 show_port = self.driver.neutron_client.show_port151 show_port.side_effect = neutron_exceptions.PortNotFoundClient152 self.assertRaises(network_base.VIPConfigurationNotFound,153 self.driver.deallocate_vip, vip)154 def test_deallocate_vip_when_port_not_owned_by_octavia(self):155 lb = dmh.generate_load_balancer_tree()156 lb.vip.load_balancer = lb157 vip = lb.vip158 sec_grp_id = 'lb-sec-grp1'159 show_port = self.driver.neutron_client.show_port160 show_port.return_value = {'port': {161 'id': vip.port_id,162 'device_owner': 'neutron:LOADBALANCERV2',163 'security_groups': [sec_grp_id]}}164 update_port = self.driver.neutron_client.update_port165 delete_sec_grp = self.driver.neutron_client.delete_security_group166 list_security_groups = self.driver.neutron_client.list_security_groups167 security_groups = {168 'security_groups': [169 {'id': sec_grp_id}170 ]171 }172 list_security_groups.return_value = security_groups173 self.driver.deallocate_vip(vip)174 expected_port_update = {'port': {'security_groups': []}}175 update_port.assert_called_once_with(vip.port_id, expected_port_update)176 delete_sec_grp.assert_called_once_with(sec_grp_id)177 def test_deallocate_vip_when_vip_port_not_found(self):178 lb = dmh.generate_load_balancer_tree()179 vip = data_models.Vip(port_id='1')180 vip.load_balancer = lb181 admin_project_id = 'octavia'182 session_mock = mock.MagicMock()183 session_mock.get_project_id.return_value = admin_project_id184 self.k_session.return_value = session_mock185 show_port = self.driver.neutron_client.show_port186 show_port.side_effect = neutron_exceptions.PortNotFoundClient187 self.assertRaises(network_base.VIPConfigurationNotFound,188 self.driver.deallocate_vip, vip)189 def test_plug_vip_errors_when_nova_cant_find_network_to_attach(self):190 lb = dmh.generate_load_balancer_tree()191 show_subnet = self.driver.neutron_client.show_subnet192 show_subnet.return_value = {193 'subnet': {194 'id': lb.vip.subnet_id195 }196 }197 list_security_groups = self.driver.neutron_client.list_security_groups198 lsc_side_effect = [199 None, {200 'security_groups': [201 {'id': 'lb-sec-grp1'}202 ]203 }204 ]205 list_security_groups.side_effect = lsc_side_effect206 interface_attach = self.driver.nova_client.servers.interface_attach207 interface_attach.side_effect = nova_exceptions.NotFound(404, "Network")208 self.assertRaises(network_base.PlugVIPException,209 self.driver.plug_vip, lb, lb.vip)210 def test_plug_vip_errors_when_neutron_cant_find_port_to_update(self):211 lb = dmh.generate_load_balancer_tree()212 show_subnet = self.driver.neutron_client.show_subnet213 show_subnet.return_value = {214 'subnet': {215 'id': lb.vip.subnet_id216 }217 }218 list_security_groups = self.driver.neutron_client.list_security_groups219 lsc_side_effect = [220 None, {221 'security_groups': [222 {'id': 'lb-sec-grp1'}223 ]224 }225 ]226 list_security_groups.side_effect = lsc_side_effect227 interface_attach = self.driver.nova_client.servers.interface_attach228 interface_attach.return_value = t_constants.MOCK_NOVA_INTERFACE229 update_port = self.driver.neutron_client.update_port230 update_port.side_effect = neutron_exceptions.PortNotFoundClient231 self.assertRaises(network_base.PortNotFound,232 self.driver.plug_vip, lb, lb.vip)233 def test_plug_vip(self):234 lb = dmh.generate_load_balancer_tree()235 show_subnet = self.driver.neutron_client.show_subnet236 show_subnet.return_value = {237 'subnet': {238 'id': t_constants.MOCK_VIP_SUBNET_ID,239 'network_id': t_constants.MOCK_VIP_NET_ID240 }241 }242 list_ports = self.driver.neutron_client.list_ports243 port1 = t_constants.MOCK_MANAGEMENT_PORT1['port']244 port2 = t_constants.MOCK_MANAGEMENT_PORT2['port']245 list_ports.side_effect = [{'ports': [port1]}, {'ports': [port2]}]246 interface_attach = self.driver.nova_client.servers.interface_attach247 interface_attach.side_effect = [t_constants.MOCK_VRRP_INTERFACE1,248 t_constants.MOCK_VRRP_INTERFACE2]249 list_security_groups = self.driver.neutron_client.list_security_groups250 list_security_groups.return_value = {251 'security_groups': [252 {'id': 'lb-sec-grp1'}253 ]254 }255 update_port = self.driver.neutron_client.update_port256 expected_aap = {'port': {'allowed_address_pairs':257 [{'ip_address': lb.vip.ip_address}]}}258 amps = self.driver.plug_vip(lb, lb.vip)259 self.assertEqual(5, update_port.call_count)260 for amp in amps:261 update_port.assert_any_call(amp.vrrp_port_id, expected_aap)262 self.assertIn(amp.vrrp_ip, [t_constants.MOCK_VRRP_IP1,263 t_constants.MOCK_VRRP_IP2])264 self.assertEqual(lb.vip.ip_address, amp.ha_ip)265 def _set_safely(self, obj, name, value):266 if isinstance(obj, dict):267 current = obj.get(name)268 self.addCleanup(obj.update, {name: current})269 obj.update({name: value})270 else:271 current = getattr(obj, name)272 self.addCleanup(setattr, obj, name, current)273 setattr(obj, name, value)274 def test_plug_vip_on_mgmt_net(self):275 lb = dmh.generate_load_balancer_tree()276 lb.vip.subnet_id = t_constants.MOCK_MANAGEMENT_SUBNET_ID277 show_subnet = self.driver.neutron_client.show_subnet278 show_subnet.return_value = {279 'subnet': {280 'id': t_constants.MOCK_MANAGEMENT_SUBNET_ID,281 'network_id': t_constants.MOCK_MANAGEMENT_NET_ID282 }283 }284 list_ports = self.driver.neutron_client.list_ports285 port1 = t_constants.MOCK_MANAGEMENT_PORT1['port']286 port2 = t_constants.MOCK_MANAGEMENT_PORT2['port']287 self._set_safely(t_constants.MOCK_MANAGEMENT_FIXED_IPS1[0],288 'ip_address', lb.amphorae[0].lb_network_ip)289 self._set_safely(t_constants.MOCK_MANAGEMENT_FIXED_IPS2[0],290 'ip_address', lb.amphorae[1].lb_network_ip)291 list_ports.side_effect = [{'ports': [port1]}, {'ports': [port2]}]292 interface_attach = self.driver.nova_client.servers.interface_attach293 self._set_safely(t_constants.MOCK_VRRP_INTERFACE1,294 'net_id', t_constants.MOCK_MANAGEMENT_NET_ID)295 self._set_safely(t_constants.MOCK_VRRP_FIXED_IPS1[0],296 'subnet_id', t_constants.MOCK_MANAGEMENT_SUBNET_ID)297 self._set_safely(t_constants.MOCK_VRRP_INTERFACE2,298 'net_id', t_constants.MOCK_MANAGEMENT_NET_ID)299 self._set_safely(t_constants.MOCK_VRRP_FIXED_IPS2[0],300 'subnet_id', t_constants.MOCK_MANAGEMENT_SUBNET_ID)301 interface_attach.side_effect = [t_constants.MOCK_VRRP_INTERFACE1,302 t_constants.MOCK_VRRP_INTERFACE2]303 list_security_groups = self.driver.neutron_client.list_security_groups304 list_security_groups.return_value = {305 'security_groups': [306 {'id': 'lb-sec-grp1'}307 ]308 }309 update_port = self.driver.neutron_client.update_port310 expected_aap = {'port': {'allowed_address_pairs':311 [{'ip_address': lb.vip.ip_address}]}}312 amps = self.driver.plug_vip(lb, lb.vip)313 self.assertEqual(5, update_port.call_count)314 for amp in amps:315 update_port.assert_any_call(amp.vrrp_port_id, expected_aap)316 self.assertIn(amp.vrrp_ip, [t_constants.MOCK_VRRP_IP1,317 t_constants.MOCK_VRRP_IP2])318 self.assertEqual(lb.vip.ip_address, amp.ha_ip)319 def test_allocate_vip_when_port_already_provided(self):320 show_port = self.driver.neutron_client.show_port321 show_port.return_value = t_constants.MOCK_NEUTRON_PORT322 fake_lb_vip = data_models.Vip(323 port_id=t_constants.MOCK_PORT_ID,324 subnet_id=t_constants.MOCK_SUBNET_ID)325 fake_lb = data_models.LoadBalancer(id='1', vip=fake_lb_vip)326 vip = self.driver.allocate_vip(fake_lb)327 self.assertIsInstance(vip, data_models.Vip)328 self.assertEqual(t_constants.MOCK_IP_ADDRESS, vip.ip_address)329 self.assertEqual(t_constants.MOCK_SUBNET_ID, vip.subnet_id)330 self.assertEqual(t_constants.MOCK_PORT_ID, vip.port_id)331 self.assertEqual(fake_lb.id, vip.load_balancer_id)332 def test_allocate_vip_when_port_creation_fails(self):333 fake_lb_vip = data_models.Vip(334 subnet_id=t_constants.MOCK_SUBNET_ID)335 fake_lb = data_models.LoadBalancer(id='1', vip=fake_lb_vip)336 create_port = self.driver.neutron_client.create_port337 create_port.side_effect = Exception338 self.assertRaises(network_base.AllocateVIPException,339 self.driver.allocate_vip, fake_lb)340 def test_allocate_vip_when_no_port_provided(self):341 port_create_dict = copy.copy(t_constants.MOCK_NEUTRON_PORT)342 port_create_dict['port']['device_owner'] = (343 allowed_address_pairs.OCTAVIA_OWNER)344 port_create_dict['port']['device_id'] = 'lb-1'345 create_port = self.driver.neutron_client.create_port346 create_port.return_value = port_create_dict347 show_subnet = self.driver.neutron_client.show_subnet348 show_subnet.return_value = {'subnet': {349 'id': t_constants.MOCK_SUBNET_ID,350 'network_id': t_constants.MOCK_NETWORK_ID351 }}352 fake_lb_vip = data_models.Vip(subnet_id=t_constants.MOCK_SUBNET_ID,353 network_id=t_constants.MOCK_NETWORK_ID)354 fake_lb = data_models.LoadBalancer(id='1', vip=fake_lb_vip)355 vip = self.driver.allocate_vip(fake_lb)356 exp_create_port_call = {357 'port': {358 'name': 'octavia-lb-1',359 'network_id': t_constants.MOCK_NETWORK_ID,360 'device_id': 'lb-1',361 'device_owner': allowed_address_pairs.OCTAVIA_OWNER,362 'admin_state_up': False363 }364 }365 create_port.assert_called_once_with(exp_create_port_call)366 self.assertIsInstance(vip, data_models.Vip)367 self.assertEqual(t_constants.MOCK_IP_ADDRESS, vip.ip_address)368 self.assertEqual(t_constants.MOCK_SUBNET_ID, vip.subnet_id)369 self.assertEqual(t_constants.MOCK_PORT_ID, vip.port_id)370 self.assertEqual(fake_lb.id, vip.load_balancer_id)371 def test_unplug_vip_errors_when_update_port_cant_find_port(self):372 lb = dmh.generate_load_balancer_tree()373 list_ports = self.driver.neutron_client.list_ports374 show_subnet = self.driver.neutron_client.show_subnet375 show_subnet.return_value = t_constants.MOCK_SUBNET376 port1 = t_constants.MOCK_NEUTRON_PORT['port']377 port2 = {378 'id': '4', 'network_id': '3', 'fixed_ips':379 [{'ip_address': '10.0.0.2'}]380 }381 list_ports.return_value = {'ports': [port1, port2]}382 update_port = self.driver.neutron_client.update_port383 update_port.side_effect = neutron_exceptions.PortNotFoundClient384 self.assertRaises(network_base.UnplugVIPException,385 self.driver.unplug_vip, lb, lb.vip)386 def test_unplug_vip_errors_when_update_port_fails(self):387 lb = dmh.generate_load_balancer_tree()388 show_subnet = self.driver.neutron_client.show_subnet389 show_subnet.return_value = t_constants.MOCK_SUBNET390 port1 = t_constants.MOCK_NEUTRON_PORT['port']391 port2 = {392 'id': '4', 'network_id': '3', 'fixed_ips':393 [{'ip_address': '10.0.0.2'}]394 }395 list_ports = self.driver.neutron_client.list_ports396 list_ports.return_value = {'ports': [port1, port2]}397 update_port = self.driver.neutron_client.update_port398 update_port.side_effect = TypeError399 self.assertRaises(network_base.UnplugVIPException,400 self.driver.unplug_vip, lb, lb.vip)401 def test_unplug_vip_errors_when_vip_subnet_not_found(self):402 lb = dmh.generate_load_balancer_tree()403 show_subnet = self.driver.neutron_client.show_subnet404 show_subnet.side_effect = neutron_exceptions.NotFound405 self.assertRaises(network_base.PluggedVIPNotFound,406 self.driver.unplug_vip, lb, lb.vip)407 def test_unplug_vip(self):408 lb = dmh.generate_load_balancer_tree()409 show_subnet = self.driver.neutron_client.show_subnet410 show_subnet.return_value = t_constants.MOCK_SUBNET411 update_port = self.driver.neutron_client.update_port412 port1 = t_constants.MOCK_NEUTRON_PORT['port']413 port2 = {414 'id': '4', 'network_id': '3', 'fixed_ips':415 [{'ip_address': '10.0.0.2'}]416 }417 list_ports = self.driver.neutron_client.list_ports418 list_ports.return_value = {'ports': [port1, port2]}419 self.driver.unplug_vip(lb, lb.vip)420 self.assertEqual(len(lb.amphorae), update_port.call_count)421 clear_aap = {'port': {'allowed_address_pairs': []}}422 update_port.assert_has_calls([mock.call(port1.get('id'), clear_aap),423 mock.call(port1.get('id'), clear_aap)])424 def test_plug_network_when_compute_instance_cant_be_found(self):425 net_id = t_constants.MOCK_NOVA_INTERFACE.net_id426 interface_attach = self.driver.nova_client.servers.interface_attach427 interface_attach.side_effect = nova_exceptions.NotFound(428 404, message='Instance not found')429 self.assertRaises(network_base.AmphoraNotFound,430 self.driver.plug_network,431 t_constants.MOCK_COMPUTE_ID, net_id)432 def test_plug_network_when_network_cant_be_found(self):433 net_id = t_constants.MOCK_NOVA_INTERFACE.net_id434 interface_attach = self.driver.nova_client.servers.interface_attach435 interface_attach.side_effect = nova_exceptions.NotFound(436 404, message='Network not found')437 self.assertRaises(network_base.NetworkException,438 self.driver.plug_network,439 t_constants.MOCK_COMPUTE_ID, net_id)440 def test_plug_network_when_interface_attach_fails(self):441 net_id = t_constants.MOCK_NOVA_INTERFACE.net_id442 interface_attach = self.driver.nova_client.servers.interface_attach443 interface_attach.side_effect = TypeError444 self.assertRaises(network_base.PlugNetworkException,445 self.driver.plug_network,446 t_constants.MOCK_COMPUTE_ID, net_id)447 def test_plug_network(self):448 net_id = t_constants.MOCK_NOVA_INTERFACE.net_id449 interface_attach = self.driver.nova_client.servers.interface_attach450 interface_attach.return_value = t_constants.MOCK_NOVA_INTERFACE451 oct_interface = self.driver.plug_network(452 t_constants.MOCK_COMPUTE_ID, net_id)453 exp_ips = [fixed_ip.get('ip_address')454 for fixed_ip in t_constants.MOCK_NOVA_INTERFACE.fixed_ips]455 actual_ips = [fixed_ip.ip_address456 for fixed_ip in oct_interface.fixed_ips]457 self.assertEqual(exp_ips, actual_ips)458 self.assertEqual(t_constants.MOCK_COMPUTE_ID,459 oct_interface.compute_id)460 self.assertEqual(net_id, oct_interface.network_id)461 def test_unplug_network_when_compute_port_cant_be_found(self):462 net_id = t_constants.MOCK_NOVA_INTERFACE.net_id463 list_ports = self.driver.neutron_client.list_ports464 list_ports.return_value = {'ports': []}465 self.assertRaises(network_base.AmphoraNotFound,466 self.driver.unplug_network,467 t_constants.MOCK_COMPUTE_ID, net_id)468 def test_unplug_network_when_list_ports_fails(self):469 net_id = t_constants.MOCK_NOVA_INTERFACE.net_id470 list_ports = self.driver.neutron_client.list_ports471 list_ports.side_effect = Exception472 self.assertRaises(network_base.NetworkException,473 self.driver.unplug_network,474 t_constants.MOCK_COMPUTE_ID, net_id)475 def test_unplug_network_when_interface_detach_fails(self):476 list_ports = self.driver.neutron_client.list_ports477 port1 = t_constants.MOCK_NEUTRON_PORT['port']478 port2 = {479 'id': '4', 'network_id': '3', 'fixed_ips':480 [{'ip_address': '10.0.0.2'}]481 }482 list_ports.return_value = {'ports': [port1, port2]}483 interface_detach = self.driver.nova_client.servers.interface_detach484 interface_detach.side_effect = Exception485 self.assertRaises(network_base.UnplugNetworkException,486 self.driver.unplug_network,487 t_constants.MOCK_COMPUTE_ID,488 port2.get('network_id'))489 def test_unplug_network(self):490 list_ports = self.driver.neutron_client.list_ports491 port1 = t_constants.MOCK_NEUTRON_PORT['port']492 port2 = {493 'id': '4', 'network_id': '3', 'fixed_ips':494 [{'ip_address': '10.0.0.2'}]495 }496 list_ports.return_value = {'ports': [port1, port2]}497 interface_detach = self.driver.nova_client.servers.interface_detach498 self.driver.unplug_network(t_constants.MOCK_COMPUTE_ID,499 port2.get('network_id'))500 interface_detach.assert_called_once_with(501 server=t_constants.MOCK_COMPUTE_ID, port_id=port2.get('id'))502 def test_update_vip(self):503 listeners = [data_models.Listener(protocol_port=80, peer_port=1024),504 data_models.Listener(protocol_port=443, peer_port=1025)]505 vip = data_models.Vip(ip_address='10.0.0.2')506 lb = data_models.LoadBalancer(id='1', listeners=listeners, vip=vip)507 list_sec_grps = self.driver.neutron_client.list_security_groups508 list_sec_grps.return_value = {'security_groups': [{'id': 'secgrp-1'}]}509 fake_rules = {510 'security_group_rules': [511 {'id': 'rule-80', 'port_range_max': 80, 'protocol': 'tcp'},512 {'id': 'rule-22', 'port_range_max': 22, 'protocol': 'tcp'}513 ]514 }515 list_rules = self.driver.neutron_client.list_security_group_rules516 list_rules.return_value = fake_rules517 delete_rule = self.driver.neutron_client.delete_security_group_rule518 create_rule = self.driver.neutron_client.create_security_group_rule519 self.driver.update_vip(lb)520 delete_rule.assert_called_once_with('rule-22')521 expected_create_rule_1 = {522 'security_group_rule': {523 'security_group_id': 'secgrp-1',524 'direction': 'ingress',525 'protocol': 'TCP',526 'port_range_min': 1024,527 'port_range_max': 1024,528 'ethertype': 'IPv4'529 }530 }531 expected_create_rule_2 = {532 'security_group_rule': {533 'security_group_id': 'secgrp-1',534 'direction': 'ingress',535 'protocol': 'TCP',536 'port_range_min': 1025,537 'port_range_max': 1025,538 'ethertype': 'IPv4'539 }540 }541 expected_create_rule_3 = {542 'security_group_rule': {543 'security_group_id': 'secgrp-1',544 'direction': 'ingress',545 'protocol': 'TCP',546 'port_range_min': 443,547 'port_range_max': 443,548 'ethertype': 'IPv4'549 }550 }551 create_rule.assert_has_calls([mock.call(expected_create_rule_1),552 mock.call(expected_create_rule_2),553 mock.call(expected_create_rule_3)])554 def test_update_vip_when_listener_deleted(self):555 listeners = [data_models.Listener(protocol_port=80),556 data_models.Listener(557 protocol_port=443,558 provisioning_status=constants.PENDING_DELETE)]559 vip = data_models.Vip(ip_address='10.0.0.2')560 lb = data_models.LoadBalancer(id='1', listeners=listeners, vip=vip)561 list_sec_grps = self.driver.neutron_client.list_security_groups562 list_sec_grps.return_value = {'security_groups': [{'id': 'secgrp-1'}]}563 fake_rules = {564 'security_group_rules': [565 {'id': 'rule-80', 'port_range_max': 80, 'protocol': 'tcp'},566 {'id': 'rule-22', 'port_range_max': 443, 'protocol': 'tcp'}567 ]568 }569 list_rules = self.driver.neutron_client.list_security_group_rules570 list_rules.return_value = fake_rules571 delete_rule = self.driver.neutron_client.delete_security_group_rule572 create_rule = self.driver.neutron_client.create_security_group_rule573 self.driver.update_vip(lb)574 delete_rule.assert_called_once_with('rule-22')575 self.assertTrue(create_rule.called)576 def test_update_vip_when_no_listeners(self):577 listeners = []578 vip = data_models.Vip(ip_address='10.0.0.2')579 lb = data_models.LoadBalancer(id='1', listeners=listeners, vip=vip)580 list_sec_grps = self.driver.neutron_client.list_security_groups581 list_sec_grps.return_value = {'security_groups': [{'id': 'secgrp-1'}]}582 fake_rules = {583 'security_group_rules': [584 {'id': 'all-egress', 'protocol': None, 'direction': 'egress'},585 {'id': 'ssh-rule', 'protocol': 'tcp', 'port_range_max': 22}586 ]587 }588 list_rules = self.driver.neutron_client.list_security_group_rules589 list_rules.return_value = fake_rules590 delete_rule = self.driver.neutron_client.delete_security_group_rule591 self.driver.update_vip(lb)592 delete_rule.assert_called_once_with('ssh-rule')593 def test_failover_preparation(self):594 original_dns_integration_state = self.driver.dns_integration_enabled595 self.driver.dns_integration_enabled = False596 ports = {"ports": [597 {"fixed_ips": [{"subnet_id": self.SUBNET_ID_1,598 "ip_address": self.IP_ADDRESS_1}],599 "id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1},600 {"fixed_ips": [{"subnet_id": self.SUBNET_ID_2,601 "ip_address": self.IP_ADDRESS_2}],602 "id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2}]}603 self.driver.neutron_client.list_ports.return_value = ports604 self.driver.neutron_client.show_port = mock.Mock(605 side_effect=self._failover_show_port_side_effect)606 port_update = self.driver.neutron_client.update_port607 amphora = data_models.Amphora(608 id=self.AMPHORA_ID, load_balancer_id=self.LB_ID,609 compute_id=self.COMPUTE_ID, status=self.ACTIVE,610 lb_network_ip=self.LB_NET_IP, ha_port_id=self.HA_PORT_ID,611 ha_ip=self.HA_IP)612 self.driver.failover_preparation(amphora)613 self.assertFalse(port_update.called)614 self.driver.dns_integration_enabled = original_dns_integration_state615 def test_failover_preparation_dns_integration(self):616 ports = {"ports": [617 {"fixed_ips": [{"subnet_id": self.SUBNET_ID_1,618 "ip_address": self.IP_ADDRESS_1}],619 "id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1},620 {"fixed_ips": [{"subnet_id": self.SUBNET_ID_2,621 "ip_address": self.IP_ADDRESS_2}],622 "id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2}]}623 original_dns_integration_state = self.driver.dns_integration_enabled624 self.driver.dns_integration_enabled = True625 self.driver.neutron_client.list_ports.return_value = ports626 self.driver.neutron_client.show_port = mock.Mock(627 side_effect=self._failover_show_port_side_effect)628 port_update = self.driver.neutron_client.update_port629 amphora = data_models.Amphora(630 id=self.AMPHORA_ID, load_balancer_id=self.LB_ID,631 compute_id=self.COMPUTE_ID, status=self.ACTIVE,632 lb_network_ip=self.LB_NET_IP, ha_port_id=self.HA_PORT_ID,633 ha_ip=self.HA_IP)634 self.driver.failover_preparation(amphora)635 port_update.assert_called_once_with(ports['ports'][1].get('id'),636 {'port': {'dns_name': ''}})637 self.driver.dns_integration_enabled = original_dns_integration_state638 def _failover_show_port_side_effect(self, port_id):639 if port_id == self.LB_NET_PORT_ID:640 return {"fixed_ips": [{"subnet_id": self.SUBNET_ID_1,641 "ip_address": self.IP_ADDRESS_1}],642 "id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1}643 if port_id == self.HA_PORT_ID:644 return {"fixed_ips": [{"subnet_id": self.SUBNET_ID_2,645 "ip_address": self.IP_ADDRESS_2}],646 "id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2}647 def test_plug_port(self):648 port = mock.MagicMock()649 port.id = self.PORT_ID650 interface_attach = self.driver.nova_client.servers.interface_attach651 amphora = data_models.Amphora(652 id=self.AMPHORA_ID, load_balancer_id=self.LB_ID,653 compute_id=self.COMPUTE_ID, status=self.ACTIVE,654 lb_network_ip=self.LB_NET_IP, ha_port_id=self.HA_PORT_ID,655 ha_ip=self.HA_IP)656 self.driver.plug_port(amphora, port)657 interface_attach.assert_called_once_with(server=amphora.compute_id,658 net_id=None,659 fixed_ip=None,660 port_id=self.PORT_ID)661 # NotFound cases662 interface_attach.side_effect = nova_exceptions.NotFound(663 1, message='Instance')664 self.assertRaises(network_base.AmphoraNotFound,665 self.driver.plug_port,666 amphora,667 port)668 interface_attach.side_effect = nova_exceptions.NotFound(669 1, message='Network')670 self.assertRaises(network_base.NetworkNotFound,671 self.driver.plug_port,672 amphora,673 port)674 interface_attach.side_effect = nova_exceptions.NotFound(675 1, message='bogus')676 self.assertRaises(network_base.PlugNetworkException,677 self.driver.plug_port,678 amphora,679 port)680 # Already plugged case should not raise an exception681 interface_attach.side_effect = nova_exceptions.Conflict(1)682 self.driver.plug_port(amphora, port)683 # Unknown error case684 interface_attach.side_effect = TypeError685 self.assertRaises(network_base.PlugNetworkException,686 self.driver.plug_port,687 amphora,688 port)689 def test_get_network_configs(self):690 amphora_mock = mock.MagicMock()691 load_balancer_mock = mock.MagicMock()692 vip_mock = mock.MagicMock()693 amphora_mock.status = constants.DELETED694 load_balancer_mock.amphorae = [amphora_mock]695 show_port = self.driver.neutron_client.show_port696 show_port.return_value = t_constants.MOCK_NEUTRON_PORT697 fake_subnet = {'subnet': {698 'id': t_constants.MOCK_SUBNET_ID,699 'gateway_ip': t_constants.MOCK_IP_ADDRESS,700 'cidr': t_constants.MOCK_CIDR}}701 show_subnet = self.driver.neutron_client.show_subnet702 show_subnet.return_value = fake_subnet703 configs = self.driver.get_network_configs(load_balancer_mock)704 self.assertEqual({}, configs)705 vip_mock.port_id = 1706 amphora_mock.id = 222707 amphora_mock.status = constants.ACTIVE708 amphora_mock.vrrp_port_id = 2709 amphora_mock.vrrp_ip = "10.0.0.1"710 amphora_mock.ha_port_id = 3711 amphora_mock.ha_ip = "10.0.0.2"712 load_balancer_mock.amphorae = [amphora_mock]713 configs = self.driver.get_network_configs(load_balancer_mock)714 self.assertEqual(1, len(configs))715 config = configs[222]716 # TODO(ptoohill): find a way to return different items for multiple717 # calls to the same method, right now each call to show subnet718 # will return the same values if a method happens to call it719 # multiple times for different subnets. We should be able to verify720 # different requests get different expected data.721 expected_port_id = t_constants.MOCK_NEUTRON_PORT['port']['id']722 self.assertEqual(expected_port_id, config.ha_port.id)723 self.assertEqual(expected_port_id, config.vrrp_port.id)724 expected_subnet_id = fake_subnet['subnet']['id']725 self.assertEqual(expected_subnet_id, config.ha_subnet.id)726 self.assertEqual(expected_subnet_id, config.vrrp_subnet.id)727 @mock.patch('time.sleep')728 def test_wait_for_port_detach(self, mock_sleep):729 amphora = data_models.Amphora(730 id=self.AMPHORA_ID, load_balancer_id=self.LB_ID,731 compute_id=self.COMPUTE_ID, status=self.ACTIVE,732 lb_network_ip=self.LB_NET_IP, ha_port_id=self.HA_PORT_ID,733 ha_ip=self.HA_IP)734 ports = {"ports": [735 {"fixed_ips": [{"subnet_id": self.SUBNET_ID_1,736 "ip_address": self.IP_ADDRESS_1}],737 "id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1},738 {"fixed_ips": [{"subnet_id": self.SUBNET_ID_2,739 "ip_address": self.IP_ADDRESS_2}],740 "id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2}]}741 show_port_1_without_device_id = {"fixed_ips": [742 {"subnet_id": self.SUBNET_ID_1, "ip_address": self.IP_ADDRESS_1}],743 "id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1,744 "device_id": ''}745 show_port_2_with_device_id = {"fixed_ips": [746 {"subnet_id": self.SUBNET_ID_2, "ip_address": self.IP_ADDRESS_2}],747 "id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2,748 "device_id": self.DEVICE_ID}749 show_port_2_without_device_id = {"fixed_ips": [750 {"subnet_id": self.SUBNET_ID_2, "ip_address": self.IP_ADDRESS_2}],751 "id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2,752 "device_id": None}753 self.driver.neutron_client.list_ports.return_value = ports754 port_mock = mock.MagicMock()755 port_mock.get = mock.Mock(756 side_effect=[show_port_1_without_device_id,757 show_port_2_with_device_id,758 show_port_2_with_device_id,759 show_port_2_without_device_id])760 self.driver.neutron_client.show_port.return_value = port_mock761 self.driver.wait_for_port_detach(amphora)762 self.assertEqual(1, mock_sleep.call_count)763 @mock.patch('time.time')764 @mock.patch('time.sleep')765 def test_wait_for_port_detach_timeout(self, mock_sleep, mock_time):766 mock_time.side_effect = [1, 2, 6]767 conf = oslo_fixture.Config(cfg.CONF)768 conf.config(group="networking", port_detach_timeout=5)769 amphora = data_models.Amphora(770 id=self.AMPHORA_ID, load_balancer_id=self.LB_ID,771 compute_id=self.COMPUTE_ID, status=self.ACTIVE,772 lb_network_ip=self.LB_NET_IP, ha_port_id=self.HA_PORT_ID,773 ha_ip=self.HA_IP)774 ports = {"ports": [775 {"fixed_ips": [{"subnet_id": self.SUBNET_ID_1,776 "ip_address": self.IP_ADDRESS_1}],777 "id": self.FIXED_IP_ID_1, "network_id": self.NETWORK_ID_1},778 {"fixed_ips": [{"subnet_id": self.SUBNET_ID_2,779 "ip_address": self.IP_ADDRESS_2}],780 "id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2}]}781 show_port_1_with_device_id = {"fixed_ips": [782 {"subnet_id": self.SUBNET_ID_2, "ip_address": self.IP_ADDRESS_2}],783 "id": self.FIXED_IP_ID_2, "network_id": self.NETWORK_ID_2,784 "device_id": self.DEVICE_ID}785 self.driver.neutron_client.list_ports.return_value = ports786 port_mock = mock.MagicMock()787 port_mock.get = mock.Mock(788 return_value=show_port_1_with_device_id)789 self.driver.neutron_client.show_port.return_value = port_mock790 self.assertRaises(network_base.TimeoutException,791 self.driver.wait_for_port_detach,...

Full Screen

Full Screen

test_debug_commands.py

Source:test_debug_commands.py Github

copy

Full Screen

...108 'fixed_ips': [{'subnet_id': 'fake_subnet'}],109 'device_id': socket.gethostname()}}110 namespace = 'qprobe-fake_port'111 self.client.assert_has_calls([mock.call.show_network('fake_net'),112 mock.call.show_subnet('fake_subnet'),113 mock.call.create_port(fake_port),114 mock.call.show_subnet('fake_subnet')])115 self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),116 mock.call.plug('fake_net',117 'fake_port',118 'tap12345678-12',119 'aa:bb:cc:dd:ee:ffa',120 bridge=None,121 namespace=namespace),122 mock.call.init_l3('tap12345678-12',123 ['10.0.0.3/24'],124 namespace=namespace125 )])126 def test_create_network_probe(self):127 self._test_create_probe(debug_agent.DEVICE_OWNER_NETWORK_PROBE)128 def test_create_nova_probe(self):129 self._test_create_probe(debug_agent.DEVICE_OWNER_COMPUTE_PROBE)130 def _test_create_probe_external(self, device_owner):131 fake_network = {'network': {'id': 'fake_net',132 'tenant_id': 'fake_tenant',133 'router:external': True,134 'subnets': ['fake_subnet']}}135 self.client.show_network.return_value = fake_network136 cmd = commands.CreateProbe(self.app, None)137 cmd_parser = cmd.get_parser('create_probe')138 if device_owner == debug_agent.DEVICE_OWNER_COMPUTE_PROBE:139 args = ['fake_net', '--device-owner', 'compute']140 else:141 args = ['fake_net']142 parsed_args = cmd_parser.parse_args(args)143 cmd.run(parsed_args)144 fake_port = {'port':145 {'device_owner': device_owner,146 'admin_state_up': True,147 'network_id': 'fake_net',148 'tenant_id': 'fake_tenant',149 'binding:host_id': cfg.CONF.host,150 'fixed_ips': [{'subnet_id': 'fake_subnet'}],151 'device_id': socket.gethostname()}}152 namespace = 'qprobe-fake_port'153 self.client.assert_has_calls([mock.call.show_network('fake_net'),154 mock.call.show_subnet('fake_subnet'),155 mock.call.create_port(fake_port),156 mock.call.show_subnet('fake_subnet')])157 self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),158 mock.call.plug('fake_net',159 'fake_port',160 'tap12345678-12',161 'aa:bb:cc:dd:ee:ffa',162 bridge='br-ex',163 namespace=namespace),164 mock.call.init_l3('tap12345678-12',165 ['10.0.0.3/24'],166 namespace=namespace167 )])168 def test_create_network_probe_external(self):169 self._test_create_probe_external(170 debug_agent.DEVICE_OWNER_NETWORK_PROBE)171 def test_create_nova_probe_external(self):172 self._test_create_probe_external(173 debug_agent.DEVICE_OWNER_COMPUTE_PROBE)174 def test_delete_probe(self):175 cmd = commands.DeleteProbe(self.app, None)176 cmd_parser = cmd.get_parser('delete_probe')177 args = ['fake_port']178 parsed_args = cmd_parser.parse_args(args)179 cmd.run(parsed_args)180 namespace = 'qprobe-fake_port'181 self.client.assert_has_calls([mock.call.show_port('fake_port'),182 mock.call.show_network('fake_net'),183 mock.call.show_subnet('fake_subnet'),184 mock.call.delete_port('fake_port')])185 self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),186 mock.call.unplug('tap12345678-12',187 namespace=namespace,188 bridge=None)])189 def test_delete_probe_external(self):190 fake_network = {'network': {'id': 'fake_net',191 'tenant_id': 'fake_tenant',192 'router:external': True,193 'subnets': ['fake_subnet']}}194 self.client.show_network.return_value = fake_network195 cmd = commands.DeleteProbe(self.app, None)196 cmd_parser = cmd.get_parser('delete_probe')197 args = ['fake_port']198 parsed_args = cmd_parser.parse_args(args)199 cmd.run(parsed_args)200 namespace = 'qprobe-fake_port'201 self.client.assert_has_calls([mock.call.show_port('fake_port'),202 mock.call.show_network('fake_net'),203 mock.call.show_subnet('fake_subnet'),204 mock.call.delete_port('fake_port')])205 self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),206 mock.call.unplug('tap12345678-12',207 namespace=namespace,208 bridge='br-ex')])209 def test_delete_probe_without_namespace(self):210 cfg.CONF.set_override('use_namespaces', False)211 cmd = commands.DeleteProbe(self.app, None)212 cmd_parser = cmd.get_parser('delete_probe')213 args = ['fake_port']214 parsed_args = cmd_parser.parse_args(args)215 cmd.run(parsed_args)216 self.client.assert_has_calls([mock.call.show_port('fake_port'),217 mock.call.show_network('fake_net'),218 mock.call.show_subnet('fake_subnet'),219 mock.call.delete_port('fake_port')])220 self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),221 mock.call.unplug('tap12345678-12',222 bridge=None)])223 def test_list_probe(self):224 cmd = commands.ListProbe(self.app, None)225 cmd_parser = cmd.get_parser('list_probe')226 args = []227 parsed_args = cmd_parser.parse_args(args)228 cmd.run(parsed_args)229 self.client.assert_has_calls(230 [mock.call.list_ports(231 device_owner=[debug_agent.DEVICE_OWNER_NETWORK_PROBE,232 debug_agent.DEVICE_OWNER_COMPUTE_PROBE])])233 def test_exec_command(self):234 cmd = commands.ExecProbe(self.app, None)235 cmd_parser = cmd.get_parser('exec_command')236 args = ['fake_port', 'fake_command']237 parsed_args = cmd_parser.parse_args(args)238 with mock.patch('neutron.agent.linux.ip_lib.IpNetnsCommand') as ns:239 cmd.run(parsed_args)240 ns.assert_has_calls([mock.call.execute(mock.ANY)])241 self.client.assert_has_calls([mock.call.show_port('fake_port')])242 def test_exec_command_without_namespace(self):243 cfg.CONF.set_override('use_namespaces', False)244 cmd = commands.ExecProbe(self.app, None)245 cmd_parser = cmd.get_parser('exec_command')246 args = ['fake_port', 'fake_command']247 parsed_args = cmd_parser.parse_args(args)248 with mock.patch('neutron.agent.linux.utils.execute') as exe:249 cmd.run(parsed_args)250 exe.assert_has_calls([mock.call.execute(mock.ANY)])251 self.client.assert_has_calls([mock.call.show_port('fake_port')])252 def test_clear_probe(self):253 cmd = commands.ClearProbe(self.app, None)254 cmd_parser = cmd.get_parser('clear_probe')255 args = []256 parsed_args = cmd_parser.parse_args(args)257 cmd.run(parsed_args)258 namespace = 'qprobe-fake_port'259 self.client.assert_has_calls(260 [mock.call.list_ports(261 device_id=socket.gethostname(),262 device_owner=[debug_agent.DEVICE_OWNER_NETWORK_PROBE,263 debug_agent.DEVICE_OWNER_COMPUTE_PROBE]),264 mock.call.show_port('fake_port'),265 mock.call.show_network('fake_net'),266 mock.call.show_subnet('fake_subnet'),267 mock.call.delete_port('fake_port')])268 self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),269 mock.call.unplug('tap12345678-12',270 namespace=namespace,271 bridge=None)])272 def test_ping_all_with_ensure_port(self):273 fake_ports = self.fake_ports274 def fake_port_list(network_id=None, device_owner=None, device_id=None):275 if network_id:276 # In order to test ensure_port, return []277 return {'ports': []}278 return fake_ports279 self.client.list_ports.side_effect = fake_port_list280 cmd = commands.PingAll(self.app, None)281 cmd_parser = cmd.get_parser('ping_all')282 args = []283 parsed_args = cmd_parser.parse_args(args)284 namespace = 'qprobe-fake_port'285 with mock.patch('neutron.agent.linux.ip_lib.IpNetnsCommand') as ns:286 cmd.run(parsed_args)287 ns.assert_has_calls([mock.call.execute(mock.ANY)])288 fake_port = {'port':289 {'device_owner': debug_agent.DEVICE_OWNER_NETWORK_PROBE,290 'admin_state_up': True,291 'network_id': 'fake_net',292 'tenant_id': 'fake_tenant',293 'binding:host_id': cfg.CONF.host,294 'fixed_ips': [{'subnet_id': 'fake_subnet'}],295 'device_id': socket.gethostname()}}296 expected = [mock.call.show_network('fake_net'),297 mock.call.show_subnet('fake_subnet'),298 mock.call.create_port(fake_port),299 mock.call.show_subnet('fake_subnet')]300 self.client.assert_has_calls(expected)301 self.driver.assert_has_calls([mock.call.init_l3('tap12345678-12',302 ['10.0.0.3/24'],303 namespace=namespace304 )])305 def test_ping_all(self):306 cmd = commands.PingAll(self.app, None)307 cmd_parser = cmd.get_parser('ping_all')308 args = []309 parsed_args = cmd_parser.parse_args(args)310 with mock.patch('neutron.agent.linux.ip_lib.IpNetnsCommand') as ns:311 cmd.run(parsed_args)312 ns.assert_has_calls([mock.call.execute(mock.ANY)])313 expected = [mock.call.list_ports(),314 mock.call.list_ports(315 network_id='fake_net',316 device_owner=debug_agent.DEVICE_OWNER_NETWORK_PROBE,317 device_id=socket.gethostname()),318 mock.call.show_subnet('fake_subnet'),319 mock.call.show_port('fake_port')]320 self.client.assert_has_calls(expected)321 def test_ping_all_v6(self):322 fake_subnet_v6 = {'subnet': {'name': 'fake_v6',323 'ip_version': 6}}324 self.client.show_subnet.return_value = fake_subnet_v6325 cmd = commands.PingAll(self.app, None)326 cmd_parser = cmd.get_parser('ping_all')327 args = []328 parsed_args = cmd_parser.parse_args(args)329 with mock.patch('neutron.agent.linux.ip_lib.IpNetnsCommand') as ns:330 cmd.run(parsed_args)331 ns.assert_has_calls([mock.call.execute(mock.ANY)])332 self.client.assert_has_calls([mock.call.list_ports()])

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful