How to use test_security_groups method in localstack

Best Python code snippet using localstack_python

test_neutron_security_groups.py

Source:test_neutron_security_groups.py Github

copy

Full Screen

1# Copyright 2013 Nicira, Inc.2# All Rights Reserved3#4# Licensed under the Apache License, Version 2.0 (the "License"); you may5# not use this file except in compliance with the License. You may obtain6# a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the13# License for the specific language governing permissions and limitations14# under the License.15import six16import uuid17import mock18from neutronclient.common import exceptions as n_exc19from neutronclient.neutron import v2_0 as neutronv2020from oslo_config import cfg21from oslo_serialization import jsonutils22import webob23from nova.api.openstack.compute.legacy_v2.contrib import security_groups24from nova import compute25from nova import context26import nova.db27from nova import exception28from nova.network import model29from nova.network.neutronv2 import api as neutron_api30from nova.network.security_group import neutron_driver31from nova.objects import instance as instance_obj32from nova import test33from nova.tests.unit.api.openstack.compute import test_security_groups34from nova.tests.unit.api.openstack import fakes35class TestNeutronSecurityGroupsTestCase(test.TestCase):36 def setUp(self):37 super(TestNeutronSecurityGroupsTestCase, self).setUp()38 cfg.CONF.set_override('security_group_api', 'neutron')39 self.original_client = neutron_api.get_client40 neutron_api.get_client = get_client41 def tearDown(self):42 neutron_api.get_client = self.original_client43 get_client()._reset()44 super(TestNeutronSecurityGroupsTestCase, self).tearDown()45class TestNeutronSecurityGroupsV21(46 test_security_groups.TestSecurityGroupsV21,47 TestNeutronSecurityGroupsTestCase):48 def _create_sg_template(self, **kwargs):49 sg = test_security_groups.security_group_request_template(**kwargs)50 return self.controller.create(self.req, body={'security_group': sg})51 def _create_network(self):52 body = {'network': {'name': 'net1'}}53 neutron = get_client()54 net = neutron.create_network(body)55 body = {'subnet': {'network_id': net['network']['id'],56 'cidr': '10.0.0.0/24'}}57 neutron.create_subnet(body)58 return net59 def _create_port(self, **kwargs):60 body = {'port': {'binding:vnic_type': model.VNIC_TYPE_NORMAL}}61 fields = ['security_groups', 'device_id', 'network_id',62 'port_security_enabled']63 for field in fields:64 if field in kwargs:65 body['port'][field] = kwargs[field]66 neutron = get_client()67 return neutron.create_port(body)68 def _create_security_group(self, **kwargs):69 body = {'security_group': {}}70 fields = ['name', 'description']71 for field in fields:72 if field in kwargs:73 body['security_group'][field] = kwargs[field]74 neutron = get_client()75 return neutron.create_security_group(body)76 def test_create_security_group_with_no_description(self):77 # Neutron's security group description field is optional.78 pass79 def test_create_security_group_with_empty_description(self):80 # Neutron's security group description field is optional.81 pass82 def test_create_security_group_with_blank_name(self):83 # Neutron's security group name field is optional.84 pass85 def test_create_security_group_with_whitespace_name(self):86 # Neutron allows security group name to be whitespace.87 pass88 def test_create_security_group_with_blank_description(self):89 # Neutron's security group description field is optional.90 pass91 def test_create_security_group_with_whitespace_description(self):92 # Neutron allows description to be whitespace.93 pass94 def test_create_security_group_with_duplicate_name(self):95 # Neutron allows duplicate names for security groups.96 pass97 def test_create_security_group_non_string_name(self):98 # Neutron allows security group name to be non string.99 pass100 def test_create_security_group_non_string_description(self):101 # Neutron allows non string description.102 pass103 def test_create_security_group_quota_limit(self):104 # Enforced by Neutron server.105 pass106 def test_update_security_group(self):107 # Enforced by Neutron server.108 pass109 def test_get_security_group_list(self):110 self._create_sg_template().get('security_group')111 req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')112 list_dict = self.controller.index(req)113 self.assertEqual(len(list_dict['security_groups']), 2)114 def test_get_security_group_list_all_tenants(self):115 pass116 def test_get_security_group_by_instance(self):117 sg = self._create_sg_template().get('security_group')118 net = self._create_network()119 self._create_port(120 network_id=net['network']['id'], security_groups=[sg['id']],121 device_id=test_security_groups.FAKE_UUID1)122 expected = [{'rules': [], 'tenant_id': 'fake', 'id': sg['id'],123 'name': 'test', 'description': 'test-description'}]124 self.stubs.Set(nova.db, 'instance_get_by_uuid',125 test_security_groups.return_server_by_uuid)126 req = fakes.HTTPRequest.blank('/v2/fake/servers/%s/os-security-groups'127 % test_security_groups.FAKE_UUID1)128 res_dict = self.server_controller.index(129 req, test_security_groups.FAKE_UUID1)['security_groups']130 self.assertEqual(expected, res_dict)131 def test_get_security_group_by_id(self):132 sg = self._create_sg_template().get('security_group')133 req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s'134 % sg['id'])135 res_dict = self.controller.show(req, sg['id'])136 expected = {'security_group': sg}137 self.assertEqual(res_dict, expected)138 def test_delete_security_group_by_id(self):139 sg = self._create_sg_template().get('security_group')140 req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s' %141 sg['id'])142 self.controller.delete(req, sg['id'])143 def test_delete_security_group_by_admin(self):144 sg = self._create_sg_template().get('security_group')145 req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s' %146 sg['id'], use_admin_context=True)147 self.controller.delete(req, sg['id'])148 @mock.patch('nova.compute.utils.refresh_info_cache_for_instance')149 def test_delete_security_group_in_use(self, refresh_info_cache_mock):150 sg = self._create_sg_template().get('security_group')151 self._create_network()152 db_inst = fakes.stub_instance(id=1, nw_cache=[], security_groups=[])153 _context = context.get_admin_context()154 instance = instance_obj.Instance._from_db_object(155 _context, instance_obj.Instance(), db_inst,156 expected_attrs=instance_obj.INSTANCE_DEFAULT_FIELDS)157 neutron = neutron_api.API()158 with mock.patch.object(nova.db, 'instance_get_by_uuid',159 return_value=db_inst):160 neutron.allocate_for_instance(_context, instance,161 security_groups=[sg['id']])162 req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups/%s'163 % sg['id'])164 self.assertRaises(webob.exc.HTTPBadRequest, self.controller.delete,165 req, sg['id'])166 def test_associate_non_running_instance(self):167 # Neutron does not care if the instance is running or not. When the168 # instances is detected by neutron it will push down the security169 # group policy to it.170 pass171 def test_associate_already_associated_security_group_to_instance(self):172 # Neutron security groups does not raise an error if you update a173 # port adding a security group to it that was already associated174 # to the port. This is because PUT semantics are used.175 pass176 def test_associate(self):177 sg = self._create_sg_template().get('security_group')178 net = self._create_network()179 self._create_port(180 network_id=net['network']['id'], security_groups=[sg['id']],181 device_id=test_security_groups.FAKE_UUID1)182 self.stubs.Set(nova.db, 'instance_get',183 test_security_groups.return_server)184 body = dict(addSecurityGroup=dict(name="test"))185 req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')186 self.manager._addSecurityGroup(req, '1', body)187 def test_associate_duplicate_names(self):188 sg1 = self._create_security_group(name='sg1',189 description='sg1')['security_group']190 self._create_security_group(name='sg1',191 description='sg1')['security_group']192 net = self._create_network()193 self._create_port(194 network_id=net['network']['id'], security_groups=[sg1['id']],195 device_id=test_security_groups.FAKE_UUID1)196 self.stubs.Set(nova.db, 'instance_get',197 test_security_groups.return_server)198 body = dict(addSecurityGroup=dict(name="sg1"))199 req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')200 self.assertRaises(webob.exc.HTTPConflict,201 self.manager._addSecurityGroup, req, '1', body)202 def test_associate_port_security_enabled_true(self):203 sg = self._create_sg_template().get('security_group')204 net = self._create_network()205 self._create_port(206 network_id=net['network']['id'], security_groups=[sg['id']],207 port_security_enabled=True,208 device_id=test_security_groups.FAKE_UUID1)209 self.stubs.Set(nova.db, 'instance_get',210 test_security_groups.return_server)211 body = dict(addSecurityGroup=dict(name="test"))212 req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')213 self.manager._addSecurityGroup(req, '1', body)214 def test_associate_port_security_enabled_false(self):215 self._create_sg_template().get('security_group')216 net = self._create_network()217 self._create_port(218 network_id=net['network']['id'], port_security_enabled=False,219 device_id=test_security_groups.FAKE_UUID1)220 self.stubs.Set(nova.db, 'instance_get',221 test_security_groups.return_server)222 body = dict(addSecurityGroup=dict(name="test"))223 req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')224 self.assertRaises(webob.exc.HTTPBadRequest,225 self.manager._addSecurityGroup,226 req, '1', body)227 def test_disassociate_by_non_existing_security_group_name(self):228 self.stubs.Set(nova.db, 'instance_get',229 test_security_groups.return_server)230 body = dict(removeSecurityGroup=dict(name='non-existing'))231 req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')232 self.assertRaises(webob.exc.HTTPNotFound,233 self.manager._removeSecurityGroup, req, '1', body)234 def test_disassociate_non_running_instance(self):235 # Neutron does not care if the instance is running or not. When the236 # instances is detected by neutron it will push down the security237 # group policy to it.238 pass239 def test_disassociate_already_associated_security_group_to_instance(self):240 # Neutron security groups does not raise an error if you update a241 # port adding a security group to it that was already associated242 # to the port. This is because PUT semantics are used.243 pass244 def test_disassociate(self):245 sg = self._create_sg_template().get('security_group')246 net = self._create_network()247 self._create_port(248 network_id=net['network']['id'], security_groups=[sg['id']],249 device_id=test_security_groups.FAKE_UUID1)250 self.stubs.Set(nova.db, 'instance_get',251 test_security_groups.return_server)252 body = dict(removeSecurityGroup=dict(name="test"))253 req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')254 self.manager._removeSecurityGroup(req, '1', body)255 def test_get_raises_no_unique_match_error(self):256 def fake_find_resourceid_by_name_or_id(client, param, name,257 project_id=None):258 raise n_exc.NeutronClientNoUniqueMatch()259 self.stubs.Set(neutronv20, 'find_resourceid_by_name_or_id',260 fake_find_resourceid_by_name_or_id)261 security_group_api = self.controller.security_group_api262 self.assertRaises(exception.NoUniqueMatch, security_group_api.get,263 context.get_admin_context(), 'foobar')264 def test_get_instances_security_groups_bindings(self):265 servers = [{'id': test_security_groups.FAKE_UUID1},266 {'id': test_security_groups.FAKE_UUID2}]267 sg1 = self._create_sg_template(name='test1').get('security_group')268 sg2 = self._create_sg_template(name='test2').get('security_group')269 # test name='' is replaced with id270 sg3 = self._create_sg_template(name='').get('security_group')271 net = self._create_network()272 self._create_port(273 network_id=net['network']['id'], security_groups=[sg1['id'],274 sg2['id']],275 device_id=test_security_groups.FAKE_UUID1)276 self._create_port(277 network_id=net['network']['id'], security_groups=[sg2['id'],278 sg3['id']],279 device_id=test_security_groups.FAKE_UUID2)280 expected = {test_security_groups.FAKE_UUID1: [{'name': sg1['name']},281 {'name': sg2['name']}],282 test_security_groups.FAKE_UUID2: [{'name': sg2['name']},283 {'name': sg3['id']}]}284 security_group_api = self.controller.security_group_api285 bindings = (286 security_group_api.get_instances_security_groups_bindings(287 context.get_admin_context(), servers))288 self.assertEqual(bindings, expected)289 def test_get_instance_security_groups(self):290 sg1 = self._create_sg_template(name='test1').get('security_group')291 sg2 = self._create_sg_template(name='test2').get('security_group')292 # test name='' is replaced with id293 sg3 = self._create_sg_template(name='').get('security_group')294 net = self._create_network()295 self._create_port(296 network_id=net['network']['id'], security_groups=[sg1['id'],297 sg2['id'],298 sg3['id']],299 device_id=test_security_groups.FAKE_UUID1)300 expected = [{'name': sg1['name']}, {'name': sg2['name']},301 {'name': sg3['id']}]302 security_group_api = self.controller.security_group_api303 sgs = security_group_api.get_instance_security_groups(304 context.get_admin_context(), test_security_groups.FAKE_UUID1)305 self.assertEqual(sgs, expected)306 @mock.patch('nova.network.security_group.neutron_driver.SecurityGroupAPI.'307 'get_instances_security_groups_bindings')308 def test_get_security_group_empty_for_instance(self, neutron_sg_bind_mock):309 servers = [{'id': test_security_groups.FAKE_UUID1}]310 neutron_sg_bind_mock.return_value = {}311 security_group_api = self.controller.security_group_api312 ctx = context.get_admin_context()313 sgs = security_group_api.get_instance_security_groups(ctx,314 test_security_groups.FAKE_UUID1)315 neutron_sg_bind_mock.assert_called_once_with(ctx, servers, False)316 self.assertEqual([], sgs)317 def test_create_port_with_sg_and_port_security_enabled_true(self):318 sg1 = self._create_sg_template(name='test1').get('security_group')319 net = self._create_network()320 self._create_port(321 network_id=net['network']['id'], security_groups=[sg1['id']],322 port_security_enabled=True,323 device_id=test_security_groups.FAKE_UUID1)324 security_group_api = self.controller.security_group_api325 sgs = security_group_api.get_instance_security_groups(326 context.get_admin_context(), test_security_groups.FAKE_UUID1)327 self.assertEqual(sgs, [{'name': 'test1'}])328 def test_create_port_with_sg_and_port_security_enabled_false(self):329 sg1 = self._create_sg_template(name='test1').get('security_group')330 net = self._create_network()331 self.assertRaises(exception.SecurityGroupCannotBeApplied,332 self._create_port,333 network_id=net['network']['id'],334 security_groups=[sg1['id']],335 port_security_enabled=False,336 device_id=test_security_groups.FAKE_UUID1)337class TestNeutronSecurityGroupsV2(TestNeutronSecurityGroupsV21):338 secgrp_ctl_cls = security_groups.SecurityGroupController339 server_secgrp_ctl_cls = security_groups.ServerSecurityGroupController340 secgrp_act_ctl_cls = security_groups.SecurityGroupActionController341class TestNeutronSecurityGroupRulesTestCase(TestNeutronSecurityGroupsTestCase):342 def setUp(self):343 super(TestNeutronSecurityGroupRulesTestCase, self).setUp()344 id1 = '11111111-1111-1111-1111-111111111111'345 sg_template1 = test_security_groups.security_group_template(346 security_group_rules=[], id=id1)347 id2 = '22222222-2222-2222-2222-222222222222'348 sg_template2 = test_security_groups.security_group_template(349 security_group_rules=[], id=id2)350 self.controller_sg = security_groups.SecurityGroupController()351 neutron = get_client()352 neutron._fake_security_groups[id1] = sg_template1353 neutron._fake_security_groups[id2] = sg_template2354 def tearDown(self):355 neutron_api.get_client = self.original_client356 get_client()._reset()357 super(TestNeutronSecurityGroupsTestCase, self).tearDown()358class _TestNeutronSecurityGroupRulesBase(object):359 def test_create_add_existing_rules_by_cidr(self):360 sg = test_security_groups.security_group_template()361 req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')362 self.controller_sg.create(req, {'security_group': sg})363 rule = test_security_groups.security_group_rule_template(364 cidr='15.0.0.0/8', parent_group_id=self.sg2['id'])365 req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')366 self.controller.create(req, {'security_group_rule': rule})367 self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,368 req, {'security_group_rule': rule})369 def test_create_add_existing_rules_by_group_id(self):370 sg = test_security_groups.security_group_template()371 req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')372 self.controller_sg.create(req, {'security_group': sg})373 rule = test_security_groups.security_group_rule_template(374 group=self.sg1['id'], parent_group_id=self.sg2['id'])375 req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')376 self.controller.create(req, {'security_group_rule': rule})377 self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,378 req, {'security_group_rule': rule})379 def test_delete(self):380 rule = test_security_groups.security_group_rule_template(381 parent_group_id=self.sg2['id'])382 req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')383 res_dict = self.controller.create(req, {'security_group_rule': rule})384 security_group_rule = res_dict['security_group_rule']385 req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules/%s'386 % security_group_rule['id'])387 self.controller.delete(req, security_group_rule['id'])388 def test_create_rule_quota_limit(self):389 # Enforced by neutron390 pass391class TestNeutronSecurityGroupRulesV2(392 _TestNeutronSecurityGroupRulesBase,393 test_security_groups.TestSecurityGroupRulesV2,394 TestNeutronSecurityGroupRulesTestCase):395 pass396class TestNeutronSecurityGroupRulesV21(397 _TestNeutronSecurityGroupRulesBase,398 test_security_groups.TestSecurityGroupRulesV21,399 TestNeutronSecurityGroupRulesTestCase):400 pass401class TestNeutronSecurityGroupsOutputTest(TestNeutronSecurityGroupsTestCase):402 content_type = 'application/json'403 def setUp(self):404 super(TestNeutronSecurityGroupsOutputTest, self).setUp()405 fakes.stub_out_nw_api(self.stubs)406 self.controller = security_groups.SecurityGroupController()407 self.stubs.Set(compute.api.API, 'get',408 test_security_groups.fake_compute_get)409 self.stubs.Set(compute.api.API, 'get_all',410 test_security_groups.fake_compute_get_all)411 self.stubs.Set(compute.api.API, 'create',412 test_security_groups.fake_compute_create)413 self.stubs.Set(neutron_driver.SecurityGroupAPI,414 'get_instances_security_groups_bindings',415 (test_security_groups.416 fake_get_instances_security_groups_bindings))417 self.flags(418 osapi_compute_extension=[419 'nova.api.openstack.compute.contrib.select_extensions'],420 osapi_compute_ext_list=['Security_groups'])421 def _make_request(self, url, body=None):422 req = webob.Request.blank(url)423 if body:424 req.method = 'POST'425 req.body = self._encode_body(body)426 req.content_type = self.content_type427 req.headers['Accept'] = self.content_type428 res = req.get_response(fakes.wsgi_app(init_only=('servers',)))429 return res430 def _encode_body(self, body):431 return jsonutils.dumps(body)432 def _get_server(self, body):433 return jsonutils.loads(body).get('server')434 def _get_servers(self, body):435 return jsonutils.loads(body).get('servers')436 def _get_groups(self, server):437 return server.get('security_groups')438 def test_create(self):439 url = '/v2/fake/servers'440 image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'441 req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')442 security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]443 for security_group in security_groups:444 sg = test_security_groups.security_group_template(445 name=security_group['name'])446 self.controller.create(req, {'security_group': sg})447 server = dict(name='server_test', imageRef=image_uuid, flavorRef=2,448 security_groups=security_groups)449 res = self._make_request(url, {'server': server})450 self.assertEqual(res.status_int, 202)451 server = self._get_server(res.body)452 for i, group in enumerate(self._get_groups(server)):453 name = 'fake-2-%s' % i454 self.assertEqual(group.get('name'), name)455 def test_create_server_get_default_security_group(self):456 url = '/v2/fake/servers'457 image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'458 server = dict(name='server_test', imageRef=image_uuid, flavorRef=2)459 res = self._make_request(url, {'server': server})460 self.assertEqual(res.status_int, 202)461 server = self._get_server(res.body)462 group = self._get_groups(server)[0]463 self.assertEqual(group.get('name'), 'default')464 def test_show(self):465 def fake_get_instance_security_groups(inst, context, id):466 return [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]467 self.stubs.Set(neutron_driver.SecurityGroupAPI,468 'get_instance_security_groups',469 fake_get_instance_security_groups)470 url = '/v2/fake/servers'471 image_uuid = 'c905cedb-7281-47e4-8a62-f26bc5fc4c77'472 req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')473 security_groups = [{'name': 'fake-2-0'}, {'name': 'fake-2-1'}]474 for security_group in security_groups:475 sg = test_security_groups.security_group_template(476 name=security_group['name'])477 self.controller.create(req, {'security_group': sg})478 server = dict(name='server_test', imageRef=image_uuid, flavorRef=2,479 security_groups=security_groups)480 res = self._make_request(url, {'server': server})481 self.assertEqual(res.status_int, 202)482 server = self._get_server(res.body)483 for i, group in enumerate(self._get_groups(server)):484 name = 'fake-2-%s' % i485 self.assertEqual(group.get('name'), name)486 # Test that show (GET) returns the same information as create (POST)487 url = '/v2/fake/servers/' + test_security_groups.UUID3488 res = self._make_request(url)489 self.assertEqual(res.status_int, 200)490 server = self._get_server(res.body)491 for i, group in enumerate(self._get_groups(server)):492 name = 'fake-2-%s' % i493 self.assertEqual(group.get('name'), name)494 def test_detail(self):495 url = '/v2/fake/servers/detail'496 res = self._make_request(url)497 self.assertEqual(res.status_int, 200)498 for i, server in enumerate(self._get_servers(res.body)):499 for j, group in enumerate(self._get_groups(server)):500 name = 'fake-%s-%s' % (i, j)501 self.assertEqual(group.get('name'), name)502 def test_no_instance_passthrough_404(self):503 def fake_compute_get(*args, **kwargs):504 raise exception.InstanceNotFound(instance_id='fake')505 self.stubs.Set(compute.api.API, 'get', fake_compute_get)506 url = '/v2/fake/servers/70f6db34-de8d-4fbd-aafb-4065bdfa6115'507 res = self._make_request(url)508 self.assertEqual(res.status_int, 404)509def get_client(context=None, admin=False):510 return MockClient()511class MockClient(object):512 # Needs to be global to survive multiple calls to get_client.513 _fake_security_groups = {}514 _fake_ports = {}515 _fake_networks = {}516 _fake_subnets = {}517 _fake_security_group_rules = {}518 def __init__(self):519 # add default security group520 if not len(self._fake_security_groups):521 ret = {'name': 'default', 'description': 'default',522 'tenant_id': 'fake_tenant', 'security_group_rules': [],523 'id': str(uuid.uuid4())}524 self._fake_security_groups[ret['id']] = ret525 def _reset(self):526 self._fake_security_groups.clear()527 self._fake_ports.clear()528 self._fake_networks.clear()529 self._fake_subnets.clear()530 self._fake_security_group_rules.clear()531 def create_security_group(self, body=None):532 s = body.get('security_group')533 if not isinstance(s.get('name', ''), six.string_types):534 msg = ('BadRequest: Invalid input for name. Reason: '535 'None is not a valid string.')536 raise n_exc.BadRequest(message=msg)537 if not isinstance(s.get('description.', ''), six.string_types):538 msg = ('BadRequest: Invalid input for description. Reason: '539 'None is not a valid string.')540 raise n_exc.BadRequest(message=msg)541 if len(s.get('name')) > 255 or len(s.get('description')) > 255:542 msg = 'Security Group name great than 255'543 raise n_exc.NeutronClientException(message=msg, status_code=401)544 ret = {'name': s.get('name'), 'description': s.get('description'),545 'tenant_id': 'fake', 'security_group_rules': [],546 'id': str(uuid.uuid4())}547 self._fake_security_groups[ret['id']] = ret548 return {'security_group': ret}549 def create_network(self, body):550 n = body.get('network')551 ret = {'status': 'ACTIVE', 'subnets': [], 'name': n.get('name'),552 'admin_state_up': n.get('admin_state_up', True),553 'tenant_id': 'fake_tenant',554 'id': str(uuid.uuid4())}555 if 'port_security_enabled' in n:556 ret['port_security_enabled'] = n['port_security_enabled']557 self._fake_networks[ret['id']] = ret558 return {'network': ret}559 def create_subnet(self, body):560 s = body.get('subnet')561 try:562 net = self._fake_networks[s.get('network_id')]563 except KeyError:564 msg = 'Network %s not found' % s.get('network_id')565 raise n_exc.NeutronClientException(message=msg, status_code=404)566 ret = {'name': s.get('name'), 'network_id': s.get('network_id'),567 'tenant_id': 'fake_tenant', 'cidr': s.get('cidr'),568 'id': str(uuid.uuid4()), 'gateway_ip': '10.0.0.1'}569 net['subnets'].append(ret['id'])570 self._fake_networks[net['id']] = net571 self._fake_subnets[ret['id']] = ret572 return {'subnet': ret}573 def create_port(self, body):574 p = body.get('port')575 ret = {'status': 'ACTIVE', 'id': str(uuid.uuid4()),576 'mac_address': p.get('mac_address', 'fa:16:3e:b8:f5:fb'),577 'device_id': p.get('device_id', str(uuid.uuid4())),578 'admin_state_up': p.get('admin_state_up', True),579 'security_groups': p.get('security_groups', []),580 'network_id': p.get('network_id'),581 'binding:vnic_type':582 p.get('binding:vnic_type') or model.VNIC_TYPE_NORMAL}583 network = self._fake_networks[p['network_id']]584 if 'port_security_enabled' in p:585 ret['port_security_enabled'] = p['port_security_enabled']586 elif 'port_security_enabled' in network:587 ret['port_security_enabled'] = network['port_security_enabled']588 port_security = ret.get('port_security_enabled', True)589 # port_security must be True if security groups are present590 if not port_security and ret['security_groups']:591 raise exception.SecurityGroupCannotBeApplied()592 if network['subnets']:593 ret['fixed_ips'] = [{'subnet_id': network['subnets'][0],594 'ip_address': '10.0.0.1'}]595 if not ret['security_groups'] and (port_security is None or596 port_security is True):597 for security_group in self._fake_security_groups.values():598 if security_group['name'] == 'default':599 ret['security_groups'] = [security_group['id']]600 break601 self._fake_ports[ret['id']] = ret602 return {'port': ret}603 def create_security_group_rule(self, body):604 # does not handle bulk case so just picks rule[0]605 r = body.get('security_group_rules')[0]606 fields = ['direction', 'protocol', 'port_range_min', 'port_range_max',607 'ethertype', 'remote_ip_prefix', 'tenant_id',608 'security_group_id', 'remote_group_id']609 ret = {}610 for field in fields:611 ret[field] = r.get(field)612 ret['id'] = str(uuid.uuid4())613 self._fake_security_group_rules[ret['id']] = ret614 return {'security_group_rules': [ret]}615 def show_security_group(self, security_group, **_params):616 try:617 sg = self._fake_security_groups[security_group]618 except KeyError:619 msg = 'Security Group %s not found' % security_group620 raise n_exc.NeutronClientException(message=msg, status_code=404)621 for security_group_rule in self._fake_security_group_rules.values():622 if security_group_rule['security_group_id'] == sg['id']:623 sg['security_group_rules'].append(security_group_rule)624 return {'security_group': sg}625 def show_security_group_rule(self, security_group_rule, **_params):626 try:627 return {'security_group_rule':628 self._fake_security_group_rules[security_group_rule]}629 except KeyError:630 msg = 'Security Group rule %s not found' % security_group_rule631 raise n_exc.NeutronClientException(message=msg, status_code=404)632 def show_network(self, network, **_params):633 try:634 return {'network':635 self._fake_networks[network]}636 except KeyError:637 msg = 'Network %s not found' % network638 raise n_exc.NeutronClientException(message=msg, status_code=404)639 def show_port(self, port, **_params):640 try:641 return {'port':642 self._fake_ports[port]}643 except KeyError:644 msg = 'Port %s not found' % port645 raise n_exc.NeutronClientException(message=msg, status_code=404)646 def show_subnet(self, subnet, **_params):647 try:648 return {'subnet':649 self._fake_subnets[subnet]}650 except KeyError:651 msg = 'Port %s not found' % subnet652 raise n_exc.NeutronClientException(message=msg, status_code=404)653 def list_security_groups(self, **_params):654 ret = []655 for security_group in self._fake_security_groups.values():656 names = _params.get('name')657 if names:658 if not isinstance(names, list):659 names = [names]660 for name in names:661 if security_group.get('name') == name:662 ret.append(security_group)663 ids = _params.get('id')664 if ids:665 if not isinstance(ids, list):666 ids = [ids]667 for id in ids:668 if security_group.get('id') == id:669 ret.append(security_group)670 elif not (names or ids):671 ret.append(security_group)672 return {'security_groups': ret}673 def list_networks(self, **_params):674 # neutronv2/api.py _get_available_networks calls this assuming675 # search_opts filter "shared" is implemented and not ignored676 shared = _params.get("shared", None)677 if shared:678 return {'networks': []}679 else:680 return {'networks':681 [network for network in self._fake_networks.values()]}682 def list_ports(self, **_params):683 ret = []684 device_id = _params.get('device_id')685 for port in self._fake_ports.values():686 if device_id:687 if port['device_id'] in device_id:688 ret.append(port)689 else:690 ret.append(port)691 return {'ports': ret}692 def list_subnets(self, **_params):693 return {'subnets':694 [subnet for subnet in self._fake_subnets.values()]}695 def list_floatingips(self, **_params):696 return {'floatingips': []}697 def delete_security_group(self, security_group):698 self.show_security_group(security_group)699 ports = self.list_ports()700 for port in ports.get('ports'):701 for sg_port in port['security_groups']:702 if sg_port == security_group:703 msg = ('Unable to delete Security group %s in use'704 % security_group)705 raise n_exc.NeutronClientException(message=msg,706 status_code=409)707 del self._fake_security_groups[security_group]708 def delete_security_group_rule(self, security_group_rule):709 self.show_security_group_rule(security_group_rule)710 del self._fake_security_group_rules[security_group_rule]711 def delete_network(self, network):712 self.show_network(network)713 self._check_ports_on_network(network)714 for subnet in self._fake_subnets.values():715 if subnet['network_id'] == network:716 del self._fake_subnets[subnet['id']]717 del self._fake_networks[network]718 def delete_port(self, port):719 self.show_port(port)720 del self._fake_ports[port]721 def update_port(self, port, body=None):722 self.show_port(port)723 self._fake_ports[port].update(body['port'])724 return {'port': self._fake_ports[port]}725 def list_extensions(self, **_parms):726 return {'extensions': []}727 def _check_ports_on_network(self, network):728 ports = self.list_ports()729 for port in ports:730 if port['network_id'] == network:731 msg = ('Unable to complete operation on network %s. There is '732 'one or more ports still in use on the network'733 % network)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful