Best Python code snippet using tempest_python
qos_manager.py
Source:qos_manager.py  
# Copyright 2018 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import re
import threading

from enum import IntEnum
from math import ceil
from nfv_tempest_plugin.tests.common import shell_utilities as shell_utils
from oslo_log import log
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils

CONF = config.CONF
LOG = log.getLogger('{} [-] nfv_plugin_test'.format(__name__))


class QoSManagerMixin(object):
    """Helpers to create QoS policies/rules and verify them with iperf

    The methods rely on attributes supplied by the class this mixin is
    combined with: os_admin, os_admin_v2, qos_policy_groups, instance_user,
    get_remote_client, update_port, addCleanup and the assert* methods.
    """

    class Servers(IntEnum):
        """Position of each role in the servers list of the iperf tests"""
        CLIENT_1 = 0
        CLIENT_2 = 1
        SERVER = 2

    # Unit conversion from iperf report to os qos units [10^6]
    # collect_iperf_results extracts the Gbits/sec figures from the iperf
    # log, while the QoS rules are compared as *_kbps values.
    KBYTES_TO_MBITS = 10 ** 6
    LOG_5102 = "/tmp/listen-5102.txt"
    LOG_5101 = "/tmp/listen-5101.txt"

    # Traffic directions accepted by the rule creation helpers.
    # Kept as a tuple so membership is tested against whole names.
    _SUPPORTED_DIRECTIONS = ('egress',)

    @staticmethod
    def _validate_direction(direction):
        """Raise ValueError unless direction is a supported direction"""
        supported = QoSManagerMixin._SUPPORTED_DIRECTIONS
        if direction not in supported:
            raise ValueError('{d} is not a supported direction, supported '
                             'directions: {s_p}'
                             .format(d=direction,
                                     s_p=', '.join(supported)))

    def _get_ssh_client(self, server, key_pair):
        """Return a remote client for the floating IP of the given server"""
        return self.get_remote_client(server['fip'],
                                      username=self.instance_user,
                                      private_key=key_pair['private_key'])

    def create_network_qos_policy(self, namestart='qos-policy'):
        """Creates a network QoS policy

        The policy deletion is registered as a test cleanup.

        :param namestart: Prefix of the randomly generated policy name
        :return: The 'policy' entry of the create response
        """
        qos_client = self.os_admin.qos_client
        result = qos_client.create_qos_policy(
            name=data_utils.rand_name(namestart))
        self.assertIsNotNone(result, 'Unable to create policy')
        qos_policy = result['policy']
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        qos_client.delete_qos_policy,
                        qos_policy['id'])
        return qos_policy

    def create_min_bw_qos_rule(self, policy_id=None, min_kbps=None,
                               direction='egress'):
        """Creates a minimum bandwidth QoS rule

        NOTE: Not all kernel versions support minimum bandwidth for all
        NIC drivers.
        Only egress (guest --> outside) traffic is currently supported.
        The rule deletion is registered as a test cleanup.

        :param policy_id: Policy to add the rule to, defaults to the id of
                          self.qos_policy_groups[0]
        :param min_kbps: Minimum kbps bandwidth to apply to rule
        :param direction: Traffic direction that the rule applies to
        :raises ValueError: If the direction is not supported
        """
        if not policy_id:
            self.assertNotEmpty(self.qos_policy_groups,
                                'Unable to create_min_bw_qos_rule '
                                'self.qos_policy_groups is Empty')
            policy_id = self.qos_policy_groups[0]['id']
        self._validate_direction(direction)
        qos_min_bw_client = self.os_admin_v2.qos_minimum_bandwidth_rules_client
        result = qos_min_bw_client.create_minimum_bandwidth_rule(
            policy_id, **{'min_kbps': min_kbps, 'direction': direction})
        self.assertIsNotNone(result, 'Unable to create minimum bandwidth '
                                     'QoS rule')
        qos_rule = result['minimum_bandwidth_rule']
        self.addCleanup(
            test_utils.call_and_ignore_notfound_exc,
            qos_min_bw_client.delete_minimum_bandwidth_rule, policy_id,
            qos_rule['id'])

    def create_max_bw_qos_rule(self, policy_id=None, max_kbps=None,
                               max_burst_kbps=None,
                               direction='egress'):
        """Creates a maximum bandwidth QoS rule

        Only egress (guest --> outside) traffic is currently supported.
        The rule deletion is registered as a test cleanup.

        :param policy_id: Policy to add the rule to, defaults to the id of
                          self.qos_policy_groups[0]
        :param max_kbps: Maximum kbps bandwidth to apply to rule
        :param max_burst_kbps: max burst bandwidth to apply to rule
        :param direction: Traffic direction that the rule applies to
        :raises ValueError: If the direction is not supported
        """
        if not policy_id:
            self.assertNotEmpty(self.qos_policy_groups,
                                'Unable to create_max_bw_qos_rule '
                                'self.qos_policy_groups is Empty')
            policy_id = self.qos_policy_groups[0]['id']
        self._validate_direction(direction)
        bw_rules = {'direction': direction,
                    'max_kbps': max_kbps,
                    'max_burst_kbps': max_burst_kbps}
        qos_max_bw_client = self.os_admin_v2.qos_limit_bandwidth_rules_client
        result = qos_max_bw_client.create_limit_bandwidth_rule(policy_id,
                                                               **bw_rules)
        self.assertIsNotNone(result, 'Unable to create maximum bandwidth '
                                     'QoS rule')
        qos_rule = result['bandwidth_limit_rule']
        self.addCleanup(
            test_utils.call_and_ignore_notfound_exc,
            qos_max_bw_client.delete_limit_bandwidth_rule, policy_id,
            qos_rule['id'])

    def create_qos_policy_with_rules(self, use_default=True, **kwargs):
        """Create_qos_policy with rules

        :param kwargs:
            kwargs['max_kbps']: parameter indicates maximum qos requested
            kwargs['min_kbps']: parameter indicates minimum qos requested
            Once 'min_kbps' is consumed, the remaining kwargs are passed to
            create_max_bw_qos_rule when 'max_kbps' is present.
        :param use_default: parameter use self.qos_policy_groups for the test
        :return: The created QoS policy
        """
        qos_policy_groups = self.create_network_qos_policy()
        if use_default:
            self.qos_policy_groups = qos_policy_groups
            self.assertNotEmpty(self.qos_policy_groups,
                                'Unable to qos policy '
                                'self.qos_policy_groups is Empty')
        if 'min_kbps' in kwargs:
            # Removed from kwargs so it is not forwarded to the max bw rule
            self.create_min_bw_qos_rule(
                policy_id=qos_policy_groups['id'],
                min_kbps=kwargs.pop('min_kbps'))
        if 'max_kbps' in kwargs:
            self.create_max_bw_qos_rule(
                policy_id=qos_policy_groups['id'],
                **kwargs)
        return qos_policy_groups

    def check_qos_attached_to_guest(self, server, min_bw=False):
        """Check QoS attachment to guest

        This method checks if QoS is applied to an interface on hypervisor
        that is attached to guest

        :param server: Server dict, its 'id' and 'hypervisor_ip' are used
        :param min_bw: Check for minimum bandwidth QoS
        :raises ValueError: If no port of the server has a QoS policy, if a
                            port MAC address is not found on the hypervisor
                            or if no min_tx_rate is reported for it
        """
        found_qos = False
        interface_data = shell_utils.get_interfaces_from_overcloud_node(
            server['hypervisor_ip'])
        ports_client = self.os_admin.ports_client
        ports = ports_client.list_ports(device_id=server['id'])
        for port in ports['ports']:
            # Only ports with a QoS policy are inspected
            if not port['qos_policy_id']:
                continue
            found_qos = True
            # Locate the hypervisor interface line by the port's MAC address
            re_string = r'.*{}.*'.format(port['mac_address'])
            line = re.search(re_string, interface_data)
            if not line:
                raise ValueError("Failed to locate interface with MAC "
                                 "'{}' on hypervisor"
                                 .format(port['mac_address']))
            if not min_bw:
                continue
            line = line.group(0)
            # NOTE(review): the rule creation helpers use
            # os_admin_v2.qos_minimum_bandwidth_rules_client - confirm that
            # os_admin.qos_min_bw_client is the intended client here.
            qos_min_bw_client = self.os_admin.qos_min_bw_client
            min_qos_rule = qos_min_bw_client.list_minimum_bandwidth_rules(
                port['qos_policy_id'])['minimum_bandwidth_rules']
            # OpenStack API displays the size in Kbps
            min_kbps = min_qos_rule[0]['min_kbps']
            # Linux operating system displays the size in Mbps
            min_mbps = '{}Mbps'.format(str(int(ceil(min_kbps / 1000.0))))
            qos = re.search(r'min_tx_rate \w+', line)
            if not qos:
                raise ValueError("Failed to dicover min QoS for "
                                 "interface with MAC '{}'"
                                 .format(port['mac_address']))
            # Keep only the rate value
            qos = qos.group(0).replace('min_tx_rate ', '')
            self.assertEqual(min_mbps, qos)
        if not found_qos:
            raise ValueError('No QoS policies were applied to ports')

    def run_iperf_test(self, qos_policies=None, servers=None, key_pair=None,
                       vnic_type='direct'):
        """run_iperf_test

        This method receive server list, and prepare machines for iperf
        and run test commands in server and clients

        :param qos_policies qos policy created for servers
        :param servers servers list, at least 3
        :param key_pair servers key pairs
        :param vnic_type vnic type used to select the tested port of
                         each server
        """
        qos_policies = [] if qos_policies is None else qos_policies
        servers = [] if servers is None else servers
        key_pair = {} if key_pair is None else key_pair
        srv = QoSManagerMixin.Servers
        servers_ports_map = [
            self.os_admin.ports_client.list_ports(device_id=server['id'])
            for server in servers]
        # Find machines ports based on type
        tested_ports = [shell_utils.find_vm_interface(
            ports, vnic_type=vnic_type) for ports in servers_ports_map]
        # Bind to iperf server ip_addr
        ip_addr = tested_ports[srv.SERVER][1]
        # Set ports with QoS
        LOG.info('Update client ports with QoS policies...')
        # Assume server is the last server index 2,
        # In case one policy parsed, CLIENT_1 is the tested port.
        # Never more ports than clients, the server port is left untouched.
        for index in range(min(len(servers) - 1, len(qos_policies))):
            self.update_port(
                tested_ports[index][0],
                **{'qos_policy_id': qos_policies[index]['id']})
        LOG.info('Run iperf server on server3...')
        ssh_dest = self._get_ssh_client(servers[srv.SERVER], key_pair)
        server_command = "sudo yum install iperf -y; "
        log_5102 = QoSManagerMixin.LOG_5102
        log_5101 = QoSManagerMixin.LOG_5101
        server_command += \
            "(nohup iperf -s -B {} -p 5102 -i 10 > {} 2>&1)& ".format(
                ip_addr, log_5102)
        server_command += \
            "(nohup iperf -s -B {} -p 5101 -i 10 > {} 2>&1)& ".format(
                ip_addr, log_5101)
        LOG.info('Receive iperf traffic from Server3...')
        ssh_dest.exec_command(server_command)
        install_iperf_command = "sudo yum install iperf -y"
        ssh_source1 = self._get_ssh_client(servers[srv.CLIENT_1], key_pair)
        LOG.info('Installing iperf on Server1...')
        ssh_source1.exec_command(install_iperf_command)
        ssh_source2 = self._get_ssh_client(servers[srv.CLIENT_2], key_pair)
        LOG.info('Installing iperf on Server2...')
        ssh_source2.exec_command(install_iperf_command)
        # Both clients must send at the same time, to check guaranteed
        # min/bw for client1 while CLIENT_2 serves as a helper, so each
        # client command runs in its own thread.
        threads = []
        threads.append(threading.Thread(target=ssh_source1.exec_command,
                                        args=("iperf -c {} -T s1 -p {} -t 120 "
                                              " > /tmp/iperf.out 2>&1"
                                              .format(ip_addr, '5101'),)))
        threads.append(threading.Thread(target=ssh_source2.exec_command,
                                        args=("iperf -c {} -T s1 -p {} -t 120 "
                                              " > /tmp/iperf.out 2>&1"
                                              .format(ip_addr, '5102'),)))
        for thread in threads:
            thread.start()
        LOG.info('Waiting for all iperf threads to complete')
        for thread in threads:
            thread.join()

    def collect_iperf_results(self, qos_rules_list=None,
                              servers=None, key_pair=None):
        """collect_iperf_results

        This method receive server list, ssh to iperf server and collect
        iperf reports and compare it to QoS rules created for clients ports

        :param qos_rules_list qos rules list for tested ports
        :param servers servers list, at least 3
        :param key_pair servers key pairs
        """
        qos_rules_list = [] if qos_rules_list is None else qos_rules_list
        servers = [] if servers is None else servers
        key_pair = {} if key_pair is None else key_pair
        srv = QoSManagerMixin.Servers
        log_files = [QoSManagerMixin.LOG_5101, QoSManagerMixin.LOG_5102]
        # Run grep command over iperf server to verify BW is OK
        # File format is as following
        # [  4]  0.0-10.0 sec  4.76 GBytes  4.09 Gbits/sec
        # [  4] 10.0-20.0 sec  4.76 GBytes  4.09 Gbits/sec
        # cat /tmp/listen-5101.txt | while read line ; do
        # if [[ $line =~ [[:space:]]([0-9]{1,2}.[0-9]{1,2})[[:space:]]Gbits ]];
        # then echo ${BASH_REMATCH[1]}; fi ; done
        LOG.info('Collect iperf logs from iperf server, server3...')
        # The log file is injected with str.replace and not str.format,
        # since the command itself contains '{1,2}' and '${...}' braces.
        command = r"cat {} | while read line ;do  "
        command += r"if [[ \"$line\" =~ [[:space:]]"
        command += r"([0-9]{1,2}\.[0-9]{1,2})[[:space:]]Gbits ]]; "
        command += r"then echo \"${BASH_REMATCH[1]}\"; fi; done| sort| uniq"
        ssh_dest = self._get_ssh_client(servers[srv.SERVER], key_pair)
        for index in range(srv.SERVER):
            # If Default QoS, such as min_bw check only srv.CLIENT_1
            if self.qos_policy_groups and index == srv.CLIENT_2:
                break
            out_testing = ssh_dest.exec_command(
                command.replace('{}', log_files[index]))
            iperf_rep = out_testing.split()
            self.assertNotEmpty(
                iperf_rep, "Please check QoS definitions, iperf result for "
                "in file {} is empty or low".format(log_files[index]))
            # Apply average for result, the echoed values are quoted
            res = [float(value.replace('"', '')) for value in iperf_rep]
            rep = sum(res) / len(res)
            qos_type = 'max_kbps'
            # In case of min_bw only one policy is set
            if 'min_kbps' in qos_rules_list[srv(index)]:
                qos_type = 'min_kbps'
            self.calculate_deviation(test_type=qos_type, result_number=rep,
                                     rate_limit=qos_rules_list[
                                         srv(index)][qos_type])

    def calculate_deviation(self, test_type, rate_limit, result_number,
                            max_deviation_accepted=0.08):
        """Calculate deviation for a result number for a rate limit number

        Method supports two test types - max_kbps and min_kbps.
        For the max_kbps, the given number could be above the given rate limit
        within the accepted deviation number.
        For the min_kbps, the given number could be below the given rate limit
        within the accepted deviation number.
        The result number is multiplied by KBYTES_TO_MBITS before it is
        compared with the rate limit.

        :param test_type: One of two available test types - max_kbps/min_kbps
        :type test_type: str
        :param rate_limit: Limit rate for a given number. Depends on test type
        :type rate_limit: int
        :param result_number: A result number to calculate the deviation
        :type result_number: float
        :param max_deviation_accepted: Accepted deviation, defaults to 0.08
        :type max_deviation_accepted: float, optional
        """
        measured = result_number * QoSManagerMixin.KBYTES_TO_MBITS
        deviation = abs(measured / rate_limit - 1)
        err_msg = 'The number - {} deviates more than accepted deviation - {}'
        # The deviation only matters on the wrong side of the limit:
        # above it for max_kbps, below it for min_kbps.
        limit_crossed = (
            (test_type == "max_kbps" and measured > rate_limit) or
            (test_type == "min_kbps" and measured < rate_limit))
        if limit_crossed:
            self.assertLess(deviation, max_deviation_accepted,
                            err_msg.format(deviation, max_deviation_accepted))
        LOG.info('The test result number - {} with deviation - {}'
                 .format(result_number, deviation))
# ...test_qos_limit_bandwidth_rules_client.py
Source:test_qos_limit_bandwidth_rules_client.py  
...86            qos_policy_id=self.FAKE_QOS_POLICY_ID,87            rule_id=self.FAKE_MAX_BW_RULE_ID88        )89    @decorators.idempotent_id('0a7c0964-e97b-11eb-aacb-74e5f9e2a801')90    def test_delete_limit_bandwidth_rule(self):91        self.check_service_client_function(92            self.qos_limit_bw_client.delete_limit_bandwidth_rule,93            "tempest.lib.common.rest_client.RestClient.delete",94            {},95            status=204,96            qos_policy_id=self.FAKE_QOS_POLICY_ID,97            rule_id=self.FAKE_MAX_BW_RULE_ID)98    @decorators.idempotent_id('08df88ae-e97d-11eb-aacb-74e5f9e2a801')99    def test_list_minimum_bandwidth_rules(self, bytes_body=False):100        self.check_service_client_function(101            self.qos_limit_bw_client.list_limit_bandwidth_rules,102            "tempest.lib.common.rest_client.RestClient.get",103            self.FAKE_MAX_BW_RULES,104            bytes_body,...qos_limit_bandwidth_rules_client.py
Source:qos_limit_bandwidth_rules_client.py  
...41        """42        uri = '/qos/policies/{}/bandwidth_limit_rules/{}'.format(43            qos_policy_id, rule_id)44        return self.show_resource(uri, **fields)45    def delete_limit_bandwidth_rule(self, qos_policy_id, rule_id):46        """Deletes a limit bandwidth rule for a QoS policy.47        For full list of available parameters, please refer to the official48        API reference:49        https://docs.openstack.org/api-ref/network/v2/index.html#delete-bandwidth-limit-rule50        """51        uri = '/qos/policies/{}/bandwidth_limit_rules/{}'.format(52            qos_policy_id, rule_id)53        return self.delete_resource(uri)54    def list_limit_bandwidth_rules(self, qos_policy_id, **filters):55        """Lists all limit bandwidth rules for a QoS policy.56        For full list of available parameters, please refer to the official57        API reference:58        https://docs.openstack.org/api-ref/network/v2/index.html#list-bandwidth-limit-rules-for-qos-policy59        """...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
