How to use the _create_policy method in localstack

Best Python code snippet using localstack_python

test_policy.py

Source:test_policy.py Github

copy

Full Screen

...55 DummyPolicy)56 environment.global_env().register_policy('senlin.policy.dummy-1.1',57 DummyPolicy)58 self.spec = parser.simple_parse(sample_policy)59 def _create_policy(self, policy_name, policy_id=None):60 policy = pb.Policy(policy_name, self.spec,61 user=self.ctx.user_id,62 project=self.ctx.project_id,63 domain=self.ctx.domain_id)64 if policy_id:65 policy.id = policy_id66 return policy67 def _create_db_policy(self, **kwargs):68 values = {69 'name': 'test-policy',70 'type': 'senlin.policy.dummy-1.0',71 'spec': self.spec,72 'created_at': timeutils.utcnow(True),73 'user': self.ctx.user_id,74 'project': self.ctx.project_id,75 'domain': self.ctx.domain_id,76 }77 values.update(kwargs)78 return po.Policy.create(self.ctx, values)79 def test_init(self):80 policy = self._create_policy('test-policy')81 self.assertIsNone(policy.id)82 self.assertEqual('test-policy', policy.name)83 self.assertEqual(self.spec, policy.spec)84 self.assertEqual('senlin.policy.dummy-1.0', policy.type)85 self.assertEqual(self.ctx.user_id, policy.user)86 self.assertEqual(self.ctx.project_id, policy.project)87 self.assertEqual(self.ctx.domain_id, policy.domain)88 self.assertEqual({}, policy.data)89 self.assertIsNone(policy.created_at)90 self.assertIsNone(policy.updated_at)91 self.assertTrue(policy.singleton)92 spec_data = policy.spec_data93 self.assertEqual('senlin.policy.dummy', spec_data['type'])94 self.assertEqual('1.0', spec_data['version'])95 self.assertEqual({'key1': 'value1', 'key2': 2},96 spec_data['properties'])97 self.assertEqual({'key1': 'value1', 'key2': 2}, policy.properties)98 def test_init_version_as_float(self):99 self.spec['version'] = 1.1100 policy = self._create_policy('test-policy')101 self.assertIsNone(policy.id)102 self.assertEqual('test-policy', policy.name)103 self.assertEqual(self.spec, policy.spec)104 self.assertEqual('senlin.policy.dummy-1.1', policy.type)105 self.assertEqual(self.ctx.user_id, policy.user)106 self.assertEqual(self.ctx.project_id, policy.project)107 
self.assertEqual(self.ctx.domain_id, policy.domain)108 self.assertEqual({}, policy.data)109 self.assertIsNone(policy.created_at)110 self.assertIsNone(policy.updated_at)111 self.assertTrue(policy.singleton)112 spec_data = policy.spec_data113 self.assertEqual('senlin.policy.dummy', spec_data['type'])114 self.assertEqual('1.1', spec_data['version'])115 self.assertEqual({'key1': 'value1', 'key2': 2},116 spec_data['properties'])117 self.assertEqual({'key1': 'value1', 'key2': 2}, policy.properties)118 def test_init_version_as_string(self):119 self.spec['version'] = '1.1'120 policy = self._create_policy('test-policy')121 self.assertIsNone(policy.id)122 self.assertEqual('test-policy', policy.name)123 self.assertEqual(self.spec, policy.spec)124 self.assertEqual('senlin.policy.dummy-1.1', policy.type)125 self.assertEqual(self.ctx.user_id, policy.user)126 self.assertEqual(self.ctx.project_id, policy.project)127 self.assertEqual(self.ctx.domain_id, policy.domain)128 self.assertEqual({}, policy.data)129 self.assertIsNone(policy.created_at)130 self.assertIsNone(policy.updated_at)131 self.assertTrue(policy.singleton)132 spec_data = policy.spec_data133 self.assertEqual('senlin.policy.dummy', spec_data['type'])134 self.assertEqual('1.1', spec_data['version'])135 self.assertEqual({'key1': 'value1', 'key2': 2},136 spec_data['properties'])137 self.assertEqual({'key1': 'value1', 'key2': 2}, policy.properties)138 def test_policy_new_type_not_found(self):139 bad_spec = {140 'type': 'bad-type',141 'version': '1.0',142 'properties': '',143 }144 self.assertRaises(exception.ResourceNotFound,145 pb.Policy,146 'test-policy', bad_spec)147 def test_load(self):148 policy = utils.create_policy(self.ctx, UUID1)149 result = pb.Policy.load(self.ctx, policy.id)150 self.assertEqual(policy.id, result.id)151 self.assertEqual(policy.name, result.name)152 self.assertEqual(policy.type, result.type)153 self.assertEqual(policy.user, result.user)154 self.assertEqual(policy.project, result.project)155 
self.assertEqual(policy.domain, result.domain)156 self.assertEqual(policy.spec, result.spec)157 self.assertEqual(policy.data, result.data)158 self.assertEqual({'key1': 'value1', 'key2': 2}, result.properties)159 self.assertEqual(policy.created_at, result.created_at)160 self.assertEqual(policy.updated_at, result.updated_at)161 def test_load_with_policy(self):162 policy = utils.create_policy(self.ctx, UUID1)163 expected = pb.Policy.load(self.ctx, policy.id)164 res = pb.Policy.load(self.ctx, db_policy=policy)165 self.assertIsNotNone(res)166 self.assertEqual(expected.id, res.id)167 def test_load_diff_project(self):168 policy = utils.create_policy(self.ctx, UUID1)169 new_ctx = utils.dummy_context(project='a-different-project')170 self.assertRaises(exception.ResourceNotFound,171 pb.Policy.load,172 new_ctx, policy.id, None)173 res = pb.Policy.load(new_ctx, policy.id, project_safe=False)174 self.assertIsNotNone(res)175 self.assertEqual(policy.id, res.id)176 def test_load_not_found(self):177 ex = self.assertRaises(exception.ResourceNotFound,178 pb.Policy.load,179 self.ctx, 'fake-policy', None)180 self.assertEqual("The policy 'fake-policy' could not be found.",181 str(ex))182 ex = self.assertRaises(exception.ResourceNotFound,183 pb.Policy.load,184 self.ctx, None, None)185 self.assertEqual("The policy 'None' could not be found.",186 str(ex))187 def test_delete(self):188 policy = utils.create_policy(self.ctx, UUID1)189 policy_id = policy.id190 res = pb.Policy.delete(self.ctx, policy_id)191 self.assertIsNone(res)192 self.assertRaises(exception.ResourceNotFound,193 pb.Policy.load,194 self.ctx, policy_id, None)195 def test_delete_not_found(self):196 result = pb.Policy.delete(self.ctx, 'bogus')197 self.assertIsNone(result)198 def test_store_for_create(self):199 policy = self._create_policy('test-policy')200 self.assertIsNone(policy.id)201 policy_id = policy.store(self.ctx)202 self.assertIsNotNone(policy_id)203 self.assertEqual(policy_id, policy.id)204 result = 
po.Policy.get(self.ctx, policy_id)205 self.assertIsNotNone(result)206 self.assertEqual('test-policy', result.name)207 self.assertEqual(policy_id, result.id)208 self.assertEqual(policy.type, result.type)209 self.assertEqual(policy.user, result.user)210 self.assertEqual(policy.project, result.project)211 self.assertEqual(policy.domain, result.domain)212 self.assertEqual(policy.spec, result.spec)213 self.assertEqual(policy.data, result.data)214 self.assertIsNotNone(result.created_at)215 self.assertIsNone(result.updated_at)216 def test_store_for_update(self):217 policy = self._create_policy('test-policy')218 self.assertIsNone(policy.id)219 policy_id = policy.store(self.ctx)220 self.assertIsNotNone(policy_id)221 self.assertEqual(policy_id, policy.id)222 # do an update223 policy.name = 'test-policy-1'224 policy.data = {'kk': 'vv'}225 new_id = policy.store(self.ctx)226 self.assertEqual(policy_id, new_id)227 result = po.Policy.get(self.ctx, policy_id)228 self.assertIsNotNone(result)229 self.assertEqual('test-policy-1', result.name)230 self.assertEqual({'kk': 'vv'}, policy.data)231 self.assertIsNotNone(policy.created_at)232 self.assertIsNotNone(policy.updated_at)233 def test_to_dict(self):234 policy = self._create_policy('test-policy')235 policy_id = policy.store(self.ctx)236 self.assertIsNotNone(policy_id)237 expected = {238 'id': policy_id,239 'name': policy.name,240 'type': policy.type,241 'user': policy.user,242 'project': policy.project,243 'domain': policy.domain,244 'spec': policy.spec,245 'data': policy.data,246 'created_at': common_utils.isotime(policy.created_at),247 'updated_at': None,248 }249 result = pb.Policy.load(self.ctx, policy_id=policy.id)250 self.assertEqual(expected, result.to_dict())251 def test_get_schema(self):252 expected = {253 'key1': {254 'default': 'value1',255 'description': 'first key',256 'required': False,257 'updatable': False,258 'type': 'String'259 },260 'key2': {261 'description': 'second key',262 'required': True,263 'updatable': 
False,264 'type': 'Integer'265 },266 }267 res = DummyPolicy.get_schema()268 self.assertEqual(expected, res)269 def test_build_policy_data(self):270 policy = self._create_policy('test-policy')271 data = {'key1': 'value1'}272 res = policy._build_policy_data(data)273 expect_result = {274 'DummyPolicy': {275 'version': '1.0',276 'data': data277 }278 }279 self.assertEqual(expect_result, res)280 def test_extract_policy_data(self):281 policy = self._create_policy('test-policy')282 # Extract data correctly283 data = {'key1': 'value1'}284 policy_data = {285 'DummyPolicy': {286 'version': '1.0',287 'data': data288 }289 }290 res = policy._extract_policy_data(policy_data)291 self.assertEqual(data, res)292 # Policy class name unmatch293 data = {'key1': 'value1'}294 policy_data = {295 'FakePolicy': {296 'version': '1.0',297 'data': data298 }299 }300 res = policy._extract_policy_data(policy_data)301 self.assertIsNone(res)302 # Policy version don't match303 data = {'key1': 'value1'}304 policy_data = {305 'DummyPolicy': {306 'version': '2.0',307 'data': data308 }309 }310 res = policy._extract_policy_data(policy_data)311 self.assertIsNone(res)312 @mock.patch.object(pb.Policy, '_build_conn_params')313 @mock.patch('senlin.drivers.base.SenlinDriver')314 def test_keystone(self, mock_sd, mock_params):315 policy = self._create_policy('test-policy')316 fake_params = mock.Mock()317 mock_params.return_value = fake_params318 kc = mock.Mock()319 driver = mock.Mock()320 driver.identity.return_value = kc321 mock_sd.return_value = driver322 res = policy.keystone('user1', 'project1')323 self.assertEqual(kc, res)324 self.assertEqual(kc, policy._keystoneclient)325 mock_params.assert_called_once_with('user1', 'project1')326 mock_sd.assert_called_once_with()327 driver.identity.assert_called_once_with(fake_params)328 def test_keystone_already_initialized(self):329 policy = self._create_policy('test-policy')330 x_keystone = mock.Mock()331 policy._keystoneclient = x_keystone332 result = 
policy.keystone('foo', 'bar')333 self.assertEqual(x_keystone, result)334 @mock.patch.object(pb.Policy, '_build_conn_params')335 @mock.patch("senlin.drivers.base.SenlinDriver")336 def test_nova(self, mock_driver, mock_params):337 policy = self._create_policy('test-policy')338 fake_params = mock.Mock()339 mock_params.return_value = fake_params340 x_driver = mock.Mock()341 mock_driver.return_value = x_driver342 result = policy.nova('user1', 'project1')343 x_nova = x_driver.compute.return_value344 self.assertEqual(x_nova, result)345 self.assertEqual(x_nova, policy._novaclient)346 mock_params.assert_called_once_with('user1', 'project1')347 x_driver.compute.assert_called_once_with(fake_params)348 def test_nova_already_initialized(self):349 policy = self._create_policy('test-policy')350 x_nova = mock.Mock()351 policy._novaclient = x_nova352 result = policy.nova('foo', 'bar')353 self.assertEqual(x_nova, result)354 @mock.patch.object(pb.Policy, '_build_conn_params')355 @mock.patch("senlin.drivers.base.SenlinDriver")356 def test_network(self, mock_driver, mock_params):357 policy = self._create_policy('test-policy')358 fake_params = mock.Mock()359 mock_params.return_value = fake_params360 x_driver = mock.Mock()361 mock_driver.return_value = x_driver362 result = policy.network('user1', 'project1')363 x_network = x_driver.network.return_value364 self.assertEqual(x_network, result)365 self.assertEqual(x_network, policy._networkclient)366 mock_params.assert_called_once_with('user1', 'project1')367 x_driver.network.assert_called_once_with(fake_params)368 def test_network_already_initialized(self):369 policy = self._create_policy('test-policy')370 x_network = mock.Mock()371 policy._networkclient = x_network372 result = policy.network('foo', 'bar')373 self.assertEqual(x_network, result)374 @mock.patch.object(pb.Policy, '_build_conn_params')375 @mock.patch("senlin.drivers.base.SenlinDriver")376 def test_lbaas(self, mock_driver, mock_params):377 policy = 
self._create_policy('test-policy')378 fake_params = mock.Mock()379 mock_params.return_value = fake_params380 x_driver = mock.Mock()381 mock_driver.return_value = x_driver382 result = policy.lbaas('user1', 'project1')383 x_lbaas = x_driver.loadbalancing.return_value384 self.assertEqual(x_lbaas, result)385 self.assertEqual(x_lbaas, policy._lbaasclient)386 mock_params.assert_called_once_with('user1', 'project1')387 x_driver.loadbalancing.assert_called_once_with(fake_params)388 def test_lbaas_already_initialized(self):389 policy = self._create_policy('test-policy')390 x_lbaas = mock.Mock()391 policy._lbaasclient = x_lbaas392 result = policy.lbaas('foo', 'bar')393 self.assertEqual(x_lbaas, result)394 def test_default_need_check(self):395 action = mock.Mock()396 action.action = consts.CLUSTER_SCALE_IN397 action.data = {}398 policy = self._create_policy('test-policy')399 res = policy.need_check('BEFORE', action)400 self.assertTrue(res)401 setattr(policy, 'TARGET', [('BEFORE', consts.CLUSTER_SCALE_IN)])402 res = policy.need_check('BEFORE', action)403 self.assertTrue(res)404 res = policy.need_check('AFTER', action)405 self.assertFalse(res)406 def test_default_pre_op(self):407 policy = self._create_policy('test-policy')408 res = policy.pre_op('CLUSTER_ID', 'FOO')409 self.assertIsNone(res)410 def test_default_post_op(self):411 policy = self._create_policy('test-policy')412 res = policy.post_op('CLUSTER_ID', 'FOO')413 self.assertIsNone(res)414 def test_default_attach(self):415 cluster = mock.Mock()416 policy = self._create_policy('test-policy')417 # Policy targets on ANY profile types418 policy.PROFILE_TYPE = ['ANY']419 res = policy.attach(cluster)420 self.assertEqual((True, None), res)421 # Profile type of cluster is not in policy's target scope422 profile = mock.Mock()423 profile.type = 'os.nova.server'424 cluster.rt = {'profile': profile}425 policy.PROFILE_TYPE = ['os.heat.resource']426 msg = 'Policy not applicable on profile type: os.nova.server'427 res = 
policy.attach(cluster)428 self.assertEqual((False, msg), res)429 # Attaching succeed430 policy.PROFILE_TYPE = ['os.nova.server', 'os.heat.resource']431 res = policy.attach(cluster)432 self.assertEqual((True, None), res)433 def test_default_detach(self):434 cluster = mock.Mock()435 policy = self._create_policy('test-policy')436 res = policy.detach(cluster)437 self.assertEqual((True, None), res)438 @mock.patch.object(co.Credential, 'get')439 @mock.patch.object(senlin_ctx, 'get_service_credentials')440 @mock.patch.object(oslo_ctx, 'get_current')441 def test_build_conn_params(self, mock_get_current, mock_get_service_creds,442 mock_cred_get):443 service_cred = {444 'auth_url': 'AUTH_URL',445 'username': 'senlin',446 'user_domain_name': 'default',447 'password': '123',448 'project_domain_name': 'Domain',449 'verify': True,450 'interface': 'Public',451 }452 current_ctx = {453 'auth_url': 'auth_url',454 'user_name': 'user1',455 'user_domain_name': 'default',456 'password': '456'457 }458 cred_info = {459 'openstack': {460 'trust': 'TRUST_ID',461 }462 }463 cred = mock.Mock(cred=cred_info)464 mock_get_service_creds.return_value = service_cred465 mock_get_current.return_value = current_ctx466 mock_cred_get.return_value = cred467 policy = self._create_policy('test-policy')468 res = policy._build_conn_params('user1', 'project1')469 expected_result = {470 'auth_url': 'AUTH_URL',471 'username': 'senlin',472 'user_domain_name': 'default',473 'password': '123',474 'trust_id': 'TRUST_ID',475 'project_domain_name': 'Domain',476 'verify': True,477 'interface': 'Public',478 }479 self.assertEqual(expected_result, res)480 mock_get_service_creds.assert_called_once_with()481 mock_cred_get.assert_called_once_with(current_ctx, 'user1', 'project1')482 @mock.patch.object(co.Credential, 'get')483 @mock.patch.object(senlin_ctx, 'get_service_credentials')484 @mock.patch.object(oslo_ctx, 'get_current')485 def test_build_conn_params_trust_not_found(486 self, mock_get_current, mock_get_service_creds, 
mock_cred_get):487 service_cred = {488 'auth_url': 'AUTH_URL',489 'username': 'senlin',490 'user_domain_name': 'default',491 'password': '123'492 }493 mock_get_service_creds.return_value = service_cred494 mock_cred_get.return_value = None495 policy = self._create_policy('test-policy')496 ex = self.assertRaises(exception.TrustNotFound,497 policy._build_conn_params,498 'user1', 'project1')499 msg = "The trust for trustor 'user1' could not be found."...

Full Screen

Full Screen

test_PolicyService.py

Source:test_PolicyService.py Github

copy

Full Screen

...37 service._policy_dir = self.current_dir + "/out"38 # when39 service.add_read_policy(secret_path)40 # then41 self.assertEquals([self._create_policy(secret_path)], list(service._policies))42 def test_remove_read_policy(self):43 # given44 secret_path = "develop/myTeam/myService/someSecret"45 service = PolicyService(self.vault_config)46 service._policy_dir = self.current_dir + "/out"47 # when48 service.remove_read_policy(secret_path)49 # then50 self.assertEquals([], list(service._policies))51 def test_persist(self):52 # given53 target_dir = self.current_dir + "/out"54 service = PolicyService(self.vault_config)55 service._policy_dir = target_dir56 service._mesos_framework = "marathon"57 service._mesos_group = "myGroup"58 service._service = "myService"59 service._team = "myTeam"60 service._policies = set()61 service._policies.add(self._create_policy("ci/myTeam/myService/mongo.password"))62 service._policies.add(self._create_policy("develop/myTeam/myService/mongo.password"))63 service._policies.add(self._create_policy("live/myTeam/myService/mongo.password"))64 service._policies.add(self._create_policy("live/myTeam/myService/jdbc.password"))65 # when66 service.persist()67 # then68 files = [f for f in os.listdir(target_dir)]69 assert_that(files, contains_inanyorder("marathon.myGroup.live_myService.myTeam.live.hcl",70 "marathon.myGroup.aliasForNonLive_myService.myTeam.ci.hcl",71 "marathon.myGroup.aliasForNonLive_myService.myTeam.develop.hcl"))72 @data(("ci/myTeam/myService/mongo.password", "ci/myTeam/myService/mongo.password"),73 ("/ci/myTeam/myService/mongo.password", "ci/myTeam/myService/mongo.password"))74 @unpack75 def test_normalize_path(self, path, expected_normalized_path):76 # given77 service = PolicyService(self.vault_config)78 # when79 normalized_path = service._normalize_path(path)80 # then81 self.assertEquals(expected_normalized_path, normalized_path)82 def expected_policies(self):83 return [self._create_policy("live/myTeam/myService/mongo.password"),84 
self._create_policy("live/myTeam/myService/jdbc.password"),85 self._create_policy("ci/myTeam/myService/mongo.password"),86 self._create_policy("ci/myTeam/myService/jdbc.password"),87 self._create_policy("develop/myTeam/myService/mongo.password"),88 self._create_policy("develop/myTeam/myService/jdbc.password")89 ]90 @staticmethod91 def _create_policy(secret_path):...

Full Screen

Full Screen

test_network_policy.py

Source:test_network_policy.py Github

copy

Full Screen

...24CONF = config.CONF25LOG = logging.getLogger(__name__)26class NetworkPolicyTest(rbac_base.BaseContrailTest):27 """Test class to test network policy objects using RBAC roles"""28 def _create_policy(self):29 fq_name = data_utils.rand_name('network-policy')30 post_body = {31 'parent_type': 'project',32 'fq_name': ["default-domain", self.tenant_name, fq_name]33 }34 resp_body = self.network_policy_client.create_network_policys(35 **post_body)36 network_policy_uuid = resp_body['network-policy']['uuid']37 self.addCleanup(self._try_delete_resource,38 self.network_policy_client.delete_network_policy,39 network_policy_uuid)40 return network_policy_uuid41 def _update_policy(self, network_policy_uuid):42 put_body = {43 'display_name': data_utils.rand_name('network-policy')44 }45 self.network_policy_client.update_network_policy(network_policy_uuid,46 **put_body)47 @rbac_rule_validation.action(service="Contrail",48 rules=["list_network_policys"])49 @decorators.idempotent_id('fa2a28f3-a8bb-4908-95b9-1e11cf58b16f')50 def test_list_policys(self):51 """test method for list n/w policy objects"""52 with self.override_role():53 self.network_policy_client.list_network_policys()54 @rbac_rule_validation.action(service="Contrail",55 rules=["create_network_policys"])56 @decorators.idempotent_id('a30be228-afba-40c9-8678-ae020db68d79')57 def test_create_network_policys(self):58 """test method for create n/w policy objects"""59 with self.override_role():60 self._create_policy()61 @rbac_rule_validation.action(service="Contrail",62 rules=["show_network_policy"])63 @decorators.idempotent_id('6cefe92e-8936-49a6-bce0-12da3396e7ab')64 def test_show_network_policy(self):65 """test method for show n/w policy objects"""66 policy_uuid = self._create_policy()67 with self.override_role():68 self.network_policy_client.show_network_policy(policy_uuid)69 @rbac_rule_validation.action(service="Contrail",70 rules=["update_network_policy"])71 
@decorators.idempotent_id('1d470505-3ad4-4870-87d7-3f0b0f9fc635')72 def test_update_network_policy(self):73 """test method for update n/w policy objects"""74 policy_uuid = self._create_policy()75 with self.override_role():76 self._update_policy(policy_uuid)77 @rbac_rule_validation.action(service="Contrail",78 rules=["delete_network_policy"])79 @decorators.idempotent_id('aae9018f-e7a2-4a75-a68e-afd6c380640e')80 def test_delete_network_policy(self):81 """test method for delete n/w policy objects"""82 policy_uuid = self._create_policy()83 with self.override_role():...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful