Best Python code snippet using localstack_python
test_policy.py
Source:test_policy.py  
...55                                                 DummyPolicy)56        environment.global_env().register_policy('senlin.policy.dummy-1.1',57                                                 DummyPolicy)58        self.spec = parser.simple_parse(sample_policy)59    def _create_policy(self, policy_name, policy_id=None):60        policy = pb.Policy(policy_name, self.spec,61                           user=self.ctx.user_id,62                           project=self.ctx.project_id,63                           domain=self.ctx.domain_id)64        if policy_id:65            policy.id = policy_id66        return policy67    def _create_db_policy(self, **kwargs):68        values = {69            'name': 'test-policy',70            'type': 'senlin.policy.dummy-1.0',71            'spec': self.spec,72            'created_at': timeutils.utcnow(True),73            'user': self.ctx.user_id,74            'project': self.ctx.project_id,75            'domain': self.ctx.domain_id,76        }77        values.update(kwargs)78        return po.Policy.create(self.ctx, values)79    def test_init(self):80        policy = self._create_policy('test-policy')81        self.assertIsNone(policy.id)82        self.assertEqual('test-policy', policy.name)83        self.assertEqual(self.spec, policy.spec)84        self.assertEqual('senlin.policy.dummy-1.0', policy.type)85        self.assertEqual(self.ctx.user_id, policy.user)86        self.assertEqual(self.ctx.project_id, policy.project)87        self.assertEqual(self.ctx.domain_id, policy.domain)88        self.assertEqual({}, policy.data)89        self.assertIsNone(policy.created_at)90        self.assertIsNone(policy.updated_at)91        self.assertTrue(policy.singleton)92        spec_data = policy.spec_data93        self.assertEqual('senlin.policy.dummy', spec_data['type'])94        self.assertEqual('1.0', spec_data['version'])95        self.assertEqual({'key1': 'value1', 'key2': 2},96                         spec_data['properties'])97        
self.assertEqual({'key1': 'value1', 'key2': 2}, policy.properties)98    def test_init_version_as_float(self):99        self.spec['version'] = 1.1100        policy = self._create_policy('test-policy')101        self.assertIsNone(policy.id)102        self.assertEqual('test-policy', policy.name)103        self.assertEqual(self.spec, policy.spec)104        self.assertEqual('senlin.policy.dummy-1.1', policy.type)105        self.assertEqual(self.ctx.user_id, policy.user)106        self.assertEqual(self.ctx.project_id, policy.project)107        self.assertEqual(self.ctx.domain_id, policy.domain)108        self.assertEqual({}, policy.data)109        self.assertIsNone(policy.created_at)110        self.assertIsNone(policy.updated_at)111        self.assertTrue(policy.singleton)112        spec_data = policy.spec_data113        self.assertEqual('senlin.policy.dummy', spec_data['type'])114        self.assertEqual('1.1', spec_data['version'])115        self.assertEqual({'key1': 'value1', 'key2': 2},116                         spec_data['properties'])117        self.assertEqual({'key1': 'value1', 'key2': 2}, policy.properties)118    def test_init_version_as_string(self):119        self.spec['version'] = '1.1'120        policy = self._create_policy('test-policy')121        self.assertIsNone(policy.id)122        self.assertEqual('test-policy', policy.name)123        self.assertEqual(self.spec, policy.spec)124        self.assertEqual('senlin.policy.dummy-1.1', policy.type)125        self.assertEqual(self.ctx.user_id, policy.user)126        self.assertEqual(self.ctx.project_id, policy.project)127        self.assertEqual(self.ctx.domain_id, policy.domain)128        self.assertEqual({}, policy.data)129        self.assertIsNone(policy.created_at)130        self.assertIsNone(policy.updated_at)131        self.assertTrue(policy.singleton)132        spec_data = policy.spec_data133        self.assertEqual('senlin.policy.dummy', spec_data['type'])134        self.assertEqual('1.1', 
spec_data['version'])135        self.assertEqual({'key1': 'value1', 'key2': 2},136                         spec_data['properties'])137        self.assertEqual({'key1': 'value1', 'key2': 2}, policy.properties)138    def test_policy_new_type_not_found(self):139        bad_spec = {140            'type': 'bad-type',141            'version': '1.0',142            'properties': '',143        }144        self.assertRaises(exception.ResourceNotFound,145                          pb.Policy,146                          'test-policy', bad_spec)147    def test_load(self):148        policy = utils.create_policy(self.ctx, UUID1)149        result = pb.Policy.load(self.ctx, policy.id)150        self.assertEqual(policy.id, result.id)151        self.assertEqual(policy.name, result.name)152        self.assertEqual(policy.type, result.type)153        self.assertEqual(policy.user, result.user)154        self.assertEqual(policy.project, result.project)155        self.assertEqual(policy.domain, result.domain)156        self.assertEqual(policy.spec, result.spec)157        self.assertEqual(policy.data, result.data)158        self.assertEqual({'key1': 'value1', 'key2': 2}, result.properties)159        self.assertEqual(policy.created_at, result.created_at)160        self.assertEqual(policy.updated_at, result.updated_at)161    def test_load_with_policy(self):162        policy = utils.create_policy(self.ctx, UUID1)163        expected = pb.Policy.load(self.ctx, policy.id)164        res = pb.Policy.load(self.ctx, db_policy=policy)165        self.assertIsNotNone(res)166        self.assertEqual(expected.id, res.id)167    def test_load_diff_project(self):168        policy = utils.create_policy(self.ctx, UUID1)169        new_ctx = utils.dummy_context(project='a-different-project')170        self.assertRaises(exception.ResourceNotFound,171                          pb.Policy.load,172                          new_ctx, policy.id, None)173        res = pb.Policy.load(new_ctx, policy.id, 
project_safe=False)174        self.assertIsNotNone(res)175        self.assertEqual(policy.id, res.id)176    def test_load_not_found(self):177        ex = self.assertRaises(exception.ResourceNotFound,178                               pb.Policy.load,179                               self.ctx, 'fake-policy', None)180        self.assertEqual("The policy 'fake-policy' could not be found.",181                         str(ex))182        ex = self.assertRaises(exception.ResourceNotFound,183                               pb.Policy.load,184                               self.ctx, None, None)185        self.assertEqual("The policy 'None' could not be found.",186                         str(ex))187    def test_delete(self):188        policy = utils.create_policy(self.ctx, UUID1)189        policy_id = policy.id190        res = pb.Policy.delete(self.ctx, policy_id)191        self.assertIsNone(res)192        self.assertRaises(exception.ResourceNotFound,193                          pb.Policy.load,194                          self.ctx, policy_id, None)195    def test_delete_not_found(self):196        result = pb.Policy.delete(self.ctx, 'bogus')197        self.assertIsNone(result)198    def test_store_for_create(self):199        policy = self._create_policy('test-policy')200        self.assertIsNone(policy.id)201        policy_id = policy.store(self.ctx)202        self.assertIsNotNone(policy_id)203        self.assertEqual(policy_id, policy.id)204        result = po.Policy.get(self.ctx, policy_id)205        self.assertIsNotNone(result)206        self.assertEqual('test-policy', result.name)207        self.assertEqual(policy_id, result.id)208        self.assertEqual(policy.type, result.type)209        self.assertEqual(policy.user, result.user)210        self.assertEqual(policy.project, result.project)211        self.assertEqual(policy.domain, result.domain)212        self.assertEqual(policy.spec, result.spec)213        self.assertEqual(policy.data, result.data)214        
self.assertIsNotNone(result.created_at)215        self.assertIsNone(result.updated_at)216    def test_store_for_update(self):217        policy = self._create_policy('test-policy')218        self.assertIsNone(policy.id)219        policy_id = policy.store(self.ctx)220        self.assertIsNotNone(policy_id)221        self.assertEqual(policy_id, policy.id)222        # do an update223        policy.name = 'test-policy-1'224        policy.data = {'kk': 'vv'}225        new_id = policy.store(self.ctx)226        self.assertEqual(policy_id, new_id)227        result = po.Policy.get(self.ctx, policy_id)228        self.assertIsNotNone(result)229        self.assertEqual('test-policy-1', result.name)230        self.assertEqual({'kk': 'vv'}, policy.data)231        self.assertIsNotNone(policy.created_at)232        self.assertIsNotNone(policy.updated_at)233    def test_to_dict(self):234        policy = self._create_policy('test-policy')235        policy_id = policy.store(self.ctx)236        self.assertIsNotNone(policy_id)237        expected = {238            'id': policy_id,239            'name': policy.name,240            'type': policy.type,241            'user': policy.user,242            'project': policy.project,243            'domain': policy.domain,244            'spec': policy.spec,245            'data': policy.data,246            'created_at': common_utils.isotime(policy.created_at),247            'updated_at': None,248        }249        result = pb.Policy.load(self.ctx, policy_id=policy.id)250        self.assertEqual(expected, result.to_dict())251    def test_get_schema(self):252        expected = {253            'key1': {254                'default': 'value1',255                'description': 'first key',256                'required': False,257                'updatable': False,258                'type': 'String'259            },260            'key2': {261                'description': 'second key',262                'required': True,263                'updatable': 
False,264                'type': 'Integer'265            },266        }267        res = DummyPolicy.get_schema()268        self.assertEqual(expected, res)269    def test_build_policy_data(self):270        policy = self._create_policy('test-policy')271        data = {'key1': 'value1'}272        res = policy._build_policy_data(data)273        expect_result = {274            'DummyPolicy': {275                'version': '1.0',276                'data': data277            }278        }279        self.assertEqual(expect_result, res)280    def test_extract_policy_data(self):281        policy = self._create_policy('test-policy')282        # Extract data correctly283        data = {'key1': 'value1'}284        policy_data = {285            'DummyPolicy': {286                'version': '1.0',287                'data': data288            }289        }290        res = policy._extract_policy_data(policy_data)291        self.assertEqual(data, res)292        # Policy class name unmatch293        data = {'key1': 'value1'}294        policy_data = {295            'FakePolicy': {296                'version': '1.0',297                'data': data298            }299        }300        res = policy._extract_policy_data(policy_data)301        self.assertIsNone(res)302        # Policy version don't match303        data = {'key1': 'value1'}304        policy_data = {305            'DummyPolicy': {306                'version': '2.0',307                'data': data308            }309        }310        res = policy._extract_policy_data(policy_data)311        self.assertIsNone(res)312    @mock.patch.object(pb.Policy, '_build_conn_params')313    @mock.patch('senlin.drivers.base.SenlinDriver')314    def test_keystone(self, mock_sd, mock_params):315        policy = self._create_policy('test-policy')316        fake_params = mock.Mock()317        mock_params.return_value = fake_params318        kc = mock.Mock()319        driver = mock.Mock()320        driver.identity.return_value = kc321        
mock_sd.return_value = driver322        res = policy.keystone('user1', 'project1')323        self.assertEqual(kc, res)324        self.assertEqual(kc, policy._keystoneclient)325        mock_params.assert_called_once_with('user1', 'project1')326        mock_sd.assert_called_once_with()327        driver.identity.assert_called_once_with(fake_params)328    def test_keystone_already_initialized(self):329        policy = self._create_policy('test-policy')330        x_keystone = mock.Mock()331        policy._keystoneclient = x_keystone332        result = policy.keystone('foo', 'bar')333        self.assertEqual(x_keystone, result)334    @mock.patch.object(pb.Policy, '_build_conn_params')335    @mock.patch("senlin.drivers.base.SenlinDriver")336    def test_nova(self, mock_driver, mock_params):337        policy = self._create_policy('test-policy')338        fake_params = mock.Mock()339        mock_params.return_value = fake_params340        x_driver = mock.Mock()341        mock_driver.return_value = x_driver342        result = policy.nova('user1', 'project1')343        x_nova = x_driver.compute.return_value344        self.assertEqual(x_nova, result)345        self.assertEqual(x_nova, policy._novaclient)346        mock_params.assert_called_once_with('user1', 'project1')347        x_driver.compute.assert_called_once_with(fake_params)348    def test_nova_already_initialized(self):349        policy = self._create_policy('test-policy')350        x_nova = mock.Mock()351        policy._novaclient = x_nova352        result = policy.nova('foo', 'bar')353        self.assertEqual(x_nova, result)354    @mock.patch.object(pb.Policy, '_build_conn_params')355    @mock.patch("senlin.drivers.base.SenlinDriver")356    def test_network(self, mock_driver, mock_params):357        policy = self._create_policy('test-policy')358        fake_params = mock.Mock()359        mock_params.return_value = fake_params360        x_driver = mock.Mock()361        mock_driver.return_value = x_driver362        
result = policy.network('user1', 'project1')363        x_network = x_driver.network.return_value364        self.assertEqual(x_network, result)365        self.assertEqual(x_network, policy._networkclient)366        mock_params.assert_called_once_with('user1', 'project1')367        x_driver.network.assert_called_once_with(fake_params)368    def test_network_already_initialized(self):369        policy = self._create_policy('test-policy')370        x_network = mock.Mock()371        policy._networkclient = x_network372        result = policy.network('foo', 'bar')373        self.assertEqual(x_network, result)374    @mock.patch.object(pb.Policy, '_build_conn_params')375    @mock.patch("senlin.drivers.base.SenlinDriver")376    def test_lbaas(self, mock_driver, mock_params):377        policy = self._create_policy('test-policy')378        fake_params = mock.Mock()379        mock_params.return_value = fake_params380        x_driver = mock.Mock()381        mock_driver.return_value = x_driver382        result = policy.lbaas('user1', 'project1')383        x_lbaas = x_driver.loadbalancing.return_value384        self.assertEqual(x_lbaas, result)385        self.assertEqual(x_lbaas, policy._lbaasclient)386        mock_params.assert_called_once_with('user1', 'project1')387        x_driver.loadbalancing.assert_called_once_with(fake_params)388    def test_lbaas_already_initialized(self):389        policy = self._create_policy('test-policy')390        x_lbaas = mock.Mock()391        policy._lbaasclient = x_lbaas392        result = policy.lbaas('foo', 'bar')393        self.assertEqual(x_lbaas, result)394    def test_default_need_check(self):395        action = mock.Mock()396        action.action = consts.CLUSTER_SCALE_IN397        action.data = {}398        policy = self._create_policy('test-policy')399        res = policy.need_check('BEFORE', action)400        self.assertTrue(res)401        setattr(policy, 'TARGET', [('BEFORE', consts.CLUSTER_SCALE_IN)])402        res = 
policy.need_check('BEFORE', action)403        self.assertTrue(res)404        res = policy.need_check('AFTER', action)405        self.assertFalse(res)406    def test_default_pre_op(self):407        policy = self._create_policy('test-policy')408        res = policy.pre_op('CLUSTER_ID', 'FOO')409        self.assertIsNone(res)410    def test_default_post_op(self):411        policy = self._create_policy('test-policy')412        res = policy.post_op('CLUSTER_ID', 'FOO')413        self.assertIsNone(res)414    def test_default_attach(self):415        cluster = mock.Mock()416        policy = self._create_policy('test-policy')417        # Policy targets on ANY profile types418        policy.PROFILE_TYPE = ['ANY']419        res = policy.attach(cluster)420        self.assertEqual((True, None), res)421        # Profile type of cluster is not in policy's target scope422        profile = mock.Mock()423        profile.type = 'os.nova.server'424        cluster.rt = {'profile': profile}425        policy.PROFILE_TYPE = ['os.heat.resource']426        msg = 'Policy not applicable on profile type: os.nova.server'427        res = policy.attach(cluster)428        self.assertEqual((False, msg), res)429        # Attaching succeed430        policy.PROFILE_TYPE = ['os.nova.server', 'os.heat.resource']431        res = policy.attach(cluster)432        self.assertEqual((True, None), res)433    def test_default_detach(self):434        cluster = mock.Mock()435        policy = self._create_policy('test-policy')436        res = policy.detach(cluster)437        self.assertEqual((True, None), res)438    @mock.patch.object(co.Credential, 'get')439    @mock.patch.object(senlin_ctx, 'get_service_credentials')440    @mock.patch.object(oslo_ctx, 'get_current')441    def test_build_conn_params(self, mock_get_current, mock_get_service_creds,442                               mock_cred_get):443        service_cred = {444            'auth_url': 'AUTH_URL',445            'username': 'senlin',446            
'user_domain_name': 'default',447            'password': '123',448            'project_domain_name': 'Domain',449            'verify': True,450            'interface': 'Public',451        }452        current_ctx = {453            'auth_url': 'auth_url',454            'user_name': 'user1',455            'user_domain_name': 'default',456            'password': '456'457        }458        cred_info = {459            'openstack': {460                'trust': 'TRUST_ID',461            }462        }463        cred = mock.Mock(cred=cred_info)464        mock_get_service_creds.return_value = service_cred465        mock_get_current.return_value = current_ctx466        mock_cred_get.return_value = cred467        policy = self._create_policy('test-policy')468        res = policy._build_conn_params('user1', 'project1')469        expected_result = {470            'auth_url': 'AUTH_URL',471            'username': 'senlin',472            'user_domain_name': 'default',473            'password': '123',474            'trust_id': 'TRUST_ID',475            'project_domain_name': 'Domain',476            'verify': True,477            'interface': 'Public',478        }479        self.assertEqual(expected_result, res)480        mock_get_service_creds.assert_called_once_with()481        mock_cred_get.assert_called_once_with(current_ctx, 'user1', 'project1')482    @mock.patch.object(co.Credential, 'get')483    @mock.patch.object(senlin_ctx, 'get_service_credentials')484    @mock.patch.object(oslo_ctx, 'get_current')485    def test_build_conn_params_trust_not_found(486            self, mock_get_current, mock_get_service_creds, mock_cred_get):487        service_cred = {488            'auth_url': 'AUTH_URL',489            'username': 'senlin',490            'user_domain_name': 'default',491            'password': '123'492        }493        mock_get_service_creds.return_value = service_cred494        mock_cred_get.return_value = None495        policy = self._create_policy('test-policy')496      
  ex = self.assertRaises(exception.TrustNotFound,497                               policy._build_conn_params,498                               'user1', 'project1')499        msg = "The trust for trustor 'user1' could not be found."...test_PolicyService.py
Source:test_PolicyService.py  
...37        service._policy_dir = self.current_dir + "/out"38        # when39        service.add_read_policy(secret_path)40        # then41        self.assertEquals([self._create_policy(secret_path)], list(service._policies))42    def test_remove_read_policy(self):43        # given44        secret_path = "develop/myTeam/myService/someSecret"45        service = PolicyService(self.vault_config)46        service._policy_dir = self.current_dir + "/out"47        # when48        service.remove_read_policy(secret_path)49        # then50        self.assertEquals([], list(service._policies))51    def test_persist(self):52        # given53        target_dir = self.current_dir + "/out"54        service = PolicyService(self.vault_config)55        service._policy_dir = target_dir56        service._mesos_framework = "marathon"57        service._mesos_group = "myGroup"58        service._service = "myService"59        service._team = "myTeam"60        service._policies = set()61        service._policies.add(self._create_policy("ci/myTeam/myService/mongo.password"))62        service._policies.add(self._create_policy("develop/myTeam/myService/mongo.password"))63        service._policies.add(self._create_policy("live/myTeam/myService/mongo.password"))64        service._policies.add(self._create_policy("live/myTeam/myService/jdbc.password"))65        # when66        service.persist()67        # then68        files = [f for f in os.listdir(target_dir)]69        assert_that(files, contains_inanyorder("marathon.myGroup.live_myService.myTeam.live.hcl",70                                               "marathon.myGroup.aliasForNonLive_myService.myTeam.ci.hcl",71                                               "marathon.myGroup.aliasForNonLive_myService.myTeam.develop.hcl"))72    @data(("ci/myTeam/myService/mongo.password", "ci/myTeam/myService/mongo.password"),73          ("/ci/myTeam/myService/mongo.password", "ci/myTeam/myService/mongo.password"))74    @unpack75    def 
test_normalize_path(self, path, expected_normalized_path):76        # given77        service = PolicyService(self.vault_config)78        # when79        normalized_path = service._normalize_path(path)80        # then81        self.assertEquals(expected_normalized_path, normalized_path)82    def expected_policies(self):83        return [self._create_policy("live/myTeam/myService/mongo.password"),84                self._create_policy("live/myTeam/myService/jdbc.password"),85                self._create_policy("ci/myTeam/myService/mongo.password"),86                self._create_policy("ci/myTeam/myService/jdbc.password"),87                self._create_policy("develop/myTeam/myService/mongo.password"),88                self._create_policy("develop/myTeam/myService/jdbc.password")89                ]90    @staticmethod91    def _create_policy(secret_path):...test_network_policy.py
Source:test_network_policy.py  
...24CONF = config.CONF25LOG = logging.getLogger(__name__)26class NetworkPolicyTest(rbac_base.BaseContrailTest):27    """Test class to test network policy objects using RBAC roles"""28    def _create_policy(self):29        fq_name = data_utils.rand_name('network-policy')30        post_body = {31            'parent_type': 'project',32            'fq_name': ["default-domain", self.tenant_name, fq_name]33        }34        resp_body = self.network_policy_client.create_network_policys(35            **post_body)36        network_policy_uuid = resp_body['network-policy']['uuid']37        self.addCleanup(self._try_delete_resource,38                        self.network_policy_client.delete_network_policy,39                        network_policy_uuid)40        return network_policy_uuid41    def _update_policy(self, network_policy_uuid):42        put_body = {43            'display_name': data_utils.rand_name('network-policy')44        }45        self.network_policy_client.update_network_policy(network_policy_uuid,46                                                         **put_body)47    @rbac_rule_validation.action(service="Contrail",48                                 rules=["list_network_policys"])49    @decorators.idempotent_id('fa2a28f3-a8bb-4908-95b9-1e11cf58b16f')50    def test_list_policys(self):51        """test method for list n/w policy objects"""52        with self.override_role():53            self.network_policy_client.list_network_policys()54    @rbac_rule_validation.action(service="Contrail",55                                 rules=["create_network_policys"])56    @decorators.idempotent_id('a30be228-afba-40c9-8678-ae020db68d79')57    def test_create_network_policys(self):58        """test method for create n/w policy objects"""59        with self.override_role():60            self._create_policy()61    @rbac_rule_validation.action(service="Contrail",62                                 rules=["show_network_policy"])63    
@decorators.idempotent_id('6cefe92e-8936-49a6-bce0-12da3396e7ab')64    def test_show_network_policy(self):65        """test method for show n/w policy objects"""66        policy_uuid = self._create_policy()67        with self.override_role():68            self.network_policy_client.show_network_policy(policy_uuid)69    @rbac_rule_validation.action(service="Contrail",70                                 rules=["update_network_policy"])71    @decorators.idempotent_id('1d470505-3ad4-4870-87d7-3f0b0f9fc635')72    def test_update_network_policy(self):73        """test method for update n/w policy objects"""74        policy_uuid = self._create_policy()75        with self.override_role():76            self._update_policy(policy_uuid)77    @rbac_rule_validation.action(service="Contrail",78                                 rules=["delete_network_policy"])79    @decorators.idempotent_id('aae9018f-e7a2-4a75-a68e-afd6c380640e')80    def test_delete_network_policy(self):81        """test method for delete n/w policy objects"""82        policy_uuid = self._create_policy()83        with self.override_role():...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
