Best Python code snippet using lisa_python
drv_ntulpe_filters.py
Source:drv_ntulpe_filters.py  
...106        for loc in self.added_l3l4_loc:107            Command(cmd="ethtool -N {} delete {}".format(self.dut_iface, loc)).run()108        self.added_l3l4_loc = []109        log.debug('added_l3l4_loc: {}'.format(self.added_l3l4_loc))110    def send_packets(self, ipv4=False, ipv6=False, udp=False, tcp=False, sctp=False,111                     src_ip=None, dst_ip=None, src_port=None, dst_port=None,112                     count=100, backgroud_traffic=True):113        src_ip_range = self.get_ip_range(src_ip)114        dst_ip_range = self.get_ip_range(dst_ip)115        src_port_range = self.get_port_range(src_port)116        dst_port_range = self.get_port_range(dst_port)117        if backgroud_traffic:118            self.ping(to_host=self.DUT_IPV4_ADDR, from_host=self.LKP_IPV4_ADDR, number=20)119        for src_ip, dst_ip, src_port, dst_port in itertools.product(120                src_ip_range, dst_ip_range, src_port_range, dst_port_range):121            pkt = Ether(dst=self.dut_mac)122            if ipv4:123                pkt = pkt / IP(src=src_ip, dst=dst_ip)124            elif ipv6:125                pkt = pkt / IPv6(src=src_ip, dst=dst_ip)126            if udp:127                pkt = pkt / UDP(sport=src_port, dport=dst_port)128            elif tcp:129                pkt = pkt / TCP(sport=src_port, dport=dst_port)130            elif sctp:131                pkt = pkt / SCTP(sport=src_port, dport=dst_port)132            log.info('Prepared packet: {}'.format(pkt.summary()))133            lkp_aqsendp = Aqsendp(134                packet=pkt, count=count,135                host=self.lkp_hostname, iface=self.lkp_iface136            )137            lkp_aqsendp.run()138    def test_rxflow_vlan(self):139        """140        @description: Test rxflow vlan 1 filter141        @steps:142        1. Configure filter.143        2. Send traffic.144        3. Check traffic recived in target queue.145        @result: Check passed.146        @duration: 5 sec.147        """148        try:149            def create_vlan_iface(vlan):150                self.dut_ifconfig.create_vlan_iface(vlan)151                self.dut_ifconfig.set_link_state(LINK_STATE_UP, vlan_id=vlan)152                self.dut_ifconfig.wait_link_up(vlan_id=vlan)153            action_queue = 1154            num_packets = 100155            vlan_id_1 = 1156            vlan_id_2 = 2157            src_ip = "192.168.200.201"158            dst_ip = "192.168.200.200"159            create_vlan_iface(vlan_id_1)160            create_vlan_iface(vlan_id_2)161            set_filter_cmd = "ethtool -N {} flow-type ether vlan {} m 0xF000 action {} loc {}".format(162                self.dut_iface, vlan_id_1, action_queue, self.VLAN_LOCATION + 1)163            assert Command(cmd=set_filter_cmd).run()["returncode"] == 0164            Command(cmd='sudo ethtool -n {}'.format(self.dut_iface)).run()165            stat_before = self.dut_statistics.get_drv_counters()166            for vlan in range(1, 12):167                pkt = Ether(dst=self.dut_mac) / Dot1Q(vlan=vlan) / IP(dst=dst_ip, src=src_ip)168                lkp_aqsendp = Aqsendp(169                    packet=pkt, count=num_packets,170                    host=self.lkp_hostname, iface=self.lkp_iface)171                lkp_aqsendp.run()172            stat_after = self.dut_statistics.get_drv_counters()173            k = self.queue_name.format(action_queue)174            assert stat_after[k] - stat_before[k] == num_packets175        finally:176            self.dut_ifconfig.delete_vlan_iface(vlan_id_1)177            self.dut_ifconfig.delete_vlan_iface(vlan_id_2)178            Command(cmd="ethtool -N {} delete {}".format(self.dut_iface, self.VLAN_LOCATION)).run()179    def test_rxflow_ethertype(self):180        """181        @description: Test rxflow ethertype filter182        @steps:183        1. Configure filter.184        2. Send traffic.185        3. Check traffic recived in target queue.186        @result: Check passed.187        @duration: 5 sec.188        """189        try:190            action_queue = 1191            num_packets = 100192            ethertype = 0x0801193            src_udp_port = 6565194            dst_udp_port = 99195            set_filter_cmd = "ethtool -N {} flow-type ether proto {} action {} loc {}".format(196                self.dut_iface, ethertype, action_queue, self.ETHERTYPE_LOCATION)197            assert Command(cmd=set_filter_cmd).run()["returncode"] == 0198            Command(cmd='sudo ethtool -n {}'.format(self.dut_iface)).run()199            pkt = Ether(dst=self.dut_mac, type=ethertype) / IP() / UDP(sport=src_udp_port, dport=dst_udp_port)200            lkp_aqsendp = Aqsendp(201                packet=pkt, count=num_packets,202                host=self.lkp_hostname, iface=self.lkp_iface203            )204            stat_before = self.dut_statistics.get_drv_counters()205            lkp_aqsendp.run()206            stat_after = self.dut_statistics.get_drv_counters()207            k = self.queue_name.format(action_queue)208            assert stat_after[k] - stat_before[k] == num_packets209        finally:210            Command(cmd="ethtool -N {} delete {}".format(self.dut_iface, self.ETHERTYPE_LOCATION)).run()211    def test_rxflow_ethertype_user_prior(self):212        """213        @description: Test rxflow ethertype and user priority filter214        @steps:215        1. Configure filter.216        2. Send traffic.217        3. Check traffic recived in target queue.218        @result: Check passed.219        @duration: 5 sec.220        """221        if "forwarding" in self.dut_drv_version:222            pytest.skip()223        try:224            action_queue = 1225            num_packets = 100226            src_udp_port = 6565227            dst_udp_port = 99228            ethertype = 0x0801229            vlan = 1230            prio = 4231            vlan_type = (prio << 13) | vlan232            set_filter_cmd = "ethtool -N {} flow-type ether proto {} vlan {} m 0x1FFF action {} loc {}".format(233                self.dut_iface, ethertype, vlan_type, action_queue, self.ETHERTYPE_LOCATION)234            assert Command(cmd=set_filter_cmd).run()["returncode"] == 0235            sniffer = Tcpdump(host=self.dut_hostname, port=self.dut_port, timeout=60)236            sniffer.run_async()237            time.sleep(30)238            stat_before = self.dut_statistics.get_drv_counters()239            # send traffic for all priorities240            for p in range(0, 8):241                l2 = Ether(dst=self.dut_mac) / Dot1Q(vlan=vlan, prio=p, type=ethertype)242                pkt = l2 / IP() / UDP(sport=src_udp_port, dport=dst_udp_port)243                lkp_aqsendp = Aqsendp(244                    packet=pkt, count=num_packets,245                    host=self.lkp_hostname, iface=self.lkp_iface246                )247                lkp_aqsendp.run()248            stat_after = self.dut_statistics.get_drv_counters()249            dut_packets = sniffer.join(30)250            wrpcap("packets.pcap", dut_packets)251            shutil.copy("packets.pcap", self.test_log_dir)252            k = self.queue_name.format(action_queue)253            assert stat_after[k] - stat_before[k] == num_packets254        finally:255            Command(cmd="ethtool -N {} delete {}".format(self.dut_iface, self.ETHERTYPE_LOCATION)).run()256    def test_rxflow_ipv4_src_ip(self):257        """258        @description: Test rxflow l3l4 IPv6 srcip259        @steps:260        1. Configure filter.261        2. Send traffic.262        3. Check traffic recived in target queue.263        @result: Check passed.264        @duration: 5 sec.265        """266        try:267            action_queue = 1268            num_packets = 100269            src_ip = "169.254.1.1"270            self.add_filter(flow_type='ip4', src_ip=src_ip)271            stat_before = self.dut_statistics.get_drv_counters()272            self.send_packets(ipv4=True, src_ip=(src_ip, 4), count=num_packets)273            stat_after = self.dut_statistics.get_drv_counters()274            k = self.queue_name.format(action_queue)275            assert stat_after[k] - stat_before[k] == num_packets276        finally:277            self.clear_filters()278    def test_rxflow_ipv4_dst_ip(self):279        """280        @description: Test rxflow l3l4 IPv6 dstip281        @steps:282        1. Configure filter.283        2. Send traffic.284        3. Check traffic recived in target queue.285        @result: Check passed.286        @duration: 5 sec.287        """288        try:289            action_queue = 1290            num_packets = 100291            dst_ip = "169.254.1.1"292            self.add_filter(flow_type='ip4', dst_ip=dst_ip)293            stat_before = self.dut_statistics.get_drv_counters()294            self.send_packets(ipv4=True, dst_ip=(dst_ip, 4), count=num_packets)295            stat_after = self.dut_statistics.get_drv_counters()296            k = self.queue_name.format(action_queue)297            assert stat_after[k] - stat_before[k] == num_packets298        finally:299            self.clear_filters()300    def test_rxflow_ipv4_src_ip_dst_ip(self):301        """302        @description: Test rxflow l3l4 IPv6 dstip srcip303        @steps:304        1. Configure filter.305        2. Send traffic.306        3. Check traffic recived in target queue.307        @result: Check passed.308        @duration: 5 sec.309        """310        try:311            action_queue = 1312            num_packets = 100313            src_ip = "169.254.1.1"314            dst_ip = "169.254.2.1"315            self.add_filter(flow_type='ip4', src_ip=src_ip, dst_ip=dst_ip)316            stat_before = self.dut_statistics.get_drv_counters()317            self.send_packets(ipv4=True, src_ip=(src_ip, 5), dst_ip=(dst_ip, 4), count=num_packets)318            stat_after = self.dut_statistics.get_drv_counters()319            k = self.queue_name.format(action_queue)320            assert stat_after[k] - stat_before[k] == num_packets321        finally:322            self.clear_filters()323    def test_rxflow_ipv6_src_ip(self):324        """325        @description: Test rxflow l3l4 IPv6 srcip326        @steps:327        1. Configure filter.328        2. Send traffic.329        3. Check traffic recived in target queue.330        @result: Check passed.331        @duration: 5 sec.332        """333        try:334            action_queue = 1335            num_packets = 100336            src_ip = "2001:db8:dead::1"337            self.add_filter(flow_type='ip6', src_ip=src_ip)338            stat_before = self.dut_statistics.get_drv_counters()339            self.send_packets(ipv6=True, src_ip=(src_ip, 5), count=num_packets)340            stat_after = self.dut_statistics.get_drv_counters()341            k = self.queue_name.format(action_queue)342            assert stat_after[k] - stat_before[k] == num_packets343        finally:344            self.clear_filters()345    def test_rxflow_ipv6_dst_ip(self):346        """347        @description: Test rxflow l3l4 IPv6 dstip348        @steps:349        1. Configure filter.350        2. Send traffic.351        3. Check traffic recived in target queue.352        @result: Check passed.353        @duration: 5 sec.354        """355        try:356            action_queue = 1357            num_packets = 100358            dst_ip = "2001:db8:dead::1"359            self.add_filter(flow_type='ip6', dst_ip=dst_ip)360            stat_before = self.dut_statistics.get_drv_counters()361            self.send_packets(ipv6=True, dst_ip=(dst_ip, 5), count=num_packets)362            stat_after = self.dut_statistics.get_drv_counters()363            k = self.queue_name.format(action_queue)364            assert stat_after[k] - stat_before[k] == num_packets365        finally:366            self.clear_filters()367    def test_rxflow_ipv6_src_ip_dst_ip(self):368        """369        @description: Test rxflow l3l4 IPv6 dstip srcip370        @steps:371        1. Configure filter.372        2. Send traffic.373        3. Check traffic recived in target queue.374        @result: Check passed.375        @duration: 5 sec.376        """377        try:378            action_queue = 1379            num_packets = 100380            src_ip = "2001:db8:0001::1"381            dst_ip = "2001:db8:0002::1"382            src_ip_2 = "2001:db8:0001::2"383            dst_ip_2 = "2001:db8:0002::2"384            if self.dut_fw_card == CARD_ANTIGUA:385                filters_count = 3386                self.add_filter(flow_type='ip6', src_ip=src_ip, dst_ip=(dst_ip, 3))387            else:388                filters_count = 2389                self.add_filter(flow_type='ip6', src_ip=src_ip, dst_ip=dst_ip, loc=32)390                self.add_filter(flow_type='ip6', src_ip=src_ip_2, dst_ip=dst_ip_2, loc=36)391            stat_before = self.dut_statistics.get_drv_counters()392            self.send_packets(ipv6=True, src_ip=(src_ip, 5), dst_ip=(dst_ip, 5), count=num_packets)393            stat_after = self.dut_statistics.get_drv_counters()394            k = self.queue_name.format(action_queue)395            assert stat_after[k] - stat_before[k] == num_packets * filters_count396        finally:397            self.clear_filters()398    def test_rxflow_combine_ipv4_ipv6(self):399        """400        @description: Test rxflow l3l4 IPv6 dstip srcip401        @steps:402        1. Configure filter.403        2. Send traffic.404        3. Check traffic recived in target queue.405        @result: Check passed.406        @duration: 5 sec.407        """408        if self.dut_fw_card != CARD_ANTIGUA:409            pytest.skip()410        try:411            action_queue = 1412            num_packets = 100413            src_ip4 = "169.254.1.1"414            dst_ip4 = "169.254.2.1"415            src_ip6 = "2001:db8:0001::1"416            dst_ip6 = "2001:db8:0002::1"417            self.add_filter(flow_type='ip4', src_ip=src_ip4, dst_ip=dst_ip4)418            self.add_filter(flow_type='ip6', src_ip=src_ip6, dst_ip=dst_ip6)419            stat_before = self.dut_statistics.get_drv_counters()420            self.send_packets(ipv4=True, src_ip=(src_ip4, 2), dst_ip=(dst_ip4, 2), count=num_packets)421            self.send_packets(ipv6=True, src_ip=(src_ip6, 2), dst_ip=(dst_ip6, 2), count=num_packets)422            stat_after = self.dut_statistics.get_drv_counters()423            k = self.queue_name.format(action_queue)424            assert stat_after[k] - stat_before[k] == num_packets * 2425        finally:426            self.clear_filters()427    def test_rxflow_udp4_src_port(self):428        """429        @description: Test rxflow l3l4 udp4 src port filter430        @steps:431        1. Configure filter.432        2. Send traffic.433        3. Check traffic recived in target queue.434        @result: Check passed.435        @duration: 5 sec.436        """437        try:438            action_queue = 1439            num_packets = 100440            src_ip = "192.168.100.1"441            dst_ip = '192.168.200.1'442            src_port = 6565443            dst_port = 1024444            self.add_filter(flow_type='udp4', src_port=src_port)445            stat_before = self.dut_statistics.get_drv_counters()446            # send udp447            self.send_packets(ipv4=True, udp=True, count=num_packets,448                              src_ip=src_ip, dst_ip=dst_ip,449                              src_port=(src_port, 4), dst_port=dst_port)450            stat_after = self.dut_statistics.get_drv_counters()451            # send tcp (background traffic)452            self.send_packets(ipv4=True, tcp=True, count=num_packets,453                              src_ip=src_ip, dst_ip=dst_ip,454                              src_port=src_port, dst_port=dst_port)455            k = self.queue_name.format(action_queue)456            assert stat_after[k] - stat_before[k] == num_packets457        finally:458            self.clear_filters()459    def test_rxflow_udp4_dst_port(self):460        """461        @description: Test rxflow l3l4 udp4 dst port filter462        @steps:463        1. Configure filter.464        2. Send traffic.465        3. Check traffic recived in target queue.466        @result: Check passed.467        @duration: 5 sec.468        """469        try:470            action_queue = 1471            num_packets = 100472            src_ip = "192.168.100.1"473            dst_ip = '192.168.200.1'474            src_port = 6565475            dst_port = 1024476            self.add_filter(flow_type='udp4', dst_port=dst_port)477            stat_before = self.dut_statistics.get_drv_counters()478            # send udp479            self.send_packets(ipv4=True, udp=True, count=num_packets,480                              src_ip=src_ip, dst_ip=dst_ip,481                              src_port=src_port, dst_port=(dst_port, 4))482            stat_after = self.dut_statistics.get_drv_counters()483            # send tcp (background traffic)484            self.send_packets(ipv4=True, tcp=True, count=num_packets,485                              src_ip=src_ip, dst_ip=dst_ip,486                              src_port=src_port, dst_port=dst_port)487            k = self.queue_name.format(action_queue)488            assert stat_after[k] - stat_before[k] == num_packets489        finally:490            self.clear_filters()491    def test_rxflow_udp4_src_ip(self):492        """493        @description: Test rxflow l3l4 udp4 src ip filter494        @steps:495        1. Configure filter.496        2. Send traffic.497        3. Check traffic recived in target queue.498        @result: Check passed.499        @duration: 5 sec.500        """501        try:502            action_queue = 1503            num_packets = 100504            src_ip = "192.168.100.1"505            dst_ip = '192.168.200.1'506            src_port = 6565507            dst_port = 1024508            self.add_filter(flow_type='udp4', src_ip=src_ip)509            stat_before = self.dut_statistics.get_drv_counters()510            # send udp511            self.send_packets(ipv4=True, udp=True, count=num_packets,512                              src_ip=(src_ip, 4), dst_ip=dst_ip,513                              src_port=src_port, dst_port=dst_port)514            # send tcp (background traffic)515            self.send_packets(ipv4=True, tcp=True, count=num_packets,516                              src_ip=src_ip, dst_ip=dst_ip,517                              src_port=src_port, dst_port=dst_port)518            stat_after = self.dut_statistics.get_drv_counters()519            k = self.queue_name.format(action_queue)520            assert stat_after[k] - stat_before[k] == num_packets521        finally:522            self.clear_filters()523    def test_rxflow_udp4_dst_ip(self):524        """525        @description: Test rxflow l3l4 udp4 dst ip filter526        @steps:527        1. Configure filter.528        2. Send traffic.529        3. Check traffic recived in target queue.530        @result: Check passed.531        @duration: 5 sec.532        """533        try:534            action_queue = 1535            num_packets = 100536            src_ip = "192.168.100.1"537            dst_ip = '192.168.200.1'538            src_port = 6565539            dst_port = 1024540            self.add_filter(flow_type='udp4', dst_ip=dst_ip)541            stat_before = self.dut_statistics.get_drv_counters()542            # send udp543            self.send_packets(ipv4=True, udp=True, count=num_packets,544                              src_ip=src_ip, dst_ip=(dst_ip, 4),545                              src_port=src_port, dst_port=dst_port)546            # send tcp (background traffic)547            self.send_packets(ipv4=True, tcp=True, count=num_packets,548                              src_ip=src_ip, dst_ip=dst_ip,549                              src_port=src_port, dst_port=dst_port)550            stat_after = self.dut_statistics.get_drv_counters()551            k = self.queue_name.format(action_queue)552            assert stat_after[k] - stat_before[k] == num_packets553        finally:554            self.clear_filters()555    def test_rxflow_udp4_all_multiple_ips(self):556        """557        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters558        @steps:559        1. Configure filter.560        2. Send traffic.561        3. Check traffic recived in target queue.562        @result: Check passed.563        @duration: 5 sec.564        """565        try:566            action_queue = 1567            num_packets = 100568            src_ip = "192.168.100.1"569            dst_ip = "192.168.200.1"570            src_port = 10244571            dst_port = 10243572            if self.dut_fw_card == CARD_ANTIGUA:573                filters_count = 3574                self.add_filter(flow_type='udp4',575                                src_ip=(src_ip, 1), dst_ip=(dst_ip, 3),576                                src_port=src_port, dst_port=dst_port)577            else:578                filters_count = 4579                self.add_filter(flow_type='udp4',580                                src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),581                                src_port=src_port, dst_port=dst_port)582            stat_before = self.dut_statistics.get_drv_counters()583            # send udp584            self.send_packets(ipv4=True, udp=True, count=num_packets,585                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),586                              src_port=(src_port, 2), dst_port=(dst_port, 2))587            stat_after = self.dut_statistics.get_drv_counters()588            # send tcp (background traffic)589            self.send_packets(ipv4=True, tcp=True, count=num_packets,590                              src_ip=src_ip, dst_ip=dst_ip,591                              src_port=src_port, dst_port=dst_port)592            k = self.queue_name.format(action_queue)593            assert stat_after[k] - stat_before[k] == num_packets * filters_count594        finally:595            self.clear_filters()596    def test_rxflow_udp4_all_multiple_ports(self):597        """598        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters599        @steps:600        1. Configure filter.601        2. Send traffic.602        3. Check traffic recived in target queue.603        @result: Check passed.604        @duration: 5 sec.605        """606        try:607            action_queue = 1608            num_packets = 100609            src_ip = "192.168.100.1"610            dst_ip = "192.168.200.1"611            src_port = 10244612            dst_port = 10243613            if self.dut_fw_card == CARD_ANTIGUA:614                filters_count = 3615                self.add_filter(flow_type='udp4',616                                src_ip=src_ip, dst_ip=dst_ip,617                                src_port=src_port, dst_port=(dst_port, 3))618            else:619                filters_count = 4620                self.add_filter(flow_type='udp4',621                                src_ip=src_ip, dst_ip=dst_ip,622                                src_port=(src_port, 2), dst_port=(dst_port, 2))623            stat_before = self.dut_statistics.get_drv_counters()624            # send udp625            self.send_packets(ipv4=True, udp=True, count=num_packets,626                              src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),627                              src_port=(src_port, 4), dst_port=(dst_port, 4))628            # send tcp (background traffic)629            self.send_packets(ipv4=True, tcp=True, count=num_packets,630                              src_ip=src_ip, dst_ip=dst_ip,631                              src_port=src_port, dst_port=dst_port)632            stat_after = self.dut_statistics.get_drv_counters()633            k = self.queue_name.format(action_queue)634            assert stat_after[k] - stat_before[k] == num_packets * filters_count635        finally:636            self.clear_filters()637    def test_rxflow_udp4_srcport_drop(self):638        """639        @description: Test rxflow l3l4 srcport DROP filter640        @steps:641        1. Configure filter.642        2. Send traffic.643        3. Check traffic dropped.644        @result: Check passed.645        @duration: 5 sec.646        """647        try:648            action_queue = -1649            num_packets = 100650            src_ip = "192.168.100.1"651            dst_ip = "192.168.200.1"652            src_port = 10244653            dst_port = 10243654            self.add_filter(flow_type='udp4', src_port=src_port, action=action_queue)655            stat_before = self.dut_statistics.get_drv_counters()656            self.send_packets(ipv4=True, udp=True, count=num_packets,657                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),658                              src_port=src_port, dst_port=(dst_port, 2),659                              backgroud_traffic=False)660            stat_after = self.dut_statistics.get_drv_counters()661            for q in range(self.rings):662                k = self.queue_name.format(q)663                assert stat_after[k] - stat_before[k] < 20664        finally:665            self.clear_filters()666    def test_rxflow_udp6_src_port(self):667        """668        @description: Test rxflow l3l4 udp6 src port filter669        @steps:670        1. Configure filter.671        2. Send traffic.672        3. Check traffic recived in target queue.673        @result: Check passed.674        @duration: 5 sec.675        """676        try:677            action_queue = 1678            num_packets = 100679            src_ip = "2001:db8:0001::1"680            dst_ip = "2001:db8:0002::1"681            src_port = 6565682            dst_port = 1024683            self.add_filter(flow_type='udp6', src_port=src_port)684            stat_before = self.dut_statistics.get_drv_counters()685            # send udp6686            self.send_packets(ipv6=True, udp=True, count=num_packets,687                              src_ip=src_ip, dst_ip=dst_ip,688                              src_port=(src_port, 4), dst_port=dst_port)689            # send tcp6 (background traffic)690            self.send_packets(ipv6=True, tcp=True, count=num_packets,691                              src_ip=src_ip, dst_ip=dst_ip,692                              src_port=src_port, dst_port=dst_port)693            stat_after = self.dut_statistics.get_drv_counters()694            k = self.queue_name.format(action_queue)695            assert stat_after[k] - stat_before[k] == num_packets696        finally:697            self.clear_filters()698    def test_rxflow_udp6_dst_port(self):699        """700        @description: Test rxflow l3l4 udp6 dst port filter701        @steps:702        1. Configure filter.703        2. Send traffic.704        3. Check traffic recived in target queue.705        @result: Check passed.706        @duration: 5 sec.707        """708        try:709            action_queue = 1710            num_packets = 100711            src_ip = "2001:db8:0001::1"712            dst_ip = "2001:db8:0002::1"713            src_port = 6565714            dst_port = 1024715            self.add_filter(flow_type='udp6', dst_port=dst_port)716            stat_before = self.dut_statistics.get_drv_counters()717            # send udp6718            self.send_packets(ipv6=True, udp=True, count=num_packets,719                              src_ip=src_ip, dst_ip=dst_ip,720                              src_port=src_port, dst_port=(dst_port, 4))721            # send tcp6 (background traffic)722            self.send_packets(ipv6=True, tcp=True, count=num_packets,723                              src_ip=src_ip, dst_ip=dst_ip,724                              src_port=src_port, dst_port=dst_port)725            stat_after = self.dut_statistics.get_drv_counters()726            k = self.queue_name.format(action_queue)727            assert stat_after[k] - stat_before[k] == num_packets728        finally:729            self.clear_filters()730    def test_rxflow_udp6_src_ip(self):731        """732        @description: Test rxflow l3l4 udp6 src ip filter733        @steps:734        1. Configure filter.735        2. Send traffic.736        3. Check traffic recived in target queue.737        @result: Check passed.738        @duration: 5 sec.739        """740        try:741            action_queue = 1742            num_packets = 100743            src_ip = "2001:db8:0001::1"744            dst_ip = "2001:db8:0002::1"745            src_port = 6565746            dst_port = 1024747            self.add_filter(flow_type='udp6', src_ip=src_ip)748            stat_before = self.dut_statistics.get_drv_counters()749            # send udp6750            self.send_packets(ipv6=True, udp=True, count=num_packets,751                              src_ip=(src_ip, 4), dst_ip=dst_ip,752                              src_port=src_port, dst_port=dst_port)753            # send tcp6 (background traffic)754            self.send_packets(ipv6=True, tcp=True, count=num_packets,755                              src_ip=src_ip, dst_ip=dst_ip,756                              src_port=src_port, dst_port=dst_port)757            stat_after = self.dut_statistics.get_drv_counters()758            k = self.queue_name.format(action_queue)759            assert stat_after[k] - stat_before[k] == num_packets760        finally:761            self.clear_filters()762    def test_rxflow_udp6_dst_ip(self):763        """764        @description: Test rxflow l3l4 udp6 dst ip filter765        @steps:766        1. Configure filter.767        2. Send traffic.768        3. Check traffic recived in target queue.769        @result: Check passed.770        @duration: 5 sec.771        """772        try:773            action_queue = 1774            num_packets = 100775            src_ip = "2001:db8:0001::1"776            dst_ip = "2001:db8:0002::1"777            src_port = 6565778            dst_port = 1024779            self.add_filter(flow_type='udp6', dst_ip=dst_ip)780            stat_before = self.dut_statistics.get_drv_counters()781            # send udp6782            self.send_packets(ipv6=True, udp=True, count=num_packets,783                              src_ip=src_ip, dst_ip=(dst_ip, 4),784                              src_port=src_port, dst_port=dst_port)785            # send tcp6 (background traffic)786            self.send_packets(ipv6=True, tcp=True, count=num_packets,787                              src_ip=src_ip, dst_ip=dst_ip,788                              src_port=src_port, dst_port=dst_port)789            stat_after = self.dut_statistics.get_drv_counters()790            k = self.queue_name.format(action_queue)791            assert stat_after[k] - stat_before[k] == num_packets792        finally:793            self.clear_filters()794    def test_rxflow_udp6_all_multiple_ips(self):795        """796        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters797        @steps:798        1. Configure filter.799        2. Send traffic.800        3. Check traffic recived in target queue.801        @result: Check passed.802        @duration: 5 sec.803        """804        try:805            action_queue = 1806            num_packets = 100807            src_ip = "2001:db8:0001::1"808            dst_ip = "2001:db8:0002::1"809            src_ip_2 = "2001:db8:0001::2"810            dst_ip_2 = "2001:db8:0002::2"811            src_port = 10244812            dst_port = 10243813            if self.dut_fw_card == CARD_ANTIGUA:814                filters_count = 3815                self.add_filter(flow_type='udp6',816                                src_ip=src_ip, dst_ip=(dst_ip, 3),817                                src_port=src_port, dst_port=dst_port)818            else:819                filters_count = 2820                self.add_filter(flow_type='udp6',821                                src_ip=src_ip, dst_ip=dst_ip,822                                src_port=src_port, dst_port=dst_port, loc=32)823                self.add_filter(flow_type='udp6',824                                src_ip=src_ip_2, dst_ip=dst_ip_2,825                                src_port=src_port, dst_port=dst_port, loc=36)826            stat_before = self.dut_statistics.get_drv_counters()827            # send udp6828            self.send_packets(ipv6=True, udp=True, count=num_packets,829                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),830                              src_port=(src_port, 2), dst_port=(dst_port, 2))831            # send tcp6 (background traffic)832            self.send_packets(ipv6=True, tcp=True, count=num_packets,833                              src_ip=src_ip, dst_ip=dst_ip,834                              src_port=src_port, dst_port=dst_port)835            stat_after = self.dut_statistics.get_drv_counters()836            k = self.queue_name.format(action_queue)837            assert stat_after[k] - stat_before[k] == num_packets * filters_count838        finally:839            self.clear_filters()840    def test_rxflow_udp6_all_multiple_ports(self):841        """842        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters843        @steps:844        1. Configure filter.845        2. Send traffic.846        3. Check traffic recived in target queue.847        @result: Check passed.848        @duration: 5 sec.849        """850        try:851            action_queue = 1852            num_packets = 100853            src_ip = "2001:db8:0001::1"854            dst_ip = "2001:db8:0002::1"855            src_port = 10244856            dst_port = 10243857            src_port_2 = 10246858            dst_port_2 = 10245859            if self.dut_fw_card == CARD_ANTIGUA:860                filters_count = 3861                self.add_filter(flow_type='udp6',862                                src_ip=src_ip, dst_ip=dst_ip,863                                src_port=src_port, dst_port=(dst_port, 3))864            else:865                filters_count = 2866                self.add_filter(flow_type='udp6',867                                src_ip=src_ip, dst_ip=dst_ip,868                                src_port=src_port, dst_port=dst_port, loc=32)869                self.add_filter(flow_type='udp6',870                                src_ip=src_ip, dst_ip=dst_ip,871                                src_port=src_port_2, dst_port=dst_port_2, loc=36)872            stat_before = self.dut_statistics.get_drv_counters()873            # send udp6874            self.send_packets(ipv6=True, udp=True, count=num_packets,875                              src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),876                              src_port=(src_port, 4), dst_port=(dst_port, 4))877            # send tcp6 (background traffic)878            self.send_packets(ipv6=True, tcp=True, count=num_packets,879                              src_ip=src_ip, dst_ip=dst_ip,880                              src_port=src_port, dst_port=dst_port)881            stat_after = self.dut_statistics.get_drv_counters()882            k = self.queue_name.format(action_queue)883            assert stat_after[k] - stat_before[k] == num_packets * filters_count884        finally:885            self.clear_filters()886    def test_rxflow_udp6_srcport_drop(self):887        """888        @description: Test rxflow l3l4 srcport DROP filter889        @steps:890        1. Configure filter.891        2. Send traffic.892        3. Check traffic dropped.893        @result: Check passed.894        @duration: 5 sec.895        """896        try:897            action_queue = -1898            num_packets = 100899            src_ip = "2001:db8:0001::1"900            dst_ip = "2001:db8:0002::1"901            src_port = 10244902            dst_port = 10243903            self.add_filter(flow_type='udp6', src_port=src_port, action=action_queue)904            stat_before = self.dut_statistics.get_drv_counters()905            self.send_packets(ipv6=True, udp=True, count=num_packets,906                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),907                              src_port=src_port, dst_port=(dst_port, 2),908                              backgroud_traffic=False)909            stat_after = self.dut_statistics.get_drv_counters()910            for q in range(self.rings):911                k = self.queue_name.format(q)912                assert stat_after[k] - stat_before[k] < 20913        finally:914            self.clear_filters()915    def test_rxflow_tcp4_src_port(self):916        """917        @description: Test rxflow l3l4 tcp4 src port filter918        @steps:919        1. Configure filter.920        2. Send traffic.921        3. Check traffic recived in target queue.922        @result: Check passed.923        @duration: 5 sec.924        """925        try:926            action_queue = 1927            num_packets = 100928            src_ip = "192.168.100.1"929            dst_ip = '192.168.200.1'930            src_port = 6565931            dst_port = 1024932            self.add_filter(flow_type='tcp4', src_port=src_port)933            stat_before = self.dut_statistics.get_drv_counters()934            # send tcp935            self.send_packets(ipv4=True, tcp=True, count=num_packets,936                              src_ip=src_ip, dst_ip=dst_ip,937                              src_port=(src_port, 4), dst_port=dst_port)938            # send udp (backgroud traffic)939            self.send_packets(ipv4=True, udp=True, count=num_packets,940                              src_ip=src_ip, dst_ip=dst_ip,941                              src_port=src_port, dst_port=dst_port)942            stat_after = self.dut_statistics.get_drv_counters()943            k = self.queue_name.format(action_queue)944            assert stat_after[k] - stat_before[k] == num_packets945        finally:946            self.clear_filters()947    def test_rxflow_tcp4_dst_port(self):948        """949        @description: Test rxflow l3l4 tcp4 dst port filter950        @steps:951        1. Configure filter.952        2. Send traffic.953        3. Check traffic recived in target queue.954        @result: Check passed.955        @duration: 5 sec.956        """957        try:958            action_queue = 1959            num_packets = 100960            src_ip = "192.168.100.1"961            dst_ip = '192.168.200.1'962            src_port = 6565963            dst_port = 1024964            self.add_filter(flow_type='tcp4', dst_port=dst_port)965            stat_before = self.dut_statistics.get_drv_counters()966            # send tcp967            self.send_packets(ipv4=True, tcp=True, count=num_packets,968                              src_ip=src_ip, dst_ip=dst_ip,969                              src_port=src_port, dst_port=(dst_port, 4))970            # send udp (backgroud traffic)971            self.send_packets(ipv4=True, udp=True, count=num_packets,972                              src_ip=src_ip, dst_ip=dst_ip,973                              src_port=src_port, dst_port=dst_port)974            stat_after = self.dut_statistics.get_drv_counters()975            k = self.queue_name.format(action_queue)976            assert stat_after[k] - stat_before[k] == num_packets977        finally:978            self.clear_filters()979    def test_rxflow_tcp4_src_ip(self):980        """981        @description: Test rxflow l3l4 tcp4 src ip filter982        @steps:983        1. Configure filter.984        2. Send traffic.985        3. Check traffic recived in target queue.986        @result: Check passed.987        @duration: 5 sec.988        """989        try:990            action_queue = 1991            num_packets = 100992            src_ip = "192.168.100.1"993            dst_ip = '192.168.200.1'994            src_port = 6565995            dst_port = 1024996            self.add_filter(flow_type='tcp4', src_ip=src_ip)997            stat_before = self.dut_statistics.get_drv_counters()998            # send tcp999            self.send_packets(ipv4=True, tcp=True, count=num_packets,1000                              src_ip=(src_ip, 4), dst_ip=dst_ip,1001                              src_port=src_port, dst_port=dst_port)1002            # send udp (backgroud traffic)1003            self.send_packets(ipv4=True, udp=True, count=num_packets,1004                              src_ip=src_ip, dst_ip=dst_ip,1005                              src_port=src_port, dst_port=dst_port)1006            stat_after = self.dut_statistics.get_drv_counters()1007            k = self.queue_name.format(action_queue)1008            assert stat_after[k] - stat_before[k] == num_packets1009        finally:1010            self.clear_filters()1011    def test_rxflow_tcp4_dst_ip(self):1012        """1013        @description: Test rxflow l3l4 tcp4 dst ip filter1014        @steps:1015        1. Configure filter.1016        2. Send traffic.1017        3. Check traffic recived in target queue.1018        @result: Check passed.1019        @duration: 5 sec.1020        """1021        try:1022            action_queue = 11023            num_packets = 1001024            src_ip = "192.168.100.1"1025            dst_ip = '192.168.200.1'1026            src_port = 65651027            dst_port = 10241028            self.add_filter(flow_type='tcp4', dst_ip=dst_ip)1029            stat_before = self.dut_statistics.get_drv_counters()1030            # send tcp1031            self.send_packets(ipv4=True, tcp=True, count=num_packets,1032                              src_ip=src_ip, dst_ip=(dst_ip, 4),1033                              src_port=src_port, dst_port=dst_port)1034            # send udp (backgroud traffic)1035            self.send_packets(ipv4=True, udp=True, count=num_packets,1036                              src_ip=src_ip, dst_ip=dst_ip,1037                              src_port=src_port, dst_port=dst_port)1038            stat_after = self.dut_statistics.get_drv_counters()1039            k = self.queue_name.format(action_queue)1040            assert stat_after[k] - stat_before[k] == num_packets1041        finally:1042            self.clear_filters()1043    def test_rxflow_tcp4_all_multiple_ips(self):1044        """1045        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1046        @steps:1047        1. Configure filter.1048        2. Send traffic.1049        3. Check traffic recived in target queue.1050        @result: Check passed.1051        @duration: 5 sec.1052        """1053        try:1054            action_queue = 11055            num_packets = 1001056            src_ip = "192.168.100.1"1057            dst_ip = "192.168.200.1"1058            src_port = 102441059            dst_port = 102431060            if self.dut_fw_card == CARD_ANTIGUA:1061                filters_count = 31062                self.add_filter(flow_type='tcp4',1063                                src_ip=src_ip, dst_ip=(dst_ip, 3),1064                                src_port=src_port, dst_port=dst_port)1065            else:1066                filters_count = 41067                self.add_filter(flow_type='tcp4',1068                                src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1069                                src_port=src_port, dst_port=dst_port)1070            stat_before = self.dut_statistics.get_drv_counters()1071            # send tcp1072            self.send_packets(ipv4=True, tcp=True, count=num_packets,1073                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1074                              src_port=(src_port, 2), dst_port=(dst_port, 2))1075            # send udp (backgroud traffic)1076            self.send_packets(ipv4=True, udp=True, count=num_packets,1077                              src_ip=src_ip, dst_ip=dst_ip,1078                              src_port=src_port, dst_port=dst_port)1079            stat_after = self.dut_statistics.get_drv_counters()1080            k = self.queue_name.format(action_queue)1081            assert stat_after[k] - stat_before[k] == num_packets * filters_count1082        finally:1083            self.clear_filters()1084    def test_rxflow_tcp4_all_multiple_ports(self):1085        """1086        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1087        @steps:1088        1. Configure filter.1089        2. Send traffic.1090        3. Check traffic recived in target queue.1091        @result: Check passed.1092        @duration: 5 sec.1093        """1094        try:1095            action_queue = 11096            num_packets = 1001097            src_ip = "192.168.100.1"1098            dst_ip = "192.168.200.1"1099            src_port = 102441100            dst_port = 102431101            if self.dut_fw_card == CARD_ANTIGUA:1102                filters_count = 31103                self.add_filter(flow_type='tcp4',1104                                src_ip=src_ip, dst_ip=dst_ip,1105                                src_port=src_port, dst_port=(dst_port, 3))1106            else:1107                filters_count = 41108                self.add_filter(flow_type='tcp4',1109                                src_ip=src_ip, dst_ip=dst_ip,1110                                src_port=(src_port, 2), dst_port=(dst_port, 2))1111            stat_before = self.dut_statistics.get_drv_counters()1112            # send tcp1113            self.send_packets(ipv4=True, tcp=True, count=num_packets,1114                              src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1115                              src_port=(src_port, 4), dst_port=(dst_port, 4))1116            # send udp (backgroud traffic)1117            self.send_packets(ipv4=True, udp=True, count=num_packets,1118                              src_ip=src_ip, dst_ip=dst_ip,1119                              src_port=src_port, dst_port=dst_port)1120            stat_after = self.dut_statistics.get_drv_counters()1121            k = self.queue_name.format(action_queue)1122            assert stat_after[k] - stat_before[k] == num_packets * filters_count1123        finally:1124            self.clear_filters()1125    def test_rxflow_tcp4_srcport_drop(self):1126        """1127        @description: Test rxflow l3l4 srcport DROP filter1128        @steps:1129        1. Configure filter.1130        2. Send traffic.1131        3. Check traffic dropped.1132        @result: Check passed.1133        @duration: 5 sec.1134        """1135        try:1136            action_queue = -11137            num_packets = 1001138            src_ip = "192.168.100.1"1139            dst_ip = "192.168.200.1"1140            src_port = 102441141            dst_port = 102431142            self.add_filter(flow_type='tcp4', src_port=src_port, action=action_queue)1143            stat_before = self.dut_statistics.get_drv_counters()1144            self.send_packets(ipv4=True, tcp=True, count=num_packets,1145                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1146                              src_port=src_port, dst_port=(dst_port, 2),1147                              backgroud_traffic=False)1148            stat_after = self.dut_statistics.get_drv_counters()1149            for q in range(self.rings):1150                k = self.queue_name.format(q)1151                assert stat_after[k] - stat_before[k] < 201152        finally:1153            self.clear_filters()1154    def test_rxflow_tcp6_src_port(self):1155        """1156        @description: Test rxflow l3l4 tcp6 src port filter1157        @steps:1158        1. Configure filter.1159        2. Send traffic.1160        3. Check traffic recived in target queue.1161        @result: Check passed.1162        @duration: 5 sec.1163        """1164        try:1165            action_queue = 11166            num_packets = 1001167            src_ip = "2001:db8:0001::1"1168            dst_ip = "2001:db8:0002::1"1169            src_port = 65651170            dst_port = 10241171            self.add_filter(flow_type='tcp6', src_port=src_port)1172            stat_before = self.dut_statistics.get_drv_counters()1173            # send tcp61174            self.send_packets(ipv6=True, tcp=True, count=num_packets,1175                              src_ip=src_ip, dst_ip=dst_ip,1176                              src_port=(src_port, 4), dst_port=dst_port)1177            # send udp6 (background traffic)1178            self.send_packets(ipv6=True, udp=True, count=num_packets,1179                              src_ip=src_ip, dst_ip=dst_ip,1180                              src_port=src_port, dst_port=dst_port)1181            stat_after = self.dut_statistics.get_drv_counters()1182            k = self.queue_name.format(action_queue)1183            assert stat_after[k] - stat_before[k] == num_packets1184        finally:1185            self.clear_filters()1186    def test_rxflow_tcp6_dst_port(self):1187        """1188        @description: Test rxflow l3l4 tcp6 dst port filter1189        @steps:1190        1. Configure filter.1191        2. Send traffic.1192        3. Check traffic recived in target queue.1193        @result: Check passed.1194        @duration: 5 sec.1195        """1196        try:1197            action_queue = 11198            num_packets = 1001199            src_ip = "2001:db8:0001::1"1200            dst_ip = "2001:db8:0002::1"1201            src_port = 65651202            dst_port = 10241203            self.add_filter(flow_type='tcp6', dst_port=dst_port)1204            stat_before = self.dut_statistics.get_drv_counters()1205            # send tcp61206            self.send_packets(ipv6=True, tcp=True, count=num_packets,1207                              src_ip=src_ip, dst_ip=dst_ip,1208                              src_port=src_port, dst_port=(dst_port, 4))1209            # send udp6 (background traffic)1210            self.send_packets(ipv6=True, udp=True, count=num_packets,1211                              src_ip=src_ip, dst_ip=dst_ip,1212                              src_port=src_port, dst_port=dst_port)1213            stat_after = self.dut_statistics.get_drv_counters()1214            k = self.queue_name.format(action_queue)1215            assert stat_after[k] - stat_before[k] == num_packets1216        finally:1217            self.clear_filters()1218    def test_rxflow_tcp6_src_ip(self):1219        """1220        @description: Test rxflow l3l4 tcp6 src ip filter1221        @steps:1222        1. Configure filter.1223        2. Send traffic.1224        3. Check traffic recived in target queue.1225        @result: Check passed.1226        @duration: 5 sec.1227        """1228        try:1229            action_queue = 11230            num_packets = 1001231            src_ip = "2001:db8:0001::1"1232            dst_ip = "2001:db8:0002::1"1233            src_port = 65651234            dst_port = 10241235            self.add_filter(flow_type='tcp6', src_ip=src_ip)1236            stat_before = self.dut_statistics.get_drv_counters()1237            # send tcp61238            self.send_packets(ipv6=True, tcp=True, count=num_packets,1239                              src_ip=(src_ip, 4), dst_ip=dst_ip,1240                              src_port=src_port, dst_port=dst_port)1241            # send udp6 (background traffic)1242            self.send_packets(ipv6=True, udp=True, count=num_packets,1243                              src_ip=src_ip, dst_ip=dst_ip,1244                              src_port=src_port, dst_port=dst_port)1245            stat_after = self.dut_statistics.get_drv_counters()1246            k = self.queue_name.format(action_queue)1247            assert stat_after[k] - stat_before[k] == num_packets1248        finally:1249            self.clear_filters()1250    def test_rxflow_tcp6_dst_ip(self):1251        """1252        @description: Test rxflow l3l4 tcp6 dst ip filter1253        @steps:1254        1. Configure filter.1255        2. Send traffic.1256        3. Check traffic recived in target queue.1257        @result: Check passed.1258        @duration: 5 sec.1259        """1260        try:1261            action_queue = 11262            num_packets = 1001263            src_ip = "2001:db8:0001::1"1264            dst_ip = "2001:db8:0002::1"1265            src_port = 65651266            dst_port = 10241267            self.add_filter(flow_type='tcp6', dst_ip=dst_ip)1268            stat_before = self.dut_statistics.get_drv_counters()1269            # send tcp61270            self.send_packets(ipv6=True, tcp=True, count=num_packets,1271                              src_ip=src_ip, dst_ip=(dst_ip, 4),1272                              src_port=src_port, dst_port=dst_port)1273            # send udp6 (background traffic)1274            self.send_packets(ipv6=True, udp=True, count=num_packets,1275                              src_ip=src_ip, dst_ip=dst_ip,1276                              src_port=src_port, dst_port=dst_port)1277            stat_after = self.dut_statistics.get_drv_counters()1278            k = self.queue_name.format(action_queue)1279            assert stat_after[k] - stat_before[k] == num_packets1280        finally:1281            self.clear_filters()1282    def test_rxflow_tcp6_all_multiple_ips(self):1283        """1284        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1285        @steps:1286        1. Configure filter.1287        2. Send traffic.1288        3. Check traffic recived in target queue.1289        @result: Check passed.1290        @duration: 5 sec.1291        """1292        try:1293            action_queue = 11294            num_packets = 1001295            src_ip = "2001:db8:0001::1"1296            dst_ip = "2001:db8:0002::1"1297            src_ip_2 = "2001:db8:0001::2"1298            dst_ip_2 = "2001:db8:0002::2"1299            src_port = 102441300            dst_port = 102431301            if self.dut_fw_card == CARD_ANTIGUA:1302                filters_count = 31303                self.add_filter(flow_type='tcp6',1304                                src_ip=src_ip, dst_ip=(dst_ip, 3),1305                                src_port=src_port, dst_port=dst_port)1306            else:1307                filters_count = 21308                self.add_filter(flow_type='tcp6',1309                                src_ip=src_ip, dst_ip=dst_ip,1310                                src_port=src_port, dst_port=dst_port, loc=32)1311                self.add_filter(flow_type='tcp6',1312                                src_ip=src_ip_2, dst_ip=dst_ip_2,1313                                src_port=src_port, dst_port=dst_port, loc=36)1314            stat_before = self.dut_statistics.get_drv_counters()1315            # send tcp61316            self.send_packets(ipv6=True, tcp=True, count=num_packets,1317                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1318                              src_port=(src_port, 2), dst_port=(dst_port, 2))1319            # send udp6 (background traffic)1320            self.send_packets(ipv6=True, udp=True, count=num_packets,1321                              src_ip=src_ip, dst_ip=dst_ip,1322                              src_port=src_port, dst_port=dst_port)1323            stat_after = self.dut_statistics.get_drv_counters()1324            k = self.queue_name.format(action_queue)1325            assert stat_after[k] - stat_before[k] == num_packets * filters_count1326        finally:1327            self.clear_filters()1328    def test_rxflow_tcp6_all_multiple_ports(self):1329        """1330        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1331        @steps:1332        1. Configure filter.1333        2. Send traffic.1334        3. Check traffic recived in target queue.1335        @result: Check passed.1336        @duration: 5 sec.1337        """1338        try:1339            action_queue = 11340            num_packets = 1001341            src_ip = "2001:db8:0001::1"1342            dst_ip = "2001:db8:0002::1"1343            src_port = 102441344            dst_port = 102431345            src_port_2 = 102461346            dst_port_2 = 102451347            if self.dut_fw_card == CARD_ANTIGUA:1348                filters_count = 31349                self.add_filter(flow_type='tcp6',1350                                src_ip=src_ip, dst_ip=dst_ip,1351                                src_port=src_port, dst_port=(dst_port, 3))1352            else:1353                filters_count = 21354                self.add_filter(flow_type='tcp6',1355                                src_ip=src_ip, dst_ip=dst_ip,1356                                src_port=src_port, dst_port=dst_port, loc=32)1357                self.add_filter(flow_type='tcp6',1358                                src_ip=src_ip, dst_ip=dst_ip,1359                                src_port=src_port_2, dst_port=dst_port_2, loc=36)1360            stat_before = self.dut_statistics.get_drv_counters()1361            # send tcp61362            self.send_packets(ipv6=True, tcp=True, count=num_packets,1363                              src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1364                              src_port=(src_port, 4), dst_port=(dst_port, 4))1365            # send udp6 (background traffic)1366            self.send_packets(ipv6=True, udp=True, count=num_packets,1367                              src_ip=src_ip, dst_ip=dst_ip,1368                              src_port=src_port, dst_port=dst_port)1369            stat_after = self.dut_statistics.get_drv_counters()1370            k = self.queue_name.format(action_queue)1371            assert stat_after[k] - stat_before[k] == num_packets * filters_count1372        finally:1373            self.clear_filters()1374    def test_rxflow_tcp6_srcport_drop(self):1375        """1376        @description: Test rxflow l3l4 srcport DROP filter1377        @steps:1378        1. Configure filter.1379        2. Send traffic.1380        3. Check traffic dropped.1381        @result: Check passed.1382        @duration: 5 sec.1383        """1384        try:1385            action_queue = -11386            num_packets = 1001387            src_ip = "2001:db8:0001::1"1388            dst_ip = "2001:db8:0002::1"1389            src_port = 102441390            dst_port = 102431391            self.add_filter(flow_type='tcp6', src_port=src_port, action=action_queue)1392            stat_before = self.dut_statistics.get_drv_counters()1393            self.send_packets(ipv6=True, tcp=True, count=num_packets,1394                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1395                              src_port=src_port, dst_port=(dst_port, 2),1396                              backgroud_traffic=False)1397            stat_after = self.dut_statistics.get_drv_counters()1398            for q in range(self.rings):1399                k = self.queue_name.format(q)1400                assert stat_after[k] - stat_before[k] < 201401        finally:1402            self.clear_filters()1403    def test_rxflow_sctp4_src_port(self):1404        """1405        @description: Test rxflow l3l4 sctp4 src port filter1406        @steps:1407        1. Configure filter.1408        2. Send traffic.1409        3. Check traffic recived in target queue.1410        @result: Check passed.1411        @duration: 5 sec.1412        """1413        try:1414            action_queue = 11415            num_packets = 1001416            src_ip = "192.168.100.1"1417            dst_ip = '192.168.200.1'1418            src_port = 65651419            dst_port = 10241420            self.add_filter(flow_type='sctp4', src_port=src_port)1421            stat_before = self.dut_statistics.get_drv_counters()1422            # send sctp1423            self.send_packets(ipv4=True, sctp=True, count=num_packets,1424                              src_ip=src_ip, dst_ip=dst_ip,1425                              src_port=(src_port, 4), dst_port=dst_port)1426            # send tcp (background traffic)1427            self.send_packets(ipv4=True, tcp=True, count=num_packets,1428                              src_ip=src_ip, dst_ip=dst_ip,1429                              src_port=src_port, dst_port=dst_port)1430            stat_after = self.dut_statistics.get_drv_counters()1431            k = self.queue_name.format(action_queue)1432            assert stat_after[k] - stat_before[k] == num_packets1433        finally:1434            self.clear_filters()1435    def test_rxflow_sctp4_dst_port(self):1436        """1437        @description: Test rxflow l3l4 sctp4 dst port filter1438        @steps:1439        1. Configure filter.1440        2. Send traffic.1441        3. Check traffic recived in target queue.1442        @result: Check passed.1443        @duration: 5 sec.1444        """1445        try:1446            action_queue = 11447            num_packets = 1001448            src_ip = "192.168.100.1"1449            dst_ip = '192.168.200.1'1450            src_port = 65651451            dst_port = 10241452            self.add_filter(flow_type='sctp4', dst_port=dst_port)1453            stat_before = self.dut_statistics.get_drv_counters()1454            # send sctp1455            self.send_packets(ipv4=True, sctp=True, count=num_packets,1456                              src_ip=src_ip, dst_ip=dst_ip,1457                              src_port=src_port, dst_port=(dst_port, 4))1458            # send tcp (background traffic)1459            self.send_packets(ipv4=True, tcp=True, count=num_packets,1460                              src_ip=src_ip, dst_ip=dst_ip,1461                              src_port=src_port, dst_port=dst_port)1462            stat_after = self.dut_statistics.get_drv_counters()1463            k = self.queue_name.format(action_queue)1464            assert stat_after[k] - stat_before[k] == num_packets1465        finally:1466            self.clear_filters()1467    def test_rxflow_sctp4_src_ip(self):1468        """1469        @description: Test rxflow l3l4 sctp4 src ip filter1470        @steps:1471        1. Configure filter.1472        2. Send traffic.1473        3. Check traffic recived in target queue.1474        @result: Check passed.1475        @duration: 5 sec.1476        """1477        try:1478            action_queue = 11479            num_packets = 1001480            src_ip = "192.168.100.1"1481            dst_ip = '192.168.200.1'1482            src_port = 65651483            dst_port = 10241484            self.add_filter(flow_type='sctp4', src_ip=src_ip)1485            stat_before = self.dut_statistics.get_drv_counters()1486            # send sctp1487            self.send_packets(ipv4=True, sctp=True, count=num_packets,1488                              src_ip=(src_ip, 4), dst_ip=dst_ip,1489                              src_port=src_port, dst_port=dst_port)1490            # send tcp (background traffic)1491            self.send_packets(ipv4=True, tcp=True, count=num_packets,1492                              src_ip=src_ip, dst_ip=dst_ip,1493                              src_port=src_port, dst_port=dst_port)1494            stat_after = self.dut_statistics.get_drv_counters()1495            k = self.queue_name.format(action_queue)1496            assert stat_after[k] - stat_before[k] == num_packets1497        finally:1498            self.clear_filters()1499    def test_rxflow_sctp4_dst_ip(self):1500        """1501        @description: Test rxflow l3l4 sctp4 dst ip filter1502        @steps:1503        1. Configure filter.1504        2. Send traffic.1505        3. Check traffic recived in target queue.1506        @result: Check passed.1507        @duration: 5 sec.1508        """1509        try:1510            action_queue = 11511            num_packets = 1001512            src_ip = "192.168.100.1"1513            dst_ip = '192.168.200.1'1514            src_port = 65651515            dst_port = 10241516            self.add_filter(flow_type='sctp4', dst_ip=dst_ip)1517            stat_before = self.dut_statistics.get_drv_counters()1518            # send sctp1519            self.send_packets(ipv4=True, sctp=True, count=num_packets,1520                              src_ip=src_ip, dst_ip=(dst_ip, 4),1521                              src_port=src_port, dst_port=dst_port)1522            # send tcp (background traffic)1523            self.send_packets(ipv4=True, tcp=True, count=num_packets,1524                              src_ip=src_ip, dst_ip=dst_ip,1525                              src_port=src_port, dst_port=dst_port)1526            stat_after = self.dut_statistics.get_drv_counters()1527            k = self.queue_name.format(action_queue)1528            assert stat_after[k] - stat_before[k] == num_packets1529        finally:1530            self.clear_filters()1531    def test_rxflow_sctp4_all_multiple_ips(self):1532        """1533        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1534        @steps:1535        1. Configure filter.1536        2. Send traffic.1537        3. Check traffic recived in target queue.1538        @result: Check passed.1539        @duration: 5 sec.1540        """1541        try:1542            action_queue = 11543            num_packets = 1001544            src_ip = "192.168.100.1"1545            dst_ip = "192.168.200.1"1546            src_port = 102441547            dst_port = 102431548            if self.dut_fw_card == CARD_ANTIGUA:1549                filters_count = 31550                self.add_filter(flow_type='sctp4',1551                                src_ip=(src_ip, 1), dst_ip=(dst_ip, 3),1552                                src_port=src_port, dst_port=dst_port)1553            else:1554                filters_count = 41555                self.add_filter(flow_type='sctp4',1556                                src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1557                                src_port=src_port, dst_port=dst_port)1558            stat_before = self.dut_statistics.get_drv_counters()1559            # send sctp1560            self.send_packets(ipv4=True, sctp=True, count=num_packets,1561                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1562                              src_port=(src_port, 2), dst_port=(dst_port, 2))1563            # send tcp (background traffic)1564            self.send_packets(ipv4=True, tcp=True, count=num_packets,1565                              src_ip=src_ip, dst_ip=dst_ip,1566                              src_port=src_port, dst_port=dst_port)1567            stat_after = self.dut_statistics.get_drv_counters()1568            k = self.queue_name.format(action_queue)1569            assert stat_after[k] - stat_before[k] == num_packets * filters_count1570        finally:1571            self.clear_filters()1572    def test_rxflow_sctp4_all_multiple_ports(self):1573        """1574        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1575        @steps:1576        1. Configure filter.1577        2. Send traffic.1578        3. Check traffic recived in target queue.1579        @result: Check passed.1580        @duration: 5 sec.1581        """1582        try:1583            action_queue = 11584            num_packets = 1001585            src_ip = "192.168.100.1"1586            dst_ip = "192.168.200.1"1587            src_port = 102441588            dst_port = 102431589            if self.dut_fw_card == CARD_ANTIGUA:1590                filters_count = 31591                self.add_filter(flow_type='sctp4',1592                                src_ip=src_ip, dst_ip=dst_ip,1593                                src_port=src_port, dst_port=(dst_port, 3))1594            else:1595                filters_count = 41596                self.add_filter(flow_type='sctp4',1597                                src_ip=src_ip, dst_ip=dst_ip,1598                                src_port=(src_port, 2), dst_port=(dst_port, 2))1599            stat_before = self.dut_statistics.get_drv_counters()1600            # send sctp1601            self.send_packets(ipv4=True, sctp=True, count=num_packets,1602                              src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1603                              src_port=(src_port, 4), dst_port=(dst_port, 4))1604            # send tcp (background traffic)1605            self.send_packets(ipv4=True, tcp=True, count=num_packets,1606                              src_ip=src_ip, dst_ip=dst_ip,1607                              src_port=src_port, dst_port=dst_port)1608            stat_after = self.dut_statistics.get_drv_counters()1609            k = self.queue_name.format(action_queue)1610            assert stat_after[k] - stat_before[k] == num_packets * filters_count1611        finally:1612            self.clear_filters()1613    def test_rxflow_sctp4_srcport_drop(self):1614        """1615        @description: Test rxflow l3l4 srcport DROP filter1616        @steps:1617        1. Configure filter.1618        2. Send traffic.1619        3. Check traffic dropped.1620        @result: Check passed.1621        @duration: 5 sec.1622        """1623        try:1624            action_queue = -11625            num_packets = 1001626            src_ip = "192.168.100.1"1627            dst_ip = "192.168.200.1"1628            src_port = 102441629            dst_port = 102431630            self.add_filter(flow_type='sctp4', src_port=src_port, action=action_queue)1631            stat_before = self.dut_statistics.get_drv_counters()1632            self.send_packets(ipv4=True, sctp=True, count=num_packets,1633                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1634                              src_port=src_port, dst_port=(dst_port, 2),1635                              backgroud_traffic=False)1636            stat_after = self.dut_statistics.get_drv_counters()1637            for q in range(self.rings):1638                k = self.queue_name.format(q)1639                assert stat_after[k] - stat_before[k] < 201640        finally:1641            self.clear_filters()1642    def test_rxflow_sctp6_src_port(self):1643        """1644        @description: Test rxflow l3l4 sctp6 src port filter1645        @steps:1646        1. Configure filter.1647        2. Send traffic.1648        3. Check traffic recived in target queue.1649        @result: Check passed.1650        @duration: 5 sec.1651        """1652        try:1653            action_queue = 11654            num_packets = 1001655            src_ip = "2001:db8:0001::1"1656            dst_ip = "2001:db8:0002::1"1657            src_port = 65651658            dst_port = 10241659            self.add_filter(flow_type='sctp6', src_port=src_port)1660            stat_before = self.dut_statistics.get_drv_counters()1661            # send sctp61662            self.send_packets(ipv6=True, sctp=True, count=num_packets,1663                              src_ip=src_ip, dst_ip=dst_ip,1664                              src_port=(src_port, 4), dst_port=dst_port)1665            # send tcp6 (background traffic)1666            self.send_packets(ipv6=True, tcp=True, count=num_packets,1667                              src_ip=src_ip, dst_ip=dst_ip,1668                              src_port=src_port, dst_port=dst_port)1669            stat_after = self.dut_statistics.get_drv_counters()1670            k = self.queue_name.format(action_queue)1671            assert stat_after[k] - stat_before[k] == num_packets1672        finally:1673            self.clear_filters()1674    def test_rxflow_sctp6_dst_port(self):1675        """1676        @description: Test rxflow l3l4 sctp6 dst port filter1677        @steps:1678        1. Configure filter.1679        2. Send traffic.1680        3. Check traffic recived in target queue.1681        @result: Check passed.1682        @duration: 5 sec.1683        """1684        try:1685            action_queue = 11686            num_packets = 1001687            src_ip = "2001:db8:0001::1"1688            dst_ip = "2001:db8:0002::1"1689            src_port = 65651690            dst_port = 10241691            self.add_filter(flow_type='sctp6', dst_port=dst_port)1692            stat_before = self.dut_statistics.get_drv_counters()1693            # send sctp61694            self.send_packets(ipv6=True, sctp=True, count=num_packets,1695                              src_ip=src_ip, dst_ip=dst_ip,1696                              src_port=src_port, dst_port=(dst_port, 4))1697            # send tcp6 (background traffic)1698            self.send_packets(ipv6=True, tcp=True, count=num_packets,1699                              src_ip=src_ip, dst_ip=dst_ip,1700                              src_port=src_port, dst_port=dst_port)1701            stat_after = self.dut_statistics.get_drv_counters()1702            k = self.queue_name.format(action_queue)1703            assert stat_after[k] - stat_before[k] == num_packets1704        finally:1705            self.clear_filters()1706    def test_rxflow_sctp6_src_ip(self):1707        """1708        @description: Test rxflow l3l4 sctp6 src ip filter1709        @steps:1710        1. Configure filter.1711        2. Send traffic.1712        3. Check traffic recived in target queue.1713        @result: Check passed.1714        @duration: 5 sec.1715        """1716        try:1717            action_queue = 11718            num_packets = 1001719            src_ip = "2001:db8:0001::1"1720            dst_ip = "2001:db8:0002::1"1721            src_port = 65651722            dst_port = 10241723            self.add_filter(flow_type='sctp6', src_ip=src_ip)1724            stat_before = self.dut_statistics.get_drv_counters()1725            # send sctp61726            self.send_packets(ipv6=True, sctp=True, count=num_packets,1727                              src_ip=(src_ip, 4), dst_ip=dst_ip,1728                              src_port=src_port, dst_port=dst_port)1729            # send tcp6 (background traffic)1730            self.send_packets(ipv6=True, tcp=True, count=num_packets,1731                              src_ip=src_ip, dst_ip=dst_ip,1732                              src_port=src_port, dst_port=dst_port)1733            stat_after = self.dut_statistics.get_drv_counters()1734            k = self.queue_name.format(action_queue)1735            assert stat_after[k] - stat_before[k] == num_packets1736        finally:1737            self.clear_filters()1738    def test_rxflow_sctp6_dst_ip(self):1739        """1740        @description: Test rxflow l3l4 sctp6 dst ip filter1741        @steps:1742        1. Configure filter.1743        2. Send traffic.1744        3. Check traffic recived in target queue.1745        @result: Check passed.1746        @duration: 5 sec.1747        """1748        try:1749            action_queue = 11750            num_packets = 1001751            src_ip = "2001:db8:0001::1"1752            dst_ip = "2001:db8:0002::1"1753            src_port = 65651754            dst_port = 10241755            self.add_filter(flow_type='sctp6', dst_ip=dst_ip)1756            stat_before = self.dut_statistics.get_drv_counters()1757            # send sctp61758            self.send_packets(ipv6=True, sctp=True, count=num_packets,1759                              src_ip=src_ip, dst_ip=(dst_ip, 4),1760                              src_port=src_port, dst_port=dst_port)1761            # send tcp6 (background traffic)1762            self.send_packets(ipv6=True, tcp=True, count=num_packets,1763                              src_ip=src_ip, dst_ip=dst_ip,1764                              src_port=src_port, dst_port=dst_port)1765            stat_after = self.dut_statistics.get_drv_counters()1766            k = self.queue_name.format(action_queue)1767            assert stat_after[k] - stat_before[k] == num_packets1768        finally:1769            self.clear_filters()1770    def test_rxflow_sctp6_all_multiple_ips(self):1771        """1772        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1773        @steps:1774        1. Configure filter.1775        2. Send traffic.1776        3. Check traffic recived in target queue.1777        @result: Check passed.1778        @duration: 5 sec.1779        """1780        try:1781            action_queue = 11782            num_packets = 1001783            src_ip = "2001:db8:0001::1"1784            dst_ip = "2001:db8:0002::1"1785            src_ip_2 = "2001:db8:0001::2"1786            dst_ip_2 = "2001:db8:0002::2"1787            src_port = 102441788            dst_port = 102431789            if self.dut_fw_card == CARD_ANTIGUA:1790                filters_count = 31791                self.add_filter(flow_type='sctp6',1792                                src_ip=src_ip, dst_ip=(dst_ip, 3),1793                                src_port=src_port, dst_port=dst_port)1794            else:1795                filters_count = 21796                self.add_filter(flow_type='sctp6',1797                                src_ip=src_ip, dst_ip=dst_ip,1798                                src_port=src_port, dst_port=dst_port, loc=32)1799                self.add_filter(flow_type='sctp6',1800                                src_ip=src_ip_2, dst_ip=dst_ip_2,1801                                src_port=src_port, dst_port=dst_port, loc=36)1802            stat_before = self.dut_statistics.get_drv_counters()1803            # send sctp61804            self.send_packets(ipv6=True, sctp=True, count=num_packets,1805                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1806                              src_port=(src_port, 2), dst_port=(dst_port, 2))1807            # send tcp6 (background traffic)1808            self.send_packets(ipv6=True, tcp=True, count=num_packets,1809                              src_ip=src_ip, dst_ip=dst_ip,1810                              src_port=src_port, dst_port=dst_port)1811            stat_after = self.dut_statistics.get_drv_counters()1812            k = self.queue_name.format(action_queue)1813            assert stat_after[k] - stat_before[k] == num_packets * filters_count1814        finally:1815            self.clear_filters()1816    def test_rxflow_sctp6_all_multiple_ports(self):1817        """1818        @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1819        @steps:1820        1. Configure filter.1821        2. Send traffic.1822        3. Check traffic recived in target queue.1823        @result: Check passed.1824        @duration: 5 sec.1825        """1826        try:1827            action_queue = 11828            num_packets = 1001829            src_ip = "2001:db8:0001::1"1830            dst_ip = "2001:db8:0002::1"1831            src_port = 102441832            dst_port = 102431833            src_port_2 = 102461834            dst_port_2 = 102451835            if self.dut_fw_card == CARD_ANTIGUA:1836                filters_count = 31837                self.add_filter(flow_type='sctp6',1838                                src_ip=src_ip, dst_ip=dst_ip,1839                                src_port=src_port, dst_port=(dst_port, 3))1840            else:1841                filters_count = 21842                self.add_filter(flow_type='sctp6',1843                                src_ip=src_ip, dst_ip=dst_ip,1844                                src_port=src_port, dst_port=dst_port, loc=32)1845                self.add_filter(flow_type='sctp6',1846                                src_ip=src_ip, dst_ip=dst_ip,1847                                src_port=src_port_2, dst_port=dst_port_2, loc=36)1848            stat_before = self.dut_statistics.get_drv_counters()1849            # send sctp61850            self.send_packets(ipv6=True, sctp=True, count=num_packets,1851                              src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1852                              src_port=(src_port, 4), dst_port=(dst_port, 4))1853            # send tcp6 (background traffic)1854            self.send_packets(ipv6=True, tcp=True, count=num_packets,1855                              src_ip=src_ip, dst_ip=dst_ip,1856                              src_port=src_port, dst_port=dst_port)1857            stat_after = self.dut_statistics.get_drv_counters()1858            k = self.queue_name.format(action_queue)1859            assert stat_after[k] - stat_before[k] == num_packets * filters_count1860        finally:1861            self.clear_filters()1862    def test_rxflow_sctp6_srcport_drop(self):1863        """1864        @description: Test rxflow l3l4 srcport DROP filter1865        @steps:1866        1. Configure filter.1867        2. Send traffic.1868        3. Check traffic dropped.1869        @result: Check passed.1870        @duration: 5 sec.1871        """1872        try:1873            action_queue = -11874            num_packets = 1001875            src_ip = "2001:db8:0001::1"1876            dst_ip = "2001:db8:0002::1"1877            src_port = 102441878            dst_port = 102431879            self.add_filter(flow_type='sctp6', src_port=src_port, action=action_queue)1880            stat_before = self.dut_statistics.get_drv_counters()1881            self.send_packets(ipv6=True, sctp=True, count=num_packets,1882                              src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1883                              src_port=src_port, dst_port=(dst_port, 2),1884                              backgroud_traffic=False)1885            stat_after = self.dut_statistics.get_drv_counters()1886            for q in range(self.rings):1887                k = self.queue_name.format(q)1888                assert stat_after[k] - stat_before[k] < 201889        finally:1890            self.clear_filters()1891if __name__ == "__main__":...funcs.py
Source:funcs.py  
...13    if choosen_timeout is None:14        return sniff(count=choosen_count, lfilter=filter_packets)15    else:16        return sniff(count=choosen_count, lfilter=filter_packets, timeout=choosen_timeout)17def send_packets(data, index=None):18    request_packet = IP(dst=SERVER_IP)/ICMP(type="echo-request")19    if index is None:  # If we are sending an ACK packet and not a file-data one20        request_packet = request_packet/data21    else:22        data = str(index).encode() + b'|' + data23        request_packet = request_packet/data24    send(request_packet, verbose=False)25def get_packet_index():26    packets = sniff_packets(1)27    packet = packets[0]28    return int(packet[Raw].load)29def get_ack(packets_window):30    packet_index = get_packet_index()31    while packet_index != WINDOW_SIZE:32        # Re-send missing packets to the server33        for missing_packet in range(packet_index, WINDOW_SIZE):34            send_packets(packets_window[missing_packet], missing_packet)35        packet_index = get_packet_index()36def get_final_ack(packets_window):37    """This method is used to verify that the last 8 or less packets were recieved"""38    packet_index = get_packet_index()39    while packet_index != len(packets_window):40        for missing_packet in range(packet_index, len(packets_window)):41            send_packets(packets_window[missing_packet], missing_packet)42        packet_index = get_packet_index()43def read_in_chunks(file_object):44    """Lazy function (generator) to read a file piece by piece."""45    while True:46        data = file_object.read(CHUNK_SIZE)47        if not data:48            break49        yield data50def send_file(file_path):51    with open(file_path, 'rb') as f:52        packets_window = []53        for piece in read_in_chunks(f):54            if len(packets_window) == WINDOW_SIZE:55                get_ack(packets_window)56                packets_window = []57            # This if statement is made to test the message-ACK algorithm. It is not actually needed.58            if random.randrange(3, 10) != 5:59                send_packets(piece, len(packets_window))60            packets_window.append(piece)61        get_final_ack(packets_window)62    print('All packets have been sent. Shutting down.')63def send_syn(file_name, packets_amount):64    """This function simulates a part of TCP's three-way-handshake by sending a SYN message with arguments and waiting for an ack packet in return."""65    send_packets(f'{file_name}|{packets_amount}')66    packets = sniff_packets(1, 10)67    if len(packets) == 1 and packets[0][Raw].load == b'ACK':68        print('Got ACK, starting to send the file.')69        return70    raise Exception('Server is off.')71def launch(file_path, file_name):72    packets_amount = math.ceil(os.path.getsize(file_path) / CHUNK_SIZE)73    send_syn(file_name, packets_amount)...app.py
Source:app.py  
1# Handle requests to and from facebook API and heroku server2import os3import json4import requests5from flask import Flask, request6from query_sort import clean_text, sort_query7app = Flask(__name__)8POST_PARAMS = {9    "access_token": os.environ["PAGE_ACCESS_TOKEN"]10}11POST_HEADERS = {12    "Content-Type": "application/json"13}14@app.route('/', methods=['GET'])15def verify():16    if request.args.get("hub.mode") == "subscribe" and request.args.get(17            "hub.challenge"):18        if not request.args.get(19                "hub.verify_token") == os.environ["VERIFY_TOKEN"]:20            return "Verification token mismatch", 40321        return request.args["hub.challenge"], 20022    return "Err: 404, you came to the wrong place <br> \23            Visit <a href=https://www.facebook.com/ChatbotBOB>BOB's \24            facebook page</a>", 20025@app.route('/', methods=['POST'])26def webhook():27    data = request.get_json()28    if data["object"] == "page":29        for entry in data["entry"]:30            for msg_event in entry["messaging"]:31                sender_id = msg_event["sender"]["id"]32                if isinstance(msg_event, list):33                    msg_event = msg_event[0]34                if "postback" in msg_event:35                    msg_text = msg_event["postback"]["payload"]36                elif "text" in msg_event["message"]:37                    msg_text = msg_event["message"]["text"]38                else:39                    return "ok", 20040                msg_text = clean_text(msg_text)41                send_packets = sort_query(msg_text, sender_id)42                send_message(send_packets)43    return "ok", 20044def send_message(send_packets):45    if send_packets:46        for packet in send_packets:47            json_packet = json.dumps(packet)48            requests.post(49                "https://graph.facebook.com/v2.6/me/messages",50                params=POST_PARAMS,51                headers=POST_HEADERS,52                data=json_packet)53if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
