Best Python code snippet using autotest_python
qos_tc_impl.py
Source:qos_tc_impl.py  
"""
Copyright 2020 The Magma Authors.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from typing import List, Optional  # noqa
import os
import shlex
import subprocess
import logging
from lte.protos.policydb_pb2 import FlowMatch
from .types import QosInfo
from .utils import IdManager

LOG = logging.getLogger('pipelined.qos.qos_tc_impl')
# LOG.setLevel(logging.DEBUG)


# this code can run in either a docker container(CWAG) or as a native
# python process(AG). When we are running as a root there is no need for
# using sudo. (related to task T63499189 where tc commands failed since
# sudo wasn't available in the docker container)
def argSplit(cmd: str) -> List[str]:
    """Tokenize ``cmd`` with shlex, prefixing ``sudo`` unless euid is 0."""
    args = [] if os.geteuid() == 0 else ["sudo"]
    args.extend(shlex.split(cmd))
    return args


def run_cmd(cmd_list, show_error=True) -> int:
    """
    Run every command in ``cmd_list`` in order.

    A failing command does not stop the remaining ones. Returns 0 when all
    commands succeed, otherwise the exit status of the last command that
    failed. Failures are logged only when ``show_error`` is true.
    """
    err = 0
    for cmd in cmd_list:
        LOG.debug("running %s", cmd)
        try:
            args = argSplit(cmd)
            subprocess.check_call(args)
        except subprocess.CalledProcessError as e:
            err = e.returncode
            if show_error:
                LOG.error("%s error running %s ", str(e), cmd)
    return err


# TODO - replace this implementation with pyroute2 tc
ROOT_QID = 65534
DEFAULT_RATE = '12Kbit'
DEFAULT_INTF_SPEED = '1000'


class TrafficClass:
    """
    Creates/Deletes queues in linux. Using Qdiscs for flow based
    rate limiting(traffic shaping) of user traffic.
    """

    @staticmethod
    def delete_class(intf: str, qid: int, show_error=True) -> int:
        """Delete the fw filter and the htb class for ``qid`` on ``intf``.

        Returns the status reported by ``run_cmd`` for the two commands.
        """
        qid_hex = hex(qid)
        # delete filter if this is a leaf class
        filter_cmd = "tc filter del dev {intf} protocol ip parent 1: prio 1 "
        filter_cmd += "handle {qid} fw flowid 1:{qid}"
        filter_cmd = filter_cmd.format(intf=intf, qid=qid_hex)
        tc_cmd = "tc class del dev {intf} classid 1:{qid}".format(intf=intf,
                                                                  qid=qid_hex)
        return run_cmd([filter_cmd, tc_cmd], show_error)

    @staticmethod
    def create_class(intf: str, qid: int, max_bw: int, rate=None,
                     parent_qid=None, show_error=True) -> int:
        """(Re)create htb class ``qid`` on ``intf`` and attach its fw filter.

        ``rate`` falls back to DEFAULT_RATE and ``parent_qid`` to ROOT_QID
        when not given. Any existing class with the same id is deleted
        first. The returned status is that of the filter command only; the
        status of the class-add command is not propagated.
        """
        if not rate:
            rate = DEFAULT_RATE
        if not parent_qid:
            parent_qid = ROOT_QID
        if parent_qid == qid:
            # parent qid should only be self for root case, everything else
            # should be the child of root class
            LOG.error('parent and self qid equal, setting parent_qid to root')
            parent_qid = ROOT_QID
        qid_hex = hex(qid)
        parent_qid_hex = hex(parent_qid)
        tc_cmd = "tc class add dev {intf} parent 1:{parent_qid} "
        tc_cmd += "classid 1:{qid} htb rate {rate} ceil {maxbw} prio 2"
        tc_cmd = tc_cmd.format(intf=intf, parent_qid=parent_qid_hex,
                               qid=qid_hex, rate=rate, maxbw=max_bw)
        # delete if exists
        TrafficClass.delete_class(intf, qid, show_error=False)
        run_cmd([tc_cmd], show_error=show_error)
        # add filter
        filter_cmd = "tc filter add dev {intf} protocol ip parent 1: prio 1 "
        filter_cmd += "handle {qid} fw flowid 1:{qid}"
        filter_cmd = filter_cmd.format(intf=intf, qid=qid_hex)
        # add qdisc and filter
        return run_cmd([filter_cmd], show_error)

    @staticmethod
    def init_qdisc(intf: str, show_error=False) -> int:
        """Install the root htb qdisc, the root class and class 1:1 on ``intf``.

        The link speed is read from /sys/class/net/<intf>/speed; when that
        file cannot be read DEFAULT_INTF_SPEED is used instead.
        """
        speed = DEFAULT_INTF_SPEED
        qid_hex = hex(ROOT_QID)
        fn = "/sys/class/net/{intf}/speed".format(intf=intf)
        try:
            with open(fn) as f:
                speed = f.read().strip()
        except OSError:
            LOG.error('unable to read speed from %s defaulting to %s', fn, speed)
        qdisc_cmd = "tc qdisc add dev {intf} root handle 1: htb".format(intf=intf)
        parent_q_cmd = "tc class add dev {intf} parent 1: classid 1:{root_qid} htb "
        parent_q_cmd += "rate {speed}Mbit ceil {speed}Mbit"
        parent_q_cmd = parent_q_cmd.format(intf=intf, root_qid=qid_hex, speed=speed)
        tc_cmd = "tc class add dev {intf} parent 1:{root_qid} classid 1:1 htb "
        tc_cmd += "rate {rate} ceil {speed}Mbit"
        tc_cmd = tc_cmd.format(intf=intf, root_qid=qid_hex, rate=DEFAULT_RATE,
                               speed=speed)
        return run_cmd([qdisc_cmd, parent_q_cmd, tc_cmd], show_error)

    @staticmethod
    def read_all_classes(intf: str):
        """Return a list of ``(qid, parent_qid)`` for non-root htb classes.

        The ids are parsed as hexadecimal from ``tc class show``. When the
        tc command fails the error is logged and whatever was collected so
        far (normally an empty list) is returned.
        """
        qid_list = []
        # example output of this command
        # b'class htb 1:1 parent 1:fffe prio 0 rate 12Kbit ceil 1Gbit burst \
        # 1599b cburst 1375b \nclass htb 1:fffe root rate 1Gbit ceil 1Gbit \
        # burst 1375b cburst 1375b \n'
        # we need to parse this output and extract class ids from here
        tc_cmd = "tc class show dev {}".format(intf)
        args = argSplit(tc_cmd)
        try:
            output = subprocess.check_output(args)
            for ln in output.decode('utf-8').split("\n"):
                ln = ln.strip()
                if not ln:
                    continue
                tok = ln.split()
                if len(tok) < 5:
                    continue
                if tok[1] != "htb":
                    continue
                if tok[3] == 'root':
                    continue
                qid_str = tok[2].split(':')[1]
                qid = int(qid_str, 16)
                pqid_str = tok[4].split(':')[1]
                pqid = int(pqid_str, 16)
                qid_list.append((qid, pqid))
        except subprocess.CalledProcessError as e:
            LOG.error('failed extracting classids from tc %s', str(e))
        return qid_list

    @staticmethod
    def dump_class_state(intf: str, qid: int):
        """Print the detailed tc statistics of class ``qid`` to stdout."""
        qid_hex = hex(qid)
        tc_cmd = "tc -s -d class show dev {} classid 1:{}".format(intf,
                                                                  qid_hex)
        args = argSplit(tc_cmd)
        try:
            output = subprocess.check_output(args)
            print(output.decode())
        except subprocess.CalledProcessError:
            # print() does not interpolate extra arguments, format explicitly
            print("Exception dumping Qos State for %s" % intf)

    @staticmethod
    def get_class_rate(intf: str, qid: int) -> Optional[str]:
        """Return the tc configuration text that follows the word 'rate'.

        Returns None when the tc command fails or when its output does not
        contain a 'rate' field (e.g. the class does not exist).
        """
        qid_hex = hex(qid)
        tc_cmd = "tc class show dev {} classid 1:{}".format(intf, qid_hex)
        args = argSplit(tc_cmd)
        try:
            # output: class htb 1:3 parent 1:2 prio 2 rate 250Kbit ceil 500Kbit burst 1600b cburst 1600b
            # check_output returns bytes; decode before doing str operations
            output = subprocess.check_output(args).decode('utf-8')
        except subprocess.CalledProcessError:
            LOG.error("Exception dumping Qos State for %s", intf)
            return None
        # return all config from 'rate' onwards
        config = output.split("rate")
        if len(config) < 2:
            return None
        return config[1]


class TCManager(object):
    """
    Creates/Deletes queues in linux. Using Qdiscs for flow based
    rate limiting(traffic shaping) of user traffic.
    Queues are created on an egress interface and flows
    in OVS are programmed with qid to filter traffic to the queue.
    Traffic matching a specific flow is filtered to a queue and is
    rate limited based on configured value.
    Traffic to flows with no QoS configuration are sent to a
    default queue and are not rate limited.
    """

    def __init__(self,
                 datapath,
                 loop,
                 config) -> None:
        """Read interface names and the qid range from ``config``."""
        self._datapath = datapath
        self._loop = loop
        self._uplink = config['nat_iface']
        self._downlink = config['enodeb_iface']
        self._max_rate = config["qos"]["max_rate"]
        self._start_idx, self._max_idx = (config['qos']['linux_tc']['min_idx'],
                                          config['qos']['linux_tc']['max_idx'])
        self._id_manager = IdManager(self._start_idx, self._max_idx)
        self._initialized = True
        LOG.info("Init LinuxTC module uplink:%s downlink:%s",
                 config['nat_iface'], config['enodeb_iface'])

    def destroy(self, ):
        """Delete the managed classes found on the uplink and downlink."""
        LOG.info("destroying existing qos classes")
        # ensure ordering during deletion of classes, children should be deleted
        # prior to the parent class ids
        for intf in [self._uplink, self._downlink]:
            qid_list = TrafficClass.read_all_classes(intf)
            for qid_tuple in qid_list:
                (qid, pqid) = qid_tuple
                if self._start_idx <= qid < (self._max_idx - 1):
                    LOG.info("Attemting to delete class idx %d", qid)
                    TrafficClass.delete_class(intf, qid, show_error=False)
                if self._start_idx <= pqid < (self._max_idx - 1):
                    LOG.info("Attemting to delete parent class idx %d", pqid)
                    TrafficClass.delete_class(intf, pqid, show_error=False)

    def setup(self, ):
        """Initialize the qdisc on both the uplink and the downlink."""
        # initialize new qdisc
        TrafficClass.init_qdisc(self._uplink)
        TrafficClass.init_qdisc(self._downlink)

    def get_action_instruction(self, qid: int):
        """Return ``(action, None)`` marking packets with ``qid``.

        Returns None (not a tuple) when ``qid`` is outside the managed range.
        """
        # return an action and an instruction corresponding to this qid
        if qid < self._start_idx or qid > (self._max_idx - 1):
            LOG.error("invalid qid %d, no action/inst returned", qid)
            return
        parser = self._datapath.ofproto_parser
        return parser.OFPActionSetField(pkt_mark=qid), None

    def add_qos(self, d: FlowMatch.Direction, qos_info: QosInfo,
                parent=None) -> int:
        """Allocate a qid, create its class on the interface for ``d``
        and return the qid."""
        LOG.debug("add QoS: %s", qos_info)
        qid = self._id_manager.allocate_idx()
        intf = self._uplink if d == FlowMatch.UPLINK else self._downlink
        TrafficClass.create_class(intf, qid, qos_info.mbr,
                                  rate=qos_info.gbr,
                                  parent_qid=parent)
        LOG.debug("assigned qid: %d", qid)
        return qid

    def remove_qos(self, qid: int, d: FlowMatch.Direction,
                   recovery_mode=False):
        """Delete class ``qid``; its index is released only on success."""
        if not self._initialized and not recovery_mode:
            return
        if qid < self._start_idx or qid > (self._max_idx - 1):
            LOG.error("invalid qid %d, removal failed", qid)
            return
        LOG.debug("deleting qos_handle %s", qid)
        intf = self._uplink if d == FlowMatch.UPLINK else self._downlink
        err = TrafficClass.delete_class(intf, qid)
        if err == 0:
            self._id_manager.release_idx(qid)
        else:
            LOG.error('error deleting class %d, not releasing idx', qid)
        return

    def read_all_state(self, ):
        """Rebuild the qid map from tc and restore it into the id manager.

        Returns an already-resolved future (created on the supplied loop)
        whose result maps each in-range qid to its direction and 'ambr_qid'.
        """
        LOG.debug("read_all_state")
        st = {}
        ul_qid_list = TrafficClass.read_all_classes(self._uplink)
        dl_qid_list = TrafficClass.read_all_classes(self._downlink)
        for (d, qid_list) in ((FlowMatch.UPLINK, ul_qid_list),
                              (FlowMatch.DOWNLINK, dl_qid_list)):
            for qid_tuple in qid_list:
                qid, pqid = qid_tuple
                if qid < self._start_idx or qid > (self._max_idx - 1):
                    continue
                st[qid] = {
                    'direction': d,
                    'ambr_qid': pqid if pqid != self._max_idx else 0,
                }
        self._id_manager.restore_state(st)
        fut = self._loop.create_future()
        LOG.debug("map -> %s", st)
        fut.set_result(st)
        return fut

    def same_qos_config(self, d: FlowMatch.Direction,
                        qid1: int, qid2: int) -> bool:
        intf = self._uplink if d == FlowMatch.UPLINK else self._downlink
        config1 = TrafficClass.get_class_rate(intf, qid1)
        config2 = TrafficClass.get_class_rate(intf, qid2)
        # ... (the listing is truncated at this point on the source page)
set_netem_only.py
Source:set_netem_only.py  
1#!/usr/bin/python2# -*- coding: utf-8 -*-3# vim:softtabstop=4:shiftwidth=4:expandtab4# python imports5from logging import info, debug, warn, error6from twisted.internet import defer, reactor7import os8import sys9import time10# umic-mesh imports11from um_measurement import measurement, tests12from um_functions import call13class DumbbellEvaluationMeasurement(measurement.Measurement):14    """This Measurement will run tests of several scenarios:15       - Each scenario is defined by it's flowgrind options.16       - One test of a scenario consists of parallel runs (flows)17         between all pairs defined in the pairs file.18       - One measurement-iteration will run one test of each scenario.19       - The number of iterations is determined by the "iterations" variable.20    """21    def __init__(self):22        """Constructor of the object"""23        self.logprefix=""24        measurement.Measurement.__init__(self)25        self.parser.set_defaults(pairfile = "vmesh_dumbbell_flowgrind_measurement_pairs.lst",26                                 offset=None, log_dir="out")27        self.parser.add_option('-f', '--pairfile', metavar="PAIRFILE",28                               action = 'store', type = 'string', dest = 'pairfile',29                               help = 'Set file to load node pairs from [default: %default]')30        self.parser.add_option('-o', '--offset', metavar="OFFSET",31                               action = 'store', type = 'string', dest = 'offset',32                               help = 'Offset for routers [default: %default]')33        self.later_args_list = []34    def set_option(self):35        """Set options"""36        measurement.Measurement.set_option(self)37        if (self.options.offset == None):38            error("Please give an offset!")39            sys.exit(1)40    @defer.inlineCallbacks41    def run_netem(self, reorder, ackreor, rdelay, delay, ackloss, limit, bottleneckbw, mode):42        fdnode="vmrouter%s" 
%(int(self.options.offset) + 9)    # forward path delay43        frnode="vmrouter%s" %(int(self.options.offset) + 11)    # forward path reordering44        qlnode="vmrouter%s" %(int(self.options.offset) + 12)    # queue limit45        rrnode="vmrouter%s" %(int(self.options.offset) + 13)    # reverse path reordering46        rdnode="vmrouter%s" %(int(self.options.offset) + 15)    # reverse path delay47        alnode="vmrouter%s" %(int(self.options.offset) + 14)     # ack loss48        info("Setting netem..")49        #forward path delay50        #if not delay == None:51        #    tc_cmd = "sudo tc qdisc %s dev eth0 parent 1:5 handle 50: netem delay %ums %ums 20%%" %(mode, delay, (int)(delay * 0.1))52        #    rc = yield self.remote_execute(fdnode, tc_cmd)53        #forward path reordering54        if reorder == 0:55            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:2 handle 10: netem reorder 0%%" %(mode)56        else:57            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:2 handle 10: netem reorder %u%% \58                      reorderdelay %ums %ums 20%%" %(mode, reorder, rdelay, (int)(rdelay * 0.1))59        rc = yield self.remote_execute(frnode, tc_cmd, log_file=sys.stdout)60        #queue limit61        if not limit == None:62            tc_cmd = "sudo tc qdisc %s dev eth0 parent 1:1 handle 10: netem limit %u; \63                      sudo tc qdisc %s dev eth0 parent 1:2 handle 20: netem limit %u" %(mode, limit, mode, limit)64            rc = yield self.remote_execute(qlnode, tc_cmd, log_file=sys.stdout)65        #bottleneck bandwidth66        if not bottleneckbw == None:67            tc_cmd = "sudo tc class %s dev eth0 parent 1: classid 1:1 htb rate %umbit; \68                      sudo tc class %s dev eth0 parent 1: classid 1:2 htb rate %umbit" %(mode, bottleneckbw, mode, bottleneckbw)69            rc = yield self.remote_execute(qlnode, tc_cmd, log_file=sys.stdout)70        #reverse path reordering71        if 
ackreor == 0:72            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:1 handle 10: netem reorder 0%%" %(mode)73        else:74            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:1 handle 10: netem reorder %u%% \75                      reorderdelay %ums %ums 20%%" %(mode, ackreor, rdelay, (int)(rdelay * 0.1))76        rc = yield self.remote_execute(rrnode, tc_cmd, log_file=sys.stdout)77        #reverse path delay78        #if not delay == None:79        #    tc_cmd = "sudo tc qdisc %s dev eth0 parent 1:1 handle 50: netem delay %ums %ums 20%%" %(mode, delay, (int)(delay * 0.1))80        #    rc = yield self.remote_execute(rdnode, tc_cmd, log_file=sys.stdout)81        #ack loss82        if ackloss == 0:83            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:1 handle 10: netem drop 0%%" %(mode)84        else:85            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:1 handle 10: netem drop %u%%" %(mode, ackloss)86        rc = yield self.remote_execute(alnode, tc_cmd, log_file=sys.stdout)87    @defer.inlineCallbacks88    def run(self):89        """Main method"""90        delay  = 2091        rate   = 2092        qlen   = int((2*delay*rate)/11.44)+193        rrate  = 094        rdelay = delay95        #initial settings for netem96        yield self.run_netem(0, 0, 0, 0, 0, 1000, 100, "add")97        yield self.run_netem(rrate,   0  ,  rdelay, delay,    0,    qlen,    rate, "change")98                         #reorder, ackreor, rdelay, delay, ackloss, limit, bottleneckbw99        reactor.stop()100    def main(self):101        self.parse_option()102        self.set_option()103        self.run()104        reactor.run()105if __name__ == "__main__":...tc_controller.py
Source:tc_controller.py  
1from flask import Blueprint, request, jsonify2from tc_rules import get, post, delete3from config import if_name4import logging, os5logger = logging.getLogger(__name__)6tc_controller_bp = Blueprint('ai_app', '__name__')7@tc_controller_bp.route('/tc_rules', methods=['GET', 'POST', 'DELETE'])8def handle_uav_data():9    if request.method == 'GET': 10        return jsonify(get())11    if request.method == 'POST': 12        create_rules(request.get_json())13        post(request.get_json())14        return jsonify(get())15    if request.method == 'DELETE': 16        delete_rules()17        delete()18        return jsonify({'result': 'Rules deleted.'})19def create_rules(rules):20    if not rules['apply']: return21    tc_cmd = f"tcset {if_name}"22    tc_cmd_ai = f"tcset {if_name}"23    if 'delay' in rules:24        tc_cmd += f" --delay {rules['delay']} "25    if 'rate' in rules:26        tc_cmd += f" --rate {rules['rate']}"27        tc_cmd_ai += f" --rate {rules['rate']}"28    if 'loss' in rules:29        tc_cmd += f" --loss {rules['loss']}"30    tc_cmd += " --overwrite"31    tc_cmd_ai += " --direction incoming --overwrite"32    # delete_rules()33    # exec_and_log(f"{tc_cmd} --direction incoming")34    # exec_and_log(f"{tc_cmd} --direction outgoing")35    exec_and_log(f"{tc_cmd}")36    exec_and_log(f"sshpass -p 'ffmpeg' ssh root@10.10.21.11 {tc_cmd_ai}")37def delete_rules():38    tc_cmd = f"tcdel {if_name} --all"39    exec_and_log(tc_cmd)40def exec_and_log(cmd):41    logger.info(f"Executing command: \t'{cmd}'")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
