Best Python code snippet using tempest_python
test_qos.py
Source:test_qos.py  
...27            raise cls.skipException(msg)28    @test.attr(type='smoke')29    @test.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')30    def test_create_policy(self):31        policy = self.create_qos_policy(name='test-policy',32                                        description='test policy desc1',33                                        shared=False)34        # Test 'show policy'35        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])36        retrieved_policy = retrieved_policy['policy']37        self.assertEqual('test-policy', retrieved_policy['name'])38        self.assertEqual('test policy desc1', retrieved_policy['description'])39        self.assertFalse(retrieved_policy['shared'])40        # Test 'list policies'41        policies = self.admin_client.list_qos_policies()['policies']42        policies_ids = [p['id'] for p in policies]43        self.assertIn(policy['id'], policies_ids)44    @test.attr(type='smoke')45    @test.idempotent_id('f8d20e92-f06d-4805-b54f-230f77715815')46    def test_list_policy_filter_by_name(self):47        self.create_qos_policy(name='test', description='test policy',48                               shared=False)49        self.create_qos_policy(name='test2', description='test policy',50                               shared=False)51        policies = (self.admin_client.52                    list_qos_policies(name='test')['policies'])53        self.assertEqual(1, len(policies))54        retrieved_policy = policies[0]55        self.assertEqual('test', retrieved_policy['name'])56    @test.attr(type='smoke')57    @test.idempotent_id('8e88a54b-f0b2-4b7d-b061-a15d93c2c7d6')58    def test_policy_update(self):59        policy = self.create_qos_policy(name='test-policy',60                                        description='',61                                        shared=False)62        self.admin_client.update_qos_policy(policy['id'],63                                            description='test policy 
desc2',64                                            shared=True)65        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])66        retrieved_policy = retrieved_policy['policy']67        self.assertEqual('test policy desc2', retrieved_policy['description'])68        self.assertTrue(retrieved_policy['shared'])69        self.assertEqual([], retrieved_policy['rules'])70    @test.attr(type='smoke')71    @test.idempotent_id('1cb42653-54bd-4a9a-b888-c55e18199201')72    def test_delete_policy(self):73        policy = self.admin_client.create_qos_policy(74            'test-policy', 'desc', True)['policy']75        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])76        retrieved_policy = retrieved_policy['policy']77        self.assertEqual('test-policy', retrieved_policy['name'])78        self.admin_client.delete_qos_policy(policy['id'])79        self.assertRaises(exceptions.NotFound,80                          self.admin_client.show_qos_policy, policy['id'])81    @test.attr(type='smoke')82    @test.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')83    def test_list_admin_rule_types(self):84        self._test_list_rule_types(self.admin_client)85    @test.attr(type='smoke')86    @test.idempotent_id('49c8ea35-83a9-453a-bd23-239cf3b13929')87    def test_list_regular_rule_types(self):88        self._test_list_rule_types(self.client)89    def _test_list_rule_types(self, client):90        # List supported rule types91        # TODO(QoS): since in gate we run both ovs and linuxbridge ml2 drivers,92        # and since Linux Bridge ml2 driver does not have QoS support yet, ml293        # plugin reports no rule types are supported. 
Once linuxbridge will94        # receive support for QoS, the list of expected rule types will change.95        #96        # In theory, we could make the test conditional on which ml2 drivers97        # are enabled in gate (or more specifically, on which supported qos98        # rules are claimed by core plugin), but that option doesn't seem to be99        # available thru tempest_lib framework100        expected_rule_types = []101        expected_rule_details = ['type']102        rule_types = client.list_qos_rule_types()103        actual_list_rule_types = rule_types['rule_types']104        actual_rule_types = [rule['type'] for rule in actual_list_rule_types]105        # Verify that only required fields present in rule details106        for rule in actual_list_rule_types:107            self.assertEqual(tuple(rule.keys()), tuple(expected_rule_details))108        # Verify if expected rules are present in the actual rules list109        for rule in expected_rule_types:110            self.assertIn(rule, actual_rule_types)111    def _disassociate_network(self, client, network_id):112        client.update_network(network_id, qos_policy_id=None)113        updated_network = self.admin_client.show_network(network_id)114        self.assertIsNone(updated_network['network']['qos_policy_id'])115    @test.attr(type='smoke')116    @test.idempotent_id('65b9ef75-1911-406a-bbdb-ca1d68d528b0')117    def test_policy_association_with_admin_network(self):118        policy = self.create_qos_policy(name='test-policy',119                                        description='test policy',120                                        shared=False)121        network = self.create_shared_network('test network',122                                             qos_policy_id=policy['id'])123        retrieved_network = self.admin_client.show_network(network['id'])124        self.assertEqual(125            policy['id'], retrieved_network['network']['qos_policy_id'])126        
self._disassociate_network(self.admin_client, network['id'])127    @test.attr(type='smoke')128    @test.idempotent_id('1738de5d-0476-4163-9022-5e1b548c208e')129    def test_policy_association_with_tenant_network(self):130        policy = self.create_qos_policy(name='test-policy',131                                        description='test policy',132                                        shared=True)133        network = self.create_network('test network',134                                      qos_policy_id=policy['id'])135        retrieved_network = self.admin_client.show_network(network['id'])136        self.assertEqual(137            policy['id'], retrieved_network['network']['qos_policy_id'])138        self._disassociate_network(self.client, network['id'])139    @test.attr(type='smoke')140    @test.idempotent_id('9efe63d0-836f-4cc2-b00c-468e63aa614e')141    def test_policy_association_with_network_nonexistent_policy(self):142        self.assertRaises(143            exceptions.NotFound,144            self.create_network,145            'test network',146            qos_policy_id='9efe63d0-836f-4cc2-b00c-468e63aa614e')147    @test.attr(type='smoke')148    @test.idempotent_id('1aa55a79-324f-47d9-a076-894a8fc2448b')149    def test_policy_association_with_network_non_shared_policy(self):150        policy = self.create_qos_policy(name='test-policy',151                                        description='test policy',152                                        shared=False)153        self.assertRaises(154            exceptions.NotFound,155            self.create_network,156            'test network', qos_policy_id=policy['id'])157    @test.attr(type='smoke')158    @test.idempotent_id('09a9392c-1359-4cbb-989f-fb768e5834a8')159    def test_policy_update_association_with_admin_network(self):160        policy = self.create_qos_policy(name='test-policy',161                                        description='test policy',162                                        
shared=False)163        network = self.create_shared_network('test network')164        retrieved_network = self.admin_client.show_network(network['id'])165        self.assertIsNone(retrieved_network['network']['qos_policy_id'])166        self.admin_client.update_network(network['id'],167                                         qos_policy_id=policy['id'])168        retrieved_network = self.admin_client.show_network(network['id'])169        self.assertEqual(170            policy['id'], retrieved_network['network']['qos_policy_id'])171        self._disassociate_network(self.admin_client, network['id'])172    def _disassociate_port(self, port_id):173        self.client.update_port(port_id, qos_policy_id=None)174        updated_port = self.admin_client.show_port(port_id)175        self.assertIsNone(updated_port['port']['qos_policy_id'])176    @test.attr(type='smoke')177    @test.idempotent_id('98fcd95e-84cf-4746-860e-44692e674f2e')178    def test_policy_association_with_port_shared_policy(self):179        policy = self.create_qos_policy(name='test-policy',180                                        description='test policy',181                                        shared=True)182        network = self.create_shared_network('test network')183        port = self.create_port(network, qos_policy_id=policy['id'])184        retrieved_port = self.admin_client.show_port(port['id'])185        self.assertEqual(186            policy['id'], retrieved_port['port']['qos_policy_id'])187        self._disassociate_port(port['id'])188    @test.attr(type='smoke')189    @test.idempotent_id('49e02f5a-e1dd-41d5-9855-cfa37f2d195e')190    def test_policy_association_with_port_nonexistent_policy(self):191        network = self.create_shared_network('test network')192        self.assertRaises(193            exceptions.NotFound,194            self.create_port,195            network,196            qos_policy_id='49e02f5a-e1dd-41d5-9855-cfa37f2d195e')197    @test.attr(type='smoke')198    
@test.idempotent_id('f53d961c-9fe5-4422-8b66-7add972c6031')199    def test_policy_association_with_port_non_shared_policy(self):200        policy = self.create_qos_policy(name='test-policy',201                                        description='test policy',202                                        shared=False)203        network = self.create_shared_network('test network')204        self.assertRaises(205            exceptions.NotFound,206            self.create_port,207            network, qos_policy_id=policy['id'])208    @test.attr(type='smoke')209    @test.idempotent_id('f8163237-fba9-4db5-9526-bad6d2343c76')210    def test_policy_update_association_with_port_shared_policy(self):211        policy = self.create_qos_policy(name='test-policy',212                                        description='test policy',213                                        shared=True)214        network = self.create_shared_network('test network')215        port = self.create_port(network)216        retrieved_port = self.admin_client.show_port(port['id'])217        self.assertIsNone(retrieved_port['port']['qos_policy_id'])218        self.client.update_port(port['id'], qos_policy_id=policy['id'])219        retrieved_port = self.admin_client.show_port(port['id'])220        self.assertEqual(221            policy['id'], retrieved_port['port']['qos_policy_id'])222        self._disassociate_port(port['id'])223    @test.attr(type='smoke')224    @test.idempotent_id('18163237-8ba9-4db5-9525-bad6d2343c75')225    def test_delete_not_allowed_if_policy_in_use_by_network(self):226        policy = self.create_qos_policy(name='test-policy',227                                        description='test policy',228                                        shared=True)229        network = self.create_shared_network(230            'test network', qos_policy_id=policy['id'])231        self.assertRaises(232            exceptions.Conflict,233            self.admin_client.delete_qos_policy, policy['id'])234    
    self._disassociate_network(self.admin_client, network['id'])235        self.admin_client.delete_qos_policy(policy['id'])236    @test.attr(type='smoke')237    @test.idempotent_id('24153230-84a9-4dd5-9525-bad6d2343c75')238    def test_delete_not_allowed_if_policy_in_use_by_port(self):239        policy = self.create_qos_policy(name='test-policy',240                                        description='test policy',241                                        shared=True)242        network = self.create_shared_network('test network')243        port = self.create_port(network, qos_policy_id=policy['id'])244        self.assertRaises(245            exceptions.Conflict,246            self.admin_client.delete_qos_policy, policy['id'])247        self._disassociate_port(port['id'])248        self.admin_client.delete_qos_policy(policy['id'])249    @test.attr(type='smoke')250    @test.idempotent_id('a2a5849b-dd06-4b18-9664-0b6828a1fc27')251    def test_qos_policy_delete_with_rules(self):252        policy = self.create_qos_policy(name='test-policy',253                                        description='test policy',254                                        shared=False)255        self.admin_client.create_bandwidth_limit_rule(256            policy['id'], 200, 1337)['bandwidth_limit_rule']257        self.admin_client.delete_qos_policy(policy['id'])258        with testtools.ExpectedException(exceptions.NotFound):259            self.admin_client.show_qos_policy(policy['id'])260class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):261    @classmethod262    def resource_setup(cls):263        super(QosBandwidthLimitRuleTestJSON, cls).resource_setup()264        if not test.is_extension_enabled('qos', 'network'):265            msg = "qos extension not enabled."266            raise cls.skipException(msg)267    @test.attr(type='smoke')268    @test.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')269    def test_rule_create(self):270        policy = 
self.create_qos_policy(name='test-policy',271                                        description='test policy',272                                        shared=False)273        rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],274                                                    max_kbps=200,275                                                    max_burst_kbps=1337)276        # Test 'show rule'277        retrieved_rule = self.admin_client.show_bandwidth_limit_rule(278            policy['id'], rule['id'])279        retrieved_rule = retrieved_rule['bandwidth_limit_rule']280        self.assertEqual(rule['id'], retrieved_rule['id'])281        self.assertEqual(200, retrieved_rule['max_kbps'])282        self.assertEqual(1337, retrieved_rule['max_burst_kbps'])283        # Test 'list rules'284        rules = self.admin_client.list_bandwidth_limit_rules(policy['id'])285        rules = rules['bandwidth_limit_rules']286        rules_ids = [r['id'] for r in rules]287        self.assertIn(rule['id'], rules_ids)288        # Test 'show policy'289        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])290        policy_rules = retrieved_policy['policy']['rules']291        self.assertEqual(1, len(policy_rules))292        self.assertEqual(rule['id'], policy_rules[0]['id'])293        self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,294                         policy_rules[0]['type'])295    @test.attr(type='smoke')296    @test.idempotent_id('8a59b00b-ab01-4787-92f8-93a5cdf5e378')297    def test_rule_create_fail_for_the_same_type(self):298        policy = self.create_qos_policy(name='test-policy',299                                        description='test policy',300                                        shared=False)301        self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],302                                             max_kbps=200,303                                             max_burst_kbps=1337)304        
self.assertRaises(exceptions.Conflict,305                          self.create_qos_bandwidth_limit_rule,306                          policy_id=policy['id'],307                          max_kbps=201, max_burst_kbps=1338)308    @test.attr(type='smoke')309    @test.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')310    def test_rule_update(self):311        policy = self.create_qos_policy(name='test-policy',312                                        description='test policy',313                                        shared=False)314        rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],315                                                    max_kbps=1,316                                                    max_burst_kbps=1)317        self.admin_client.update_bandwidth_limit_rule(policy['id'],318                                                      rule['id'],319                                                      max_kbps=200,320                                                      max_burst_kbps=1337)321        retrieved_policy = self.admin_client.show_bandwidth_limit_rule(322            policy['id'], rule['id'])323        retrieved_policy = retrieved_policy['bandwidth_limit_rule']324        self.assertEqual(200, retrieved_policy['max_kbps'])325        self.assertEqual(1337, retrieved_policy['max_burst_kbps'])326    @test.attr(type='smoke')327    @test.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')328    def test_rule_delete(self):329        policy = self.create_qos_policy(name='test-policy',330                                        description='test policy',331                                        shared=False)332        rule = self.admin_client.create_bandwidth_limit_rule(333            policy['id'], 200, 1337)['bandwidth_limit_rule']334        retrieved_policy = self.admin_client.show_bandwidth_limit_rule(335            policy['id'], rule['id'])336        retrieved_policy = retrieved_policy['bandwidth_limit_rule']337        
self.assertEqual(rule['id'], retrieved_policy['id'])338        self.admin_client.delete_bandwidth_limit_rule(policy['id'], rule['id'])339        self.assertRaises(exceptions.NotFound,340                          self.admin_client.show_bandwidth_limit_rule,341                          policy['id'], rule['id'])342    @test.attr(type='smoke')343    @test.idempotent_id('f211222c-5808-46cb-a961-983bbab6b852')344    def test_rule_create_rule_nonexistent_policy(self):345        self.assertRaises(346            exceptions.NotFound,347            self.create_qos_bandwidth_limit_rule,348            'policy', 200, 1337)349    @test.attr(type='smoke')350    @test.idempotent_id('eed8e2a6-22da-421b-89b9-935a2c1a1b50')351    def test_policy_create_forbidden_for_regular_tenants(self):352        self.assertRaises(353            exceptions.Forbidden,354            self.client.create_qos_policy,355            'test-policy', 'test policy', False)356    @test.attr(type='smoke')357    @test.idempotent_id('a4a2e7ad-786f-4927-a85a-e545a93bd274')358    def test_rule_create_forbidden_for_regular_tenants(self):359        self.assertRaises(360            exceptions.Forbidden,361            self.client.create_bandwidth_limit_rule,362            'policy', 1, 2)363    @test.attr(type='smoke')364    @test.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')365    def test_get_rules_by_policy(self):366        policy1 = self.create_qos_policy(name='test-policy1',367                                         description='test policy1',368                                         shared=False)369        rule1 = self.create_qos_bandwidth_limit_rule(policy_id=policy1['id'],370                                                     max_kbps=200,371                                                     max_burst_kbps=1337)372        policy2 = self.create_qos_policy(name='test-policy2',373                                         description='test policy2',374                                         
shared=False)375        rule2 = self.create_qos_bandwidth_limit_rule(policy_id=policy2['id'],376                                                     max_kbps=5000,377                                                     max_burst_kbps=2523)378        # Test 'list rules'379        rules = self.admin_client.list_bandwidth_limit_rules(policy1['id'])380        rules = rules['bandwidth_limit_rules']381        rules_ids = [r['id'] for r in rules]382        self.assertIn(rule1['id'], rules_ids)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
