Best Python code snippet using localstack_python
test_neutron_security_groups.py
Source:test_neutron_security_groups.py  
# Copyright 2013 Nicira, Inc.
# All Rights Reserved
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Security group API tests run with ``security_group_api`` set to 'neutron'.

``neutron_api.get_client`` is replaced by ``get_client`` below, which hands
out ``MockClient`` instances backed by in-memory, class-level dictionaries.
"""

import uuid

import mock
from neutronclient.common import exceptions as n_exc
from neutronclient.neutron import v2_0 as neutronv20
from oslo_config import cfg
from oslo_serialization import jsonutils
import six
import webob

from nova.api.openstack.compute.legacy_v2.contrib import security_groups
from nova import compute
from nova import context
import nova.db
from nova import exception
from nova.network import model
from nova.network.neutronv2 import api as neutron_api
from nova.network.security_group import neutron_driver
from nova.objects import instance as instance_obj
from nova import test
from nova.tests.unit.api.openstack.compute import test_security_groups
from nova.tests.unit.api.openstack import fakes


class TestNeutronSecurityGroupsTestCase(test.TestCase):
    """Base fixture that swaps ``neutron_api.get_client`` for the fake."""

    def setUp(self):
        super(TestNeutronSecurityGroupsTestCase, self).setUp()
        cfg.CONF.set_override('security_group_api', 'neutron')
        # Remember the real factory so tearDown can put it back.
        self.original_client = neutron_api.get_client
        neutron_api.get_client = get_client

    def tearDown(self):
        neutron_api.get_client = self.original_client
        # MockClient state is class-level, so it must be wiped explicitly
        # or it would leak into the next test.
        get_client()._reset()
        super(TestNeutronSecurityGroupsTestCase, self).tearDown()


class TestNeutronSecurityGroupsV21(
        test_security_groups.TestSecurityGroupsV21,
        TestNeutronSecurityGroupsTestCase):
    """Security group controller tests executed against ``MockClient``.

    Several inherited tests are overridden with ``pass``; the comment in
    each override records the reason given by the original authors.
    """

    def _create_sg_template(self, **kwargs):
        """Create a security group through the controller and return it."""
        sg = test_security_groups.security_group_request_template(**kwargs)
        return self.controller.create(self.req, body={'security_group': sg})

    def _create_network(self):
        """Create network 'net1' with a 10.0.0.0/24 subnet in the fake."""
        body = {'network': {'name': 'net1'}}
        neutron = get_client()
        net = neutron.create_network(body)
        body = {'subnet': {'network_id': net['network']['id'],
                           'cidr': '10.0.0.0/24'}}
        neutron.create_subnet(body)
        return net

    def _create_port(self, **kwargs):
        """Create a port in the fake, forwarding only the known fields."""
        body = {'port': {'binding:vnic_type': model.VNIC_TYPE_NORMAL}}
        fields = ['security_groups', 'device_id', 'network_id',
                  'port_security_enabled']
        for field in fields:
            if field in kwargs:
                body['port'][field] = kwargs[field]
        neutron = get_client()
        return neutron.create_port(body)

    def _create_security_group(self, **kwargs):
        """Create a security group directly in the fake client."""
        body = {'security_group': {}}
        fields = ['name', 'description']
        for field in fields:
            if field in kwargs:
                body['security_group'][field] = kwargs[field]
        neutron = get_client()
        return neutron.create_security_group(body)

    def test_create_security_group_with_no_description(self):
        # Neutron's security group description field is optional.
        pass

    def test_create_security_group_with_empty_description(self):
        # Neutron's security group description field is optional.
        pass

    def test_create_security_group_with_blank_name(self):
        # Neutron's security group name field is optional.
        pass

    def test_create_security_group_with_whitespace_name(self):
        # Neutron allows security group name to be whitespace.
        pass

    def test_create_security_group_with_blank_description(self):
        # Neutron's security group description field is optional.
        pass

    def test_create_security_group_with_whitespace_description(self):
        # Neutron allows description to be whitespace.
        pass

    def test_create_security_group_with_duplicate_name(self):
        # Neutron allows duplicate names for security groups.
        pass

    def test_create_security_group_non_string_name(self):
        # Neutron allows security group name to be non string.
        pass

    def test_create_security_group_non_string_description(self):
        # Neutron allows non string description.
        pass

    def test_create_security_group_quota_limit(self):
        # Enforced by Neutron server.
        pass

    def test_update_security_group(self):
        # Enforced by Neutron server.
        pass

    def test_get_security_group_list(self):
        self._create_sg_template().get('security_group')
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        list_dict = self.controller.index(req)
        # MockClient seeds a 'default' group, so one created group is
        # presumably what makes the expected total two.
        self.assertEqual(len(list_dict['security_groups']), 2)

    def test_get_security_group_list_all_tenants(self):
        pass

    def test_get_security_group_by_instance(self):
        sg = self._create_sg_template().get('security_group')
        net = self._create_network()
        self._create_port(
            network_id=net['network']['id'], security_groups=[sg['id']],
            device_id=test_security_groups.FAKE_UUID1)
        expected = [{'rules': [], 'tenant_id': 'fake', 'id': sg['id'],
                    'name': 'test', 'description': 'test-description'}]
        self.stubs.Set(nova.db, 'instance_get_by_uuid',
                       test_security_groups.return_server_by_uuid)
        req = fakes.HTTPRequest.blank('/v2/fake/servers/%s/os-security-groups'
                                      % test_security_groups.FAKE_UUID1)
        res_dict = self.server_controller.index(
            req, test_security_groups.FAKE_UUID1)['security_groups']
        self.assertEqual(expected, res_dict)

    def test_get_security_group_by_id(self):
        sg = self._create_sg_template().get('security_group')
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s'
                                      % sg['id'])
        res_dict = self.controller.show(req, sg['id'])
        expected = {'security_group': sg}
        self.assertEqual(res_dict, expected)

    def test_delete_security_group_by_id(self):
        sg = self._create_sg_template().get('security_group')
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s' %
                                      sg['id'])
        self.controller.delete(req, sg['id'])

    def test_delete_security_group_by_admin(self):
        sg = self._create_sg_template().get('security_group')
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s' %
                                      sg['id'], use_admin_context=True)
        self.controller.delete(req, sg['id'])

    @mock.patch('nova.compute.utils.refresh_info_cache_for_instance')
    def test_delete_security_group_in_use(self, refresh_info_cache_mock):
        sg = self._create_sg_template().get('security_group')
        self._create_network()
        db_inst = fakes.stub_instance(id=1, nw_cache=[], security_groups=[])
        _context = context.get_admin_context()
        instance = instance_obj.Instance._from_db_object(
            _context, instance_obj.Instance(), db_inst,
            expected_attrs=instance_obj.INSTANCE_DEFAULT_FIELDS)
        neutron = neutron_api.API()
        with mock.patch.object(nova.db, 'instance_get_by_uuid',
                               return_value=db_inst):
            neutron.allocate_for_instance(_context, instance,
                                          security_groups=[sg['id']])
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s'
                                      % sg['id'])
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,
                          req, sg['id'])

    def test_associate_non_running_instance(self):
        # Neutron does not care if the instance is running or not. When the
        # instances is detected by neutron it will push down the security
        # group policy to it.
        pass

    def test_associate_already_associated_security_group_to_instance(self):
        # Neutron security groups does not raise an error if you update a
        # port adding a security group to it that was already associated
        # to the port. This is because PUT semantics are used.
        pass

    def test_associate(self):
        sg = self._create_sg_template().get('security_group')
        net = self._create_network()
        self._create_port(
            network_id=net['network']['id'], security_groups=[sg['id']],
            device_id=test_security_groups.FAKE_UUID1)
        self.stubs.Set(nova.db, 'instance_get',
                       test_security_groups.return_server)
        body = dict(addSecurityGroup=dict(name="test"))
        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.manager._addSecurityGroup(req, '1', body)

    def test_associate_duplicate_names(self):
        sg1 = self._create_security_group(name='sg1',
                                          description='sg1')['security_group']
        self._create_security_group(name='sg1',
                                    description='sg1')['security_group']
        net = self._create_network()
        self._create_port(
            network_id=net['network']['id'], security_groups=[sg1['id']],
            device_id=test_security_groups.FAKE_UUID1)
        self.stubs.Set(nova.db, 'instance_get',
                       test_security_groups.return_server)
        body = dict(addSecurityGroup=dict(name="sg1"))
        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPConflict,
                          self.manager._addSecurityGroup, req, '1', body)

    def test_associate_port_security_enabled_true(self):
        sg = self._create_sg_template().get('security_group')
        net = self._create_network()
        self._create_port(
            network_id=net['network']['id'], security_groups=[sg['id']],
            port_security_enabled=True,
            device_id=test_security_groups.FAKE_UUID1)
        self.stubs.Set(nova.db, 'instance_get',
                       test_security_groups.return_server)
        body = dict(addSecurityGroup=dict(name="test"))
        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.manager._addSecurityGroup(req, '1', body)

    def test_associate_port_security_enabled_false(self):
        self._create_sg_template().get('security_group')
        net = self._create_network()
        self._create_port(
            network_id=net['network']['id'], port_security_enabled=False,
            device_id=test_security_groups.FAKE_UUID1)
        self.stubs.Set(nova.db, 'instance_get',
                       test_security_groups.return_server)
        body = dict(addSecurityGroup=dict(name="test"))
        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPBadRequest,
                          self.manager._addSecurityGroup,
                          req, '1', body)

    def test_disassociate_by_non_existing_security_group_name(self):
        self.stubs.Set(nova.db, 'instance_get',
                       test_security_groups.return_server)
        body = dict(removeSecurityGroup=dict(name='non-existing'))
        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.assertRaises(webob.exc.HTTPNotFound,
                          self.manager._removeSecurityGroup, req, '1', body)

    def test_disassociate_non_running_instance(self):
        # Neutron does not care if the instance is running or not. When the
        # instances is detected by neutron it will push down the security
        # group policy to it.
        pass

    def test_disassociate_already_associated_security_group_to_instance(self):
        # Neutron security groups does not raise an error if you update a
        # port adding a security group to it that was already associated
        # to the port. This is because PUT semantics are used.
        pass

    def test_disassociate(self):
        sg = self._create_sg_template().get('security_group')
        net = self._create_network()
        self._create_port(
            network_id=net['network']['id'], security_groups=[sg['id']],
            device_id=test_security_groups.FAKE_UUID1)
        self.stubs.Set(nova.db, 'instance_get',
                       test_security_groups.return_server)
        body = dict(removeSecurityGroup=dict(name="test"))
        req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
        self.manager._removeSecurityGroup(req, '1', body)

    def test_get_raises_no_unique_match_error(self):

        def fake_find_resourceid_by_name_or_id(client, param, name,
                                               project_id=None):
            raise n_exc.NeutronClientNoUniqueMatch()

        self.stubs.Set(neutronv20, 'find_resourceid_by_name_or_id',
                       fake_find_resourceid_by_name_or_id)
        security_group_api = self.controller.security_group_api
        self.assertRaises(exception.NoUniqueMatch, security_group_api.get,
                          context.get_admin_context(), 'foobar')

    def test_get_instances_security_groups_bindings(self):
        servers = [{'id': test_security_groups.FAKE_UUID1},
                   {'id': test_security_groups.FAKE_UUID2}]
        sg1 = self._create_sg_template(name='test1').get('security_group')
        sg2 = self._create_sg_template(name='test2').get('security_group')
        # test name='' is replaced with id
        sg3 = self._create_sg_template(name='').get('security_group')
        net = self._create_network()
        self._create_port(
            network_id=net['network']['id'], security_groups=[sg1['id'],
                                                              sg2['id']],
            device_id=test_security_groups.FAKE_UUID1)
        self._create_port(
            network_id=net['network']['id'], security_groups=[sg2['id'],
                                                              sg3['id']],
            device_id=test_security_groups.FAKE_UUID2)
        expected = {test_security_groups.FAKE_UUID1: [{'name': sg1['name']},
                                                      {'name': sg2['name']}],
                    test_security_groups.FAKE_UUID2: [{'name': sg2['name']},
                                                      {'name': sg3['id']}]}
        security_group_api = self.controller.security_group_api
        bindings = (
            security_group_api.get_instances_security_groups_bindings(
                context.get_admin_context(), servers))
        self.assertEqual(bindings, expected)

    def test_get_instance_security_groups(self):
        sg1 = self._create_sg_template(name='test1').get('security_group')
        sg2 = self._create_sg_template(name='test2').get('security_group')
        # test name='' is replaced with id
        sg3 = self._create_sg_template(name='').get('security_group')
        net = self._create_network()
        self._create_port(
            network_id=net['network']['id'], security_groups=[sg1['id'],
                                                              sg2['id'],
                                                              sg3['id']],
            device_id=test_security_groups.FAKE_UUID1)
        expected = [{'name': sg1['name']}, {'name': sg2['name']},
                    {'name': sg3['id']}]
        security_group_api = self.controller.security_group_api
        sgs = security_group_api.get_instance_security_groups(
            context.get_admin_context(), test_security_groups.FAKE_UUID1)
        self.assertEqual(sgs, expected)

    @mock.patch('nova.network.security_group.neutron_driver.SecurityGroupAPI.'
                'get_instances_security_groups_bindings')
    def test_get_security_group_empty_for_instance(self, neutron_sg_bind_mock):
        servers = [{'id': test_security_groups.FAKE_UUID1}]
        neutron_sg_bind_mock.return_value = {}
        security_group_api = self.controller.security_group_api
        ctx = context.get_admin_context()
        sgs = security_group_api.get_instance_security_groups(
            ctx, test_security_groups.FAKE_UUID1)
        neutron_sg_bind_mock.assert_called_once_with(ctx, servers, False)
        self.assertEqual([], sgs)

    def test_create_port_with_sg_and_port_security_enabled_true(self):
        sg1 = self._create_sg_template(name='test1').get('security_group')
        net = self._create_network()
        self._create_port(
            network_id=net['network']['id'], security_groups=[sg1['id']],
            port_security_enabled=True,
            device_id=test_security_groups.FAKE_UUID1)
        security_group_api = self.controller.security_group_api
        sgs = security_group_api.get_instance_security_groups(
            context.get_admin_context(), test_security_groups.FAKE_UUID1)
        self.assertEqual(sgs, [{'name': 'test1'}])

    def test_create_port_with_sg_and_port_security_enabled_false(self):
        sg1 = self._create_sg_template(name='test1').get('security_group')
        net = self._create_network()
        self.assertRaises(exception.SecurityGroupCannotBeApplied,
                          self._create_port,
                          network_id=net['network']['id'],
                          security_groups=[sg1['id']],
                          port_security_enabled=False,
                          device_id=test_security_groups.FAKE_UUID1)


class TestNeutronSecurityGroupsV2(TestNeutronSecurityGroupsV21):
    secgrp_ctl_cls = security_groups.SecurityGroupController
    server_secgrp_ctl_cls = security_groups.ServerSecurityGroupController
    secgrp_act_ctl_cls = security_groups.SecurityGroupActionController


class TestNeutronSecurityGroupRulesTestCase(TestNeutronSecurityGroupsTestCase):
    """Fixture that pre-loads two security groups into the fake client.

    No ``tearDown`` override is needed: the one inherited from
    ``TestNeutronSecurityGroupsTestCase`` restores ``get_client`` and resets
    the fake.  The previous override duplicated those steps and passed the
    wrong class to ``super()``.
    """

    def setUp(self):
        super(TestNeutronSecurityGroupRulesTestCase, self).setUp()
        id1 = '11111111-1111-1111-1111-111111111111'
        sg_template1 = test_security_groups.security_group_template(
            security_group_rules=[], id=id1)
        id2 = '22222222-2222-2222-2222-222222222222'
        sg_template2 = test_security_groups.security_group_template(
            security_group_rules=[], id=id2)
        self.controller_sg = security_groups.SecurityGroupController()
        neutron = get_client()
        neutron._fake_security_groups[id1] = sg_template1
        neutron._fake_security_groups[id2] = sg_template2


class _TestNeutronSecurityGroupRulesBase(object):
    """Rule tests shared by the V2 and V2.1 rule test classes below.

    ``self.controller``, ``self.sg1`` and ``self.sg2`` are not set here;
    presumably they come from the ``test_security_groups`` rule test classes
    this is mixed with -- confirm there.
    """

    def test_create_add_existing_rules_by_cidr(self):
        sg = test_security_groups.security_group_template()
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.controller_sg.create(req, {'security_group': sg})
        rule = test_security_groups.security_group_rule_template(
            cidr='15.0.0.0/8', parent_group_id=self.sg2['id'])
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
        self.controller.create(req, {'security_group_rule': rule})
        # Creating the identical rule a second time must be rejected.
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group_rule': rule})

    def test_create_add_existing_rules_by_group_id(self):
        sg = test_security_groups.security_group_template()
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        self.controller_sg.create(req, {'security_group': sg})
        rule = test_security_groups.security_group_rule_template(
            group=self.sg1['id'], parent_group_id=self.sg2['id'])
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
        self.controller.create(req, {'security_group_rule': rule})
        # Creating the identical rule a second time must be rejected.
        self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
                          req, {'security_group_rule': rule})

    def test_delete(self):
        rule = test_security_groups.security_group_rule_template(
            parent_group_id=self.sg2['id'])
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
        res_dict = self.controller.create(req, {'security_group_rule': rule})
        security_group_rule = res_dict['security_group_rule']
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/%s'
                                      % security_group_rule['id'])
        self.controller.delete(req, security_group_rule['id'])

    def test_create_rule_quota_limit(self):
        # Enforced by neutron
        pass


class TestNeutronSecurityGroupRulesV2(
        _TestNeutronSecurityGroupRulesBase,
        test_security_groups.TestSecurityGroupRulesV2,
        TestNeutronSecurityGroupRulesTestCase):
    pass


class TestNeutronSecurityGroupRulesV21(
        _TestNeutronSecurityGroupRulesBase,
        test_security_groups.TestSecurityGroupRulesV21,
        TestNeutronSecurityGroupRulesTestCase):
    pass


class TestNeutronSecurityGroupsOutputTest(TestNeutronSecurityGroupsTestCase):
    """Checks the ``security_groups`` entries in server API responses."""

    content_type = 'application/json'

    def setUp(self):
        super(TestNeutronSecurityGroupsOutputTest, self).setUp()
        fakes.stub_out_nw_api(self.stubs)
        self.controller = security_groups.SecurityGroupController()
        self.stubs.Set(compute.api.API, 'get',
                       test_security_groups.fake_compute_get)
        self.stubs.Set(compute.api.API, 'get_all',
                       test_security_groups.fake_compute_get_all)
        self.stubs.Set(compute.api.API, 'create',
                       test_security_groups.fake_compute_create)
        self.stubs.Set(neutron_driver.SecurityGroupAPI,
                       'get_instances_security_groups_bindings',
                       (test_security_groups.
                        fake_get_instances_security_groups_bindings))
        self.flags(
            osapi_compute_extension=[
                'nova.api.openstack.compute.contrib.select_extensions'],
            osapi_compute_ext_list=['Security_groups'])

    def _make_request(self, url, body=None):
        """Send a request to the fake WSGI app; POST when ``body`` is set."""
        req = webob.Request.blank(url)
        if body:
            req.method = 'POST'
            req.body = self._encode_body(body)
        req.content_type = self.content_type
        req.headers['Accept'] = self.content_type
        res = req.get_response(fakes.wsgi_app(init_only=('servers',)))
        return res

    def _encode_body(self, body):
        return jsonutils.dumps(body)

    def _get_server(self, body):
        return jsonutils.loads(body).get('server')

    def _get_servers(self, body):
        return jsonutils.loads(body).get('servers')

    def _get_groups(self, server):
        return server.get('security_groups')

    def test_create(self):
        url = '/v2/fake/servers'
        image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        # Named ``requested_groups`` so the imported ``security_groups``
        # module is not shadowed inside this test.
        requested_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]
        for security_group in requested_groups:
            sg = test_security_groups.security_group_template(
                name=security_group['name'])
            self.controller.create(req, {'security_group': sg})
        server = dict(name='server_test', imageRef=image_uuid, flavorRef=2,
                      security_groups=requested_groups)
        res = self._make_request(url, {'server': server})
        self.assertEqual(res.status_int, 202)
        server = self._get_server(res.body)
        for i, group in enumerate(self._get_groups(server)):
            name = 'fake-2-%s' % i
            self.assertEqual(group.get('name'), name)

    def test_create_server_get_default_security_group(self):
        url = '/v2/fake/servers'
        image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
        server = dict(name='server_test', imageRef=image_uuid, flavorRef=2)
        res = self._make_request(url, {'server': server})
        self.assertEqual(res.status_int, 202)
        server = self._get_server(res.body)
        group = self._get_groups(server)[0]
        self.assertEqual(group.get('name'), 'default')

    def test_show(self):

        def fake_get_instance_security_groups(inst, context, id):
            return [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]

        self.stubs.Set(neutron_driver.SecurityGroupAPI,
                       'get_instance_security_groups',
                       fake_get_instance_security_groups)
        url = '/v2/fake/servers'
        image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'
        req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
        # Named ``requested_groups`` so the imported ``security_groups``
        # module is not shadowed inside this test.
        requested_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]
        for security_group in requested_groups:
            sg = test_security_groups.security_group_template(
                name=security_group['name'])
            self.controller.create(req, {'security_group': sg})
        server = dict(name='server_test', imageRef=image_uuid, flavorRef=2,
                      security_groups=requested_groups)
        res = self._make_request(url, {'server': server})
        self.assertEqual(res.status_int, 202)
        server = self._get_server(res.body)
        for i, group in enumerate(self._get_groups(server)):
            name = 'fake-2-%s' % i
            self.assertEqual(group.get('name'), name)
        # Test that show (GET) returns the same information as create (POST)
        url = '/v2/fake/servers/' + test_security_groups.UUID3
        res = self._make_request(url)
        self.assertEqual(res.status_int, 200)
        server = self._get_server(res.body)
        for i, group in enumerate(self._get_groups(server)):
            name = 'fake-2-%s' % i
            self.assertEqual(group.get('name'), name)

    def test_detail(self):
        url = '/v2/fake/servers/detail'
        res = self._make_request(url)
        self.assertEqual(res.status_int, 200)
        for i, server in enumerate(self._get_servers(res.body)):
            for j, group in enumerate(self._get_groups(server)):
                name = 'fake-%s-%s' % (i, j)
                self.assertEqual(group.get('name'), name)

    def test_no_instance_passthrough_404(self):

        def fake_compute_get(*args, **kwargs):
            raise exception.InstanceNotFound(instance_id='fake')

        self.stubs.Set(compute.api.API, 'get', fake_compute_get)
        url = '/v2/fake/servers/70f6db34-de8d-4fbd-aafb-4065bdfa6115'
        res = self._make_request(url)
        self.assertEqual(res.status_int, 404)


def get_client(context=None, admin=False):
    """Stand-in for ``neutron_api.get_client``; both arguments are ignored."""
    return MockClient()


class MockClient(object):
    """In-memory fake of the subset of the Neutron client used by the tests.

    All resources live in class-level dictionaries keyed by resource id, so
    every instance returned by ``get_client`` sees the same state.
    """

    # Needs to be global to survive multiple calls to get_client.
    _fake_security_groups = {}
    _fake_ports = {}
    _fake_networks = {}
    _fake_subnets = {}
    _fake_security_group_rules = {}

    def __init__(self):
        # add default security group
        if not self._fake_security_groups:
            ret = {'name': 'default', 'description': 'default',
                   'tenant_id': 'fake_tenant', 'security_group_rules': [],
                   'id': str(uuid.uuid4())}
            self._fake_security_groups[ret['id']] = ret

    def _reset(self):
        """Drop every stored resource, including the default group."""
        self._fake_security_groups.clear()
        self._fake_ports.clear()
        self._fake_networks.clear()
        self._fake_subnets.clear()
        self._fake_security_group_rules.clear()

    def create_security_group(self, body=None):
        """Store a new security group and return it wrapped in a dict.

        :param body: ``{'security_group': {...}}`` with optional ``name``
            and ``description`` entries; absent entries are treated as ''.
        :raises n_exc.BadRequest: name or description is not a string.
        :raises n_exc.NeutronClientException: (status_code 401) name or
            description is longer than 255 characters.
        """
        s = body.get('security_group')
        # Default to '' so an absent key passes validation and can be
        # measured with len() below instead of raising TypeError.
        name = s.get('name', '')
        # The key was previously misspelled as 'description.', which made
        # the type check below a no-op.
        description = s.get('description', '')
        if not isinstance(name, six.string_types):
            msg = ('BadRequest: Invalid input for name. Reason: '
                   'None is not a valid string.')
            raise n_exc.BadRequest(message=msg)
        if not isinstance(description, six.string_types):
            msg = ('BadRequest: Invalid input for description. Reason: '
                   'None is not a valid string.')
            raise n_exc.BadRequest(message=msg)
        if len(name) > 255 or len(description) > 255:
            msg = 'Security Group name great than 255'
            raise n_exc.NeutronClientException(message=msg, status_code=401)
        ret = {'name': name, 'description': description,
               'tenant_id': 'fake', 'security_group_rules': [],
               'id': str(uuid.uuid4())}
        self._fake_security_groups[ret['id']] = ret
        return {'security_group': ret}

    def create_network(self, body):
        """Store a new ACTIVE network with no subnets and return it."""
        n = body.get('network')
        ret = {'status': 'ACTIVE', 'subnets': [], 'name': n.get('name'),
               'admin_state_up': n.get('admin_state_up', True),
               'tenant_id': 'fake_tenant',
               'id': str(uuid.uuid4())}
        if 'port_security_enabled' in n:
            ret['port_security_enabled'] = n['port_security_enabled']
        self._fake_networks[ret['id']] = ret
        return {'network': ret}

    def create_subnet(self, body):
        """Store a new subnet and attach its id to the owning network.

        :raises n_exc.NeutronClientException: (status_code 404) the
            referenced network does not exist.
        """
        s = body.get('subnet')
        try:
            net = self._fake_networks[s.get('network_id')]
        except KeyError:
            msg = 'Network %s not found' % s.get('network_id')
            raise n_exc.NeutronClientException(message=msg, status_code=404)
        ret = {'name': s.get('name'), 'network_id': s.get('network_id'),
               'tenant_id': 'fake_tenant', 'cidr': s.get('cidr'),
               'id': str(uuid.uuid4()), 'gateway_ip': '10.0.0.1'}
        net['subnets'].append(ret['id'])
        self._fake_networks[net['id']] = net
        self._fake_subnets[ret['id']] = ret
        return {'subnet': ret}

    def create_port(self, body):
        """Store a new port on an existing network and return it.

        ``port_security_enabled`` is taken from the request if given,
        otherwise from the network if the network defines it.

        :raises KeyError: ``network_id`` is missing or unknown.
        :raises exception.SecurityGroupCannotBeApplied: security groups
            were requested while port security is disabled.
        """
        p = body.get('port')
        ret = {'status': 'ACTIVE', 'id': str(uuid.uuid4()),
               'mac_address': p.get('mac_address', 'fa:16:3e:b8:f5:fb'),
               'device_id': p.get('device_id', str(uuid.uuid4())),
               'admin_state_up': p.get('admin_state_up', True),
               'security_groups': p.get('security_groups', []),
               'network_id': p.get('network_id'),
               'binding:vnic_type':
                   p.get('binding:vnic_type') or model.VNIC_TYPE_NORMAL}
        network = self._fake_networks[p['network_id']]
        if 'port_security_enabled' in p:
            ret['port_security_enabled'] = p['port_security_enabled']
        elif 'port_security_enabled' in network:
            ret['port_security_enabled'] = network['port_security_enabled']
        port_security = ret.get('port_security_enabled', True)
        # port_security must be True if security groups are present
        if not port_security and ret['security_groups']:
            raise exception.SecurityGroupCannotBeApplied()
        if network['subnets']:
            ret['fixed_ips'] = [{'subnet_id': network['subnets'][0],
                                 'ip_address': '10.0.0.1'}]
        # With no explicit groups and port security on (or unset), fall
        # back to the group named 'default', if one exists.
        if not ret['security_groups'] and (port_security is None or
                                           port_security is True):
            for security_group in self._fake_security_groups.values():
                if security_group['name'] == 'default':
                    ret['security_groups'] = [security_group['id']]
                    break
        self._fake_ports[ret['id']] = ret
        return {'port': ret}

    def create_security_group_rule(self, body):
        """Store the first rule in ``body`` and return it in a list."""
        # does not handle bulk case so just picks rule[0]
        r = body.get('security_group_rules')[0]
        fields = ['direction', 'protocol', 'port_range_min', 'port_range_max',
                  'ethertype', 'remote_ip_prefix', 'tenant_id',
                  'security_group_id', 'remote_group_id']
        ret = {}
        for field in fields:
            ret[field] = r.get(field)
        ret['id'] = str(uuid.uuid4())
        self._fake_security_group_rules[ret['id']] = ret
        return {'security_group_rules': [ret]}

    def show_security_group(self, security_group, **_params):
        """Return the stored group with its rules attached.

        :raises n_exc.NeutronClientException: (status_code 404) unknown id.
        """
        try:
            sg = self._fake_security_groups[security_group]
        except KeyError:
            msg = 'Security Group %s not found' % security_group
            raise n_exc.NeutronClientException(message=msg, status_code=404)
        for security_group_rule in self._fake_security_group_rules.values():
            # The stored group dict is mutated in place, so skip rules that
            # a previous call already attached; otherwise every repeated
            # show would append the same rules again.
            if (security_group_rule['security_group_id'] == sg['id'] and
                    security_group_rule not in sg['security_group_rules']):
                sg['security_group_rules'].append(security_group_rule)
        return {'security_group': sg}

    def show_security_group_rule(self, security_group_rule, **_params):
        """Return the stored rule; 404 NeutronClientException if unknown."""
        try:
            return {'security_group_rule':
                    self._fake_security_group_rules[security_group_rule]}
        except KeyError:
            msg = 'Security Group rule %s not found' % security_group_rule
            raise n_exc.NeutronClientException(message=msg, status_code=404)

    def show_network(self, network, **_params):
        """Return the stored network; 404 NeutronClientException if unknown."""
        try:
            return {'network':
                    self._fake_networks[network]}
        except KeyError:
            msg = 'Network %s not found' % network
            raise n_exc.NeutronClientException(message=msg, status_code=404)

    def show_port(self, port, **_params):
        """Return the stored port; 404 NeutronClientException if unknown."""
        try:
            return {'port':
                    self._fake_ports[port]}
        except KeyError:
            msg = 'Port %s not found' % port
            raise n_exc.NeutronClientException(message=msg, status_code=404)

    def show_subnet(self, subnet, **_params):
        """Return the stored subnet; 404 NeutronClientException if unknown."""
        try:
            return {'subnet':
                    self._fake_subnets[subnet]}
        except KeyError:
            # The message previously said 'Port', copied from show_port.
            msg = 'Subnet %s not found' % subnet
            raise n_exc.NeutronClientException(message=msg, status_code=404)

    def list_security_groups(self, **_params):
        """List stored groups, optionally filtered by ``name`` and/or ``id``.

        Each filter may be a single value or a list.  A group is appended
        once per matching filter value, so a group matching both a name and
        an id filter appears twice.  With no filters every group is listed.
        """
        # Normalise the filters once; they do not change per group.
        names = _params.get('name')
        if names and not isinstance(names, list):
            names = [names]
        ids = _params.get('id')
        if ids and not isinstance(ids, list):
            ids = [ids]
        ret = []
        for security_group in self._fake_security_groups.values():
            if names:
                for name in names:
                    if security_group.get('name') == name:
                        ret.append(security_group)
            if ids:
                for sg_id in ids:
                    if security_group.get('id') == sg_id:
                        ret.append(security_group)
            elif not names:
                ret.append(security_group)
        return {'security_groups': ret}

    def list_networks(self, **_params):
        """List all networks, or none when the ``shared`` filter is truthy."""
        # neutronv2/api.py _get_available_networks calls this assuming
        # search_opts filter "shared" is implemented and not ignored
        shared = _params.get("shared", None)
        if shared:
            return {'networks': []}
        else:
            return {'networks': list(self._fake_networks.values())}

    def list_ports(self, **_params):
        """List ports, optionally restricted by the ``device_id`` filter.

        The filter is applied with ``in``, so it may be a list of device
        ids (for a plain string this is a substring match).
        """
        ret = []
        device_id = _params.get('device_id')
        for port in self._fake_ports.values():
            if device_id:
                if port['device_id'] in device_id:
                    ret.append(port)
            else:
                ret.append(port)
        return {'ports': ret}

    def list_subnets(self, **_params):
        """List every stored subnet; filters are ignored."""
        return {'subnets': list(self._fake_subnets.values())}

    def list_floatingips(self, **_params):
        """Always report that no floating IPs exist."""
        return {'floatingips': []}

    # NOTE(review): the original continues here with
    # ``def delete_security_group(self, security_group):`` but its body lies
    # beyond the visible source, so it is deliberately not reproduced.
self.show_security_group(security_group)699        ports = self.list_ports()700        for port in ports.get('ports'):701            for sg_port in port['security_groups']:702                if sg_port == security_group:703                    msg = ('Unable to delete Security group %s in use'704                           % security_group)705                    raise n_exc.NeutronClientException(message=msg,706                                                       status_code=409)707        del self._fake_security_groups[security_group]708    def delete_security_group_rule(self, security_group_rule):709        self.show_security_group_rule(security_group_rule)710        del self._fake_security_group_rules[security_group_rule]711    def delete_network(self, network):712        self.show_network(network)713        self._check_ports_on_network(network)714        for subnet in self._fake_subnets.values():715            if subnet['network_id'] == network:716                del self._fake_subnets[subnet['id']]717        del self._fake_networks[network]718    def delete_port(self, port):719        self.show_port(port)720        del self._fake_ports[port]721    def update_port(self, port, body=None):722        self.show_port(port)723        self._fake_ports[port].update(body['port'])724        return {'port': self._fake_ports[port]}725    def list_extensions(self, **_parms):726        return {'extensions': []}727    def _check_ports_on_network(self, network):728        ports = self.list_ports()729        for port in ports:730            if port['network_id'] == network:731                msg = ('Unable to complete operation on network %s. There is '732                       'one or more ports still in use on the network'733                       % network)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
The guides range from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
