How to use the send_packets method in lisa

Best Python code snippet using lisa_python

drv_ntulpe_filters.py

Source: drv_ntulpe_filters.py (GitHub)

copy

Full Screen

...106 for loc in self.added_l3l4_loc:107 Command(cmd="ethtool -N {} delete {}".format(self.dut_iface, loc)).run()108 self.added_l3l4_loc = []109 log.debug('added_l3l4_loc: {}'.format(self.added_l3l4_loc))110 def send_packets(self, ipv4=False, ipv6=False, udp=False, tcp=False, sctp=False,111 src_ip=None, dst_ip=None, src_port=None, dst_port=None,112 count=100, backgroud_traffic=True):113 src_ip_range = self.get_ip_range(src_ip)114 dst_ip_range = self.get_ip_range(dst_ip)115 src_port_range = self.get_port_range(src_port)116 dst_port_range = self.get_port_range(dst_port)117 if backgroud_traffic:118 self.ping(to_host=self.DUT_IPV4_ADDR, from_host=self.LKP_IPV4_ADDR, number=20)119 for src_ip, dst_ip, src_port, dst_port in itertools.product(120 src_ip_range, dst_ip_range, src_port_range, dst_port_range):121 pkt = Ether(dst=self.dut_mac)122 if ipv4:123 pkt = pkt / IP(src=src_ip, dst=dst_ip)124 elif ipv6:125 pkt = pkt / IPv6(src=src_ip, dst=dst_ip)126 if udp:127 pkt = pkt / UDP(sport=src_port, dport=dst_port)128 elif tcp:129 pkt = pkt / TCP(sport=src_port, dport=dst_port)130 elif sctp:131 pkt = pkt / SCTP(sport=src_port, dport=dst_port)132 log.info('Prepared packet: {}'.format(pkt.summary()))133 lkp_aqsendp = Aqsendp(134 packet=pkt, count=count,135 host=self.lkp_hostname, iface=self.lkp_iface136 )137 lkp_aqsendp.run()138 def test_rxflow_vlan(self):139 """140 @description: Test rxflow vlan 1 filter141 @steps:142 1. Configure filter.143 2. Send traffic.144 3. 
Check traffic recived in target queue.145 @result: Check passed.146 @duration: 5 sec.147 """148 try:149 def create_vlan_iface(vlan):150 self.dut_ifconfig.create_vlan_iface(vlan)151 self.dut_ifconfig.set_link_state(LINK_STATE_UP, vlan_id=vlan)152 self.dut_ifconfig.wait_link_up(vlan_id=vlan)153 action_queue = 1154 num_packets = 100155 vlan_id_1 = 1156 vlan_id_2 = 2157 src_ip = "192.168.200.201"158 dst_ip = "192.168.200.200"159 create_vlan_iface(vlan_id_1)160 create_vlan_iface(vlan_id_2)161 set_filter_cmd = "ethtool -N {} flow-type ether vlan {} m 0xF000 action {} loc {}".format(162 self.dut_iface, vlan_id_1, action_queue, self.VLAN_LOCATION + 1)163 assert Command(cmd=set_filter_cmd).run()["returncode"] == 0164 Command(cmd='sudo ethtool -n {}'.format(self.dut_iface)).run()165 stat_before = self.dut_statistics.get_drv_counters()166 for vlan in range(1, 12):167 pkt = Ether(dst=self.dut_mac) / Dot1Q(vlan=vlan) / IP(dst=dst_ip, src=src_ip)168 lkp_aqsendp = Aqsendp(169 packet=pkt, count=num_packets,170 host=self.lkp_hostname, iface=self.lkp_iface)171 lkp_aqsendp.run()172 stat_after = self.dut_statistics.get_drv_counters()173 k = self.queue_name.format(action_queue)174 assert stat_after[k] - stat_before[k] == num_packets175 finally:176 self.dut_ifconfig.delete_vlan_iface(vlan_id_1)177 self.dut_ifconfig.delete_vlan_iface(vlan_id_2)178 Command(cmd="ethtool -N {} delete {}".format(self.dut_iface, self.VLAN_LOCATION)).run()179 def test_rxflow_ethertype(self):180 """181 @description: Test rxflow ethertype filter182 @steps:183 1. Configure filter.184 2. Send traffic.185 3. 
Check traffic recived in target queue.186 @result: Check passed.187 @duration: 5 sec.188 """189 try:190 action_queue = 1191 num_packets = 100192 ethertype = 0x0801193 src_udp_port = 6565194 dst_udp_port = 99195 set_filter_cmd = "ethtool -N {} flow-type ether proto {} action {} loc {}".format(196 self.dut_iface, ethertype, action_queue, self.ETHERTYPE_LOCATION)197 assert Command(cmd=set_filter_cmd).run()["returncode"] == 0198 Command(cmd='sudo ethtool -n {}'.format(self.dut_iface)).run()199 pkt = Ether(dst=self.dut_mac, type=ethertype) / IP() / UDP(sport=src_udp_port, dport=dst_udp_port)200 lkp_aqsendp = Aqsendp(201 packet=pkt, count=num_packets,202 host=self.lkp_hostname, iface=self.lkp_iface203 )204 stat_before = self.dut_statistics.get_drv_counters()205 lkp_aqsendp.run()206 stat_after = self.dut_statistics.get_drv_counters()207 k = self.queue_name.format(action_queue)208 assert stat_after[k] - stat_before[k] == num_packets209 finally:210 Command(cmd="ethtool -N {} delete {}".format(self.dut_iface, self.ETHERTYPE_LOCATION)).run()211 def test_rxflow_ethertype_user_prior(self):212 """213 @description: Test rxflow ethertype and user priority filter214 @steps:215 1. Configure filter.216 2. Send traffic.217 3. 
Check traffic recived in target queue.218 @result: Check passed.219 @duration: 5 sec.220 """221 if "forwarding" in self.dut_drv_version:222 pytest.skip()223 try:224 action_queue = 1225 num_packets = 100226 src_udp_port = 6565227 dst_udp_port = 99228 ethertype = 0x0801229 vlan = 1230 prio = 4231 vlan_type = (prio << 13) | vlan232 set_filter_cmd = "ethtool -N {} flow-type ether proto {} vlan {} m 0x1FFF action {} loc {}".format(233 self.dut_iface, ethertype, vlan_type, action_queue, self.ETHERTYPE_LOCATION)234 assert Command(cmd=set_filter_cmd).run()["returncode"] == 0235 sniffer = Tcpdump(host=self.dut_hostname, port=self.dut_port, timeout=60)236 sniffer.run_async()237 time.sleep(30)238 stat_before = self.dut_statistics.get_drv_counters()239 # send traffic for all priorities240 for p in range(0, 8):241 l2 = Ether(dst=self.dut_mac) / Dot1Q(vlan=vlan, prio=p, type=ethertype)242 pkt = l2 / IP() / UDP(sport=src_udp_port, dport=dst_udp_port)243 lkp_aqsendp = Aqsendp(244 packet=pkt, count=num_packets,245 host=self.lkp_hostname, iface=self.lkp_iface246 )247 lkp_aqsendp.run()248 stat_after = self.dut_statistics.get_drv_counters()249 dut_packets = sniffer.join(30)250 wrpcap("packets.pcap", dut_packets)251 shutil.copy("packets.pcap", self.test_log_dir)252 k = self.queue_name.format(action_queue)253 assert stat_after[k] - stat_before[k] == num_packets254 finally:255 Command(cmd="ethtool -N {} delete {}".format(self.dut_iface, self.ETHERTYPE_LOCATION)).run()256 def test_rxflow_ipv4_src_ip(self):257 """258 @description: Test rxflow l3l4 IPv6 srcip259 @steps:260 1. Configure filter.261 2. Send traffic.262 3. 
Check traffic recived in target queue.263 @result: Check passed.264 @duration: 5 sec.265 """266 try:267 action_queue = 1268 num_packets = 100269 src_ip = "169.254.1.1"270 self.add_filter(flow_type='ip4', src_ip=src_ip)271 stat_before = self.dut_statistics.get_drv_counters()272 self.send_packets(ipv4=True, src_ip=(src_ip, 4), count=num_packets)273 stat_after = self.dut_statistics.get_drv_counters()274 k = self.queue_name.format(action_queue)275 assert stat_after[k] - stat_before[k] == num_packets276 finally:277 self.clear_filters()278 def test_rxflow_ipv4_dst_ip(self):279 """280 @description: Test rxflow l3l4 IPv6 dstip281 @steps:282 1. Configure filter.283 2. Send traffic.284 3. Check traffic recived in target queue.285 @result: Check passed.286 @duration: 5 sec.287 """288 try:289 action_queue = 1290 num_packets = 100291 dst_ip = "169.254.1.1"292 self.add_filter(flow_type='ip4', dst_ip=dst_ip)293 stat_before = self.dut_statistics.get_drv_counters()294 self.send_packets(ipv4=True, dst_ip=(dst_ip, 4), count=num_packets)295 stat_after = self.dut_statistics.get_drv_counters()296 k = self.queue_name.format(action_queue)297 assert stat_after[k] - stat_before[k] == num_packets298 finally:299 self.clear_filters()300 def test_rxflow_ipv4_src_ip_dst_ip(self):301 """302 @description: Test rxflow l3l4 IPv6 dstip srcip303 @steps:304 1. Configure filter.305 2. Send traffic.306 3. 
Check traffic recived in target queue.307 @result: Check passed.308 @duration: 5 sec.309 """310 try:311 action_queue = 1312 num_packets = 100313 src_ip = "169.254.1.1"314 dst_ip = "169.254.2.1"315 self.add_filter(flow_type='ip4', src_ip=src_ip, dst_ip=dst_ip)316 stat_before = self.dut_statistics.get_drv_counters()317 self.send_packets(ipv4=True, src_ip=(src_ip, 5), dst_ip=(dst_ip, 4), count=num_packets)318 stat_after = self.dut_statistics.get_drv_counters()319 k = self.queue_name.format(action_queue)320 assert stat_after[k] - stat_before[k] == num_packets321 finally:322 self.clear_filters()323 def test_rxflow_ipv6_src_ip(self):324 """325 @description: Test rxflow l3l4 IPv6 srcip326 @steps:327 1. Configure filter.328 2. Send traffic.329 3. Check traffic recived in target queue.330 @result: Check passed.331 @duration: 5 sec.332 """333 try:334 action_queue = 1335 num_packets = 100336 src_ip = "2001:db8:dead::1"337 self.add_filter(flow_type='ip6', src_ip=src_ip)338 stat_before = self.dut_statistics.get_drv_counters()339 self.send_packets(ipv6=True, src_ip=(src_ip, 5), count=num_packets)340 stat_after = self.dut_statistics.get_drv_counters()341 k = self.queue_name.format(action_queue)342 assert stat_after[k] - stat_before[k] == num_packets343 finally:344 self.clear_filters()345 def test_rxflow_ipv6_dst_ip(self):346 """347 @description: Test rxflow l3l4 IPv6 dstip348 @steps:349 1. Configure filter.350 2. Send traffic.351 3. 
Check traffic recived in target queue.352 @result: Check passed.353 @duration: 5 sec.354 """355 try:356 action_queue = 1357 num_packets = 100358 dst_ip = "2001:db8:dead::1"359 self.add_filter(flow_type='ip6', dst_ip=dst_ip)360 stat_before = self.dut_statistics.get_drv_counters()361 self.send_packets(ipv6=True, dst_ip=(dst_ip, 5), count=num_packets)362 stat_after = self.dut_statistics.get_drv_counters()363 k = self.queue_name.format(action_queue)364 assert stat_after[k] - stat_before[k] == num_packets365 finally:366 self.clear_filters()367 def test_rxflow_ipv6_src_ip_dst_ip(self):368 """369 @description: Test rxflow l3l4 IPv6 dstip srcip370 @steps:371 1. Configure filter.372 2. Send traffic.373 3. Check traffic recived in target queue.374 @result: Check passed.375 @duration: 5 sec.376 """377 try:378 action_queue = 1379 num_packets = 100380 src_ip = "2001:db8:0001::1"381 dst_ip = "2001:db8:0002::1"382 src_ip_2 = "2001:db8:0001::2"383 dst_ip_2 = "2001:db8:0002::2"384 if self.dut_fw_card == CARD_ANTIGUA:385 filters_count = 3386 self.add_filter(flow_type='ip6', src_ip=src_ip, dst_ip=(dst_ip, 3))387 else:388 filters_count = 2389 self.add_filter(flow_type='ip6', src_ip=src_ip, dst_ip=dst_ip, loc=32)390 self.add_filter(flow_type='ip6', src_ip=src_ip_2, dst_ip=dst_ip_2, loc=36)391 stat_before = self.dut_statistics.get_drv_counters()392 self.send_packets(ipv6=True, src_ip=(src_ip, 5), dst_ip=(dst_ip, 5), count=num_packets)393 stat_after = self.dut_statistics.get_drv_counters()394 k = self.queue_name.format(action_queue)395 assert stat_after[k] - stat_before[k] == num_packets * filters_count396 finally:397 self.clear_filters()398 def test_rxflow_combine_ipv4_ipv6(self):399 """400 @description: Test rxflow l3l4 IPv6 dstip srcip401 @steps:402 1. Configure filter.403 2. Send traffic.404 3. 
Check traffic recived in target queue.405 @result: Check passed.406 @duration: 5 sec.407 """408 if self.dut_fw_card != CARD_ANTIGUA:409 pytest.skip()410 try:411 action_queue = 1412 num_packets = 100413 src_ip4 = "169.254.1.1"414 dst_ip4 = "169.254.2.1"415 src_ip6 = "2001:db8:0001::1"416 dst_ip6 = "2001:db8:0002::1"417 self.add_filter(flow_type='ip4', src_ip=src_ip4, dst_ip=dst_ip4)418 self.add_filter(flow_type='ip6', src_ip=src_ip6, dst_ip=dst_ip6)419 stat_before = self.dut_statistics.get_drv_counters()420 self.send_packets(ipv4=True, src_ip=(src_ip4, 2), dst_ip=(dst_ip4, 2), count=num_packets)421 self.send_packets(ipv6=True, src_ip=(src_ip6, 2), dst_ip=(dst_ip6, 2), count=num_packets)422 stat_after = self.dut_statistics.get_drv_counters()423 k = self.queue_name.format(action_queue)424 assert stat_after[k] - stat_before[k] == num_packets * 2425 finally:426 self.clear_filters()427 def test_rxflow_udp4_src_port(self):428 """429 @description: Test rxflow l3l4 udp4 src port filter430 @steps:431 1. Configure filter.432 2. Send traffic.433 3. 
Check traffic recived in target queue.434 @result: Check passed.435 @duration: 5 sec.436 """437 try:438 action_queue = 1439 num_packets = 100440 src_ip = "192.168.100.1"441 dst_ip = '192.168.200.1'442 src_port = 6565443 dst_port = 1024444 self.add_filter(flow_type='udp4', src_port=src_port)445 stat_before = self.dut_statistics.get_drv_counters()446 # send udp447 self.send_packets(ipv4=True, udp=True, count=num_packets,448 src_ip=src_ip, dst_ip=dst_ip,449 src_port=(src_port, 4), dst_port=dst_port)450 stat_after = self.dut_statistics.get_drv_counters()451 # send tcp (background traffic)452 self.send_packets(ipv4=True, tcp=True, count=num_packets,453 src_ip=src_ip, dst_ip=dst_ip,454 src_port=src_port, dst_port=dst_port)455 k = self.queue_name.format(action_queue)456 assert stat_after[k] - stat_before[k] == num_packets457 finally:458 self.clear_filters()459 def test_rxflow_udp4_dst_port(self):460 """461 @description: Test rxflow l3l4 udp4 dst port filter462 @steps:463 1. Configure filter.464 2. Send traffic.465 3. 
Check traffic recived in target queue.466 @result: Check passed.467 @duration: 5 sec.468 """469 try:470 action_queue = 1471 num_packets = 100472 src_ip = "192.168.100.1"473 dst_ip = '192.168.200.1'474 src_port = 6565475 dst_port = 1024476 self.add_filter(flow_type='udp4', dst_port=dst_port)477 stat_before = self.dut_statistics.get_drv_counters()478 # send udp479 self.send_packets(ipv4=True, udp=True, count=num_packets,480 src_ip=src_ip, dst_ip=dst_ip,481 src_port=src_port, dst_port=(dst_port, 4))482 stat_after = self.dut_statistics.get_drv_counters()483 # send tcp (background traffic)484 self.send_packets(ipv4=True, tcp=True, count=num_packets,485 src_ip=src_ip, dst_ip=dst_ip,486 src_port=src_port, dst_port=dst_port)487 k = self.queue_name.format(action_queue)488 assert stat_after[k] - stat_before[k] == num_packets489 finally:490 self.clear_filters()491 def test_rxflow_udp4_src_ip(self):492 """493 @description: Test rxflow l3l4 udp4 src ip filter494 @steps:495 1. Configure filter.496 2. Send traffic.497 3. Check traffic recived in target queue.498 @result: Check passed.499 @duration: 5 sec.500 """501 try:502 action_queue = 1503 num_packets = 100504 src_ip = "192.168.100.1"505 dst_ip = '192.168.200.1'506 src_port = 6565507 dst_port = 1024508 self.add_filter(flow_type='udp4', src_ip=src_ip)509 stat_before = self.dut_statistics.get_drv_counters()510 # send udp511 self.send_packets(ipv4=True, udp=True, count=num_packets,512 src_ip=(src_ip, 4), dst_ip=dst_ip,513 src_port=src_port, dst_port=dst_port)514 # send tcp (background traffic)515 self.send_packets(ipv4=True, tcp=True, count=num_packets,516 src_ip=src_ip, dst_ip=dst_ip,517 src_port=src_port, dst_port=dst_port)518 stat_after = self.dut_statistics.get_drv_counters()519 k = self.queue_name.format(action_queue)520 assert stat_after[k] - stat_before[k] == num_packets521 finally:522 self.clear_filters()523 def test_rxflow_udp4_dst_ip(self):524 """525 @description: Test rxflow l3l4 udp4 dst ip filter526 @steps:527 1. 
Configure filter.528 2. Send traffic.529 3. Check traffic recived in target queue.530 @result: Check passed.531 @duration: 5 sec.532 """533 try:534 action_queue = 1535 num_packets = 100536 src_ip = "192.168.100.1"537 dst_ip = '192.168.200.1'538 src_port = 6565539 dst_port = 1024540 self.add_filter(flow_type='udp4', dst_ip=dst_ip)541 stat_before = self.dut_statistics.get_drv_counters()542 # send udp543 self.send_packets(ipv4=True, udp=True, count=num_packets,544 src_ip=src_ip, dst_ip=(dst_ip, 4),545 src_port=src_port, dst_port=dst_port)546 # send tcp (background traffic)547 self.send_packets(ipv4=True, tcp=True, count=num_packets,548 src_ip=src_ip, dst_ip=dst_ip,549 src_port=src_port, dst_port=dst_port)550 stat_after = self.dut_statistics.get_drv_counters()551 k = self.queue_name.format(action_queue)552 assert stat_after[k] - stat_before[k] == num_packets553 finally:554 self.clear_filters()555 def test_rxflow_udp4_all_multiple_ips(self):556 """557 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters558 @steps:559 1. Configure filter.560 2. Send traffic.561 3. 
Check traffic recived in target queue.562 @result: Check passed.563 @duration: 5 sec.564 """565 try:566 action_queue = 1567 num_packets = 100568 src_ip = "192.168.100.1"569 dst_ip = "192.168.200.1"570 src_port = 10244571 dst_port = 10243572 if self.dut_fw_card == CARD_ANTIGUA:573 filters_count = 3574 self.add_filter(flow_type='udp4',575 src_ip=(src_ip, 1), dst_ip=(dst_ip, 3),576 src_port=src_port, dst_port=dst_port)577 else:578 filters_count = 4579 self.add_filter(flow_type='udp4',580 src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),581 src_port=src_port, dst_port=dst_port)582 stat_before = self.dut_statistics.get_drv_counters()583 # send udp584 self.send_packets(ipv4=True, udp=True, count=num_packets,585 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),586 src_port=(src_port, 2), dst_port=(dst_port, 2))587 stat_after = self.dut_statistics.get_drv_counters()588 # send tcp (background traffic)589 self.send_packets(ipv4=True, tcp=True, count=num_packets,590 src_ip=src_ip, dst_ip=dst_ip,591 src_port=src_port, dst_port=dst_port)592 k = self.queue_name.format(action_queue)593 assert stat_after[k] - stat_before[k] == num_packets * filters_count594 finally:595 self.clear_filters()596 def test_rxflow_udp4_all_multiple_ports(self):597 """598 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters599 @steps:600 1. Configure filter.601 2. Send traffic.602 3. 
Check traffic recived in target queue.603 @result: Check passed.604 @duration: 5 sec.605 """606 try:607 action_queue = 1608 num_packets = 100609 src_ip = "192.168.100.1"610 dst_ip = "192.168.200.1"611 src_port = 10244612 dst_port = 10243613 if self.dut_fw_card == CARD_ANTIGUA:614 filters_count = 3615 self.add_filter(flow_type='udp4',616 src_ip=src_ip, dst_ip=dst_ip,617 src_port=src_port, dst_port=(dst_port, 3))618 else:619 filters_count = 4620 self.add_filter(flow_type='udp4',621 src_ip=src_ip, dst_ip=dst_ip,622 src_port=(src_port, 2), dst_port=(dst_port, 2))623 stat_before = self.dut_statistics.get_drv_counters()624 # send udp625 self.send_packets(ipv4=True, udp=True, count=num_packets,626 src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),627 src_port=(src_port, 4), dst_port=(dst_port, 4))628 # send tcp (background traffic)629 self.send_packets(ipv4=True, tcp=True, count=num_packets,630 src_ip=src_ip, dst_ip=dst_ip,631 src_port=src_port, dst_port=dst_port)632 stat_after = self.dut_statistics.get_drv_counters()633 k = self.queue_name.format(action_queue)634 assert stat_after[k] - stat_before[k] == num_packets * filters_count635 finally:636 self.clear_filters()637 def test_rxflow_udp4_srcport_drop(self):638 """639 @description: Test rxflow l3l4 srcport DROP filter640 @steps:641 1. Configure filter.642 2. Send traffic.643 3. 
Check traffic dropped.644 @result: Check passed.645 @duration: 5 sec.646 """647 try:648 action_queue = -1649 num_packets = 100650 src_ip = "192.168.100.1"651 dst_ip = "192.168.200.1"652 src_port = 10244653 dst_port = 10243654 self.add_filter(flow_type='udp4', src_port=src_port, action=action_queue)655 stat_before = self.dut_statistics.get_drv_counters()656 self.send_packets(ipv4=True, udp=True, count=num_packets,657 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),658 src_port=src_port, dst_port=(dst_port, 2),659 backgroud_traffic=False)660 stat_after = self.dut_statistics.get_drv_counters()661 for q in range(self.rings):662 k = self.queue_name.format(q)663 assert stat_after[k] - stat_before[k] < 20664 finally:665 self.clear_filters()666 def test_rxflow_udp6_src_port(self):667 """668 @description: Test rxflow l3l4 udp6 src port filter669 @steps:670 1. Configure filter.671 2. Send traffic.672 3. Check traffic recived in target queue.673 @result: Check passed.674 @duration: 5 sec.675 """676 try:677 action_queue = 1678 num_packets = 100679 src_ip = "2001:db8:0001::1"680 dst_ip = "2001:db8:0002::1"681 src_port = 6565682 dst_port = 1024683 self.add_filter(flow_type='udp6', src_port=src_port)684 stat_before = self.dut_statistics.get_drv_counters()685 # send udp6686 self.send_packets(ipv6=True, udp=True, count=num_packets,687 src_ip=src_ip, dst_ip=dst_ip,688 src_port=(src_port, 4), dst_port=dst_port)689 # send tcp6 (background traffic)690 self.send_packets(ipv6=True, tcp=True, count=num_packets,691 src_ip=src_ip, dst_ip=dst_ip,692 src_port=src_port, dst_port=dst_port)693 stat_after = self.dut_statistics.get_drv_counters()694 k = self.queue_name.format(action_queue)695 assert stat_after[k] - stat_before[k] == num_packets696 finally:697 self.clear_filters()698 def test_rxflow_udp6_dst_port(self):699 """700 @description: Test rxflow l3l4 udp6 dst port filter701 @steps:702 1. Configure filter.703 2. Send traffic.704 3. 
Check traffic recived in target queue.705 @result: Check passed.706 @duration: 5 sec.707 """708 try:709 action_queue = 1710 num_packets = 100711 src_ip = "2001:db8:0001::1"712 dst_ip = "2001:db8:0002::1"713 src_port = 6565714 dst_port = 1024715 self.add_filter(flow_type='udp6', dst_port=dst_port)716 stat_before = self.dut_statistics.get_drv_counters()717 # send udp6718 self.send_packets(ipv6=True, udp=True, count=num_packets,719 src_ip=src_ip, dst_ip=dst_ip,720 src_port=src_port, dst_port=(dst_port, 4))721 # send tcp6 (background traffic)722 self.send_packets(ipv6=True, tcp=True, count=num_packets,723 src_ip=src_ip, dst_ip=dst_ip,724 src_port=src_port, dst_port=dst_port)725 stat_after = self.dut_statistics.get_drv_counters()726 k = self.queue_name.format(action_queue)727 assert stat_after[k] - stat_before[k] == num_packets728 finally:729 self.clear_filters()730 def test_rxflow_udp6_src_ip(self):731 """732 @description: Test rxflow l3l4 udp6 src ip filter733 @steps:734 1. Configure filter.735 2. Send traffic.736 3. 
Check traffic recived in target queue.737 @result: Check passed.738 @duration: 5 sec.739 """740 try:741 action_queue = 1742 num_packets = 100743 src_ip = "2001:db8:0001::1"744 dst_ip = "2001:db8:0002::1"745 src_port = 6565746 dst_port = 1024747 self.add_filter(flow_type='udp6', src_ip=src_ip)748 stat_before = self.dut_statistics.get_drv_counters()749 # send udp6750 self.send_packets(ipv6=True, udp=True, count=num_packets,751 src_ip=(src_ip, 4), dst_ip=dst_ip,752 src_port=src_port, dst_port=dst_port)753 # send tcp6 (background traffic)754 self.send_packets(ipv6=True, tcp=True, count=num_packets,755 src_ip=src_ip, dst_ip=dst_ip,756 src_port=src_port, dst_port=dst_port)757 stat_after = self.dut_statistics.get_drv_counters()758 k = self.queue_name.format(action_queue)759 assert stat_after[k] - stat_before[k] == num_packets760 finally:761 self.clear_filters()762 def test_rxflow_udp6_dst_ip(self):763 """764 @description: Test rxflow l3l4 udp6 dst ip filter765 @steps:766 1. Configure filter.767 2. Send traffic.768 3. 
Check traffic recived in target queue.769 @result: Check passed.770 @duration: 5 sec.771 """772 try:773 action_queue = 1774 num_packets = 100775 src_ip = "2001:db8:0001::1"776 dst_ip = "2001:db8:0002::1"777 src_port = 6565778 dst_port = 1024779 self.add_filter(flow_type='udp6', dst_ip=dst_ip)780 stat_before = self.dut_statistics.get_drv_counters()781 # send udp6782 self.send_packets(ipv6=True, udp=True, count=num_packets,783 src_ip=src_ip, dst_ip=(dst_ip, 4),784 src_port=src_port, dst_port=dst_port)785 # send tcp6 (background traffic)786 self.send_packets(ipv6=True, tcp=True, count=num_packets,787 src_ip=src_ip, dst_ip=dst_ip,788 src_port=src_port, dst_port=dst_port)789 stat_after = self.dut_statistics.get_drv_counters()790 k = self.queue_name.format(action_queue)791 assert stat_after[k] - stat_before[k] == num_packets792 finally:793 self.clear_filters()794 def test_rxflow_udp6_all_multiple_ips(self):795 """796 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters797 @steps:798 1. Configure filter.799 2. Send traffic.800 3. 
Check traffic recived in target queue.801 @result: Check passed.802 @duration: 5 sec.803 """804 try:805 action_queue = 1806 num_packets = 100807 src_ip = "2001:db8:0001::1"808 dst_ip = "2001:db8:0002::1"809 src_ip_2 = "2001:db8:0001::2"810 dst_ip_2 = "2001:db8:0002::2"811 src_port = 10244812 dst_port = 10243813 if self.dut_fw_card == CARD_ANTIGUA:814 filters_count = 3815 self.add_filter(flow_type='udp6',816 src_ip=src_ip, dst_ip=(dst_ip, 3),817 src_port=src_port, dst_port=dst_port)818 else:819 filters_count = 2820 self.add_filter(flow_type='udp6',821 src_ip=src_ip, dst_ip=dst_ip,822 src_port=src_port, dst_port=dst_port, loc=32)823 self.add_filter(flow_type='udp6',824 src_ip=src_ip_2, dst_ip=dst_ip_2,825 src_port=src_port, dst_port=dst_port, loc=36)826 stat_before = self.dut_statistics.get_drv_counters()827 # send udp6828 self.send_packets(ipv6=True, udp=True, count=num_packets,829 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),830 src_port=(src_port, 2), dst_port=(dst_port, 2))831 # send tcp6 (background traffic)832 self.send_packets(ipv6=True, tcp=True, count=num_packets,833 src_ip=src_ip, dst_ip=dst_ip,834 src_port=src_port, dst_port=dst_port)835 stat_after = self.dut_statistics.get_drv_counters()836 k = self.queue_name.format(action_queue)837 assert stat_after[k] - stat_before[k] == num_packets * filters_count838 finally:839 self.clear_filters()840 def test_rxflow_udp6_all_multiple_ports(self):841 """842 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters843 @steps:844 1. Configure filter.845 2. Send traffic.846 3. 
Check traffic recived in target queue.847 @result: Check passed.848 @duration: 5 sec.849 """850 try:851 action_queue = 1852 num_packets = 100853 src_ip = "2001:db8:0001::1"854 dst_ip = "2001:db8:0002::1"855 src_port = 10244856 dst_port = 10243857 src_port_2 = 10246858 dst_port_2 = 10245859 if self.dut_fw_card == CARD_ANTIGUA:860 filters_count = 3861 self.add_filter(flow_type='udp6',862 src_ip=src_ip, dst_ip=dst_ip,863 src_port=src_port, dst_port=(dst_port, 3))864 else:865 filters_count = 2866 self.add_filter(flow_type='udp6',867 src_ip=src_ip, dst_ip=dst_ip,868 src_port=src_port, dst_port=dst_port, loc=32)869 self.add_filter(flow_type='udp6',870 src_ip=src_ip, dst_ip=dst_ip,871 src_port=src_port_2, dst_port=dst_port_2, loc=36)872 stat_before = self.dut_statistics.get_drv_counters()873 # send udp6874 self.send_packets(ipv6=True, udp=True, count=num_packets,875 src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),876 src_port=(src_port, 4), dst_port=(dst_port, 4))877 # send tcp6 (background traffic)878 self.send_packets(ipv6=True, tcp=True, count=num_packets,879 src_ip=src_ip, dst_ip=dst_ip,880 src_port=src_port, dst_port=dst_port)881 stat_after = self.dut_statistics.get_drv_counters()882 k = self.queue_name.format(action_queue)883 assert stat_after[k] - stat_before[k] == num_packets * filters_count884 finally:885 self.clear_filters()886 def test_rxflow_udp6_srcport_drop(self):887 """888 @description: Test rxflow l3l4 srcport DROP filter889 @steps:890 1. Configure filter.891 2. Send traffic.892 3. 
Check traffic dropped.893 @result: Check passed.894 @duration: 5 sec.895 """896 try:897 action_queue = -1898 num_packets = 100899 src_ip = "2001:db8:0001::1"900 dst_ip = "2001:db8:0002::1"901 src_port = 10244902 dst_port = 10243903 self.add_filter(flow_type='udp6', src_port=src_port, action=action_queue)904 stat_before = self.dut_statistics.get_drv_counters()905 self.send_packets(ipv6=True, udp=True, count=num_packets,906 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),907 src_port=src_port, dst_port=(dst_port, 2),908 backgroud_traffic=False)909 stat_after = self.dut_statistics.get_drv_counters()910 for q in range(self.rings):911 k = self.queue_name.format(q)912 assert stat_after[k] - stat_before[k] < 20913 finally:914 self.clear_filters()915 def test_rxflow_tcp4_src_port(self):916 """917 @description: Test rxflow l3l4 tcp4 src port filter918 @steps:919 1. Configure filter.920 2. Send traffic.921 3. Check traffic recived in target queue.922 @result: Check passed.923 @duration: 5 sec.924 """925 try:926 action_queue = 1927 num_packets = 100928 src_ip = "192.168.100.1"929 dst_ip = '192.168.200.1'930 src_port = 6565931 dst_port = 1024932 self.add_filter(flow_type='tcp4', src_port=src_port)933 stat_before = self.dut_statistics.get_drv_counters()934 # send tcp935 self.send_packets(ipv4=True, tcp=True, count=num_packets,936 src_ip=src_ip, dst_ip=dst_ip,937 src_port=(src_port, 4), dst_port=dst_port)938 # send udp (backgroud traffic)939 self.send_packets(ipv4=True, udp=True, count=num_packets,940 src_ip=src_ip, dst_ip=dst_ip,941 src_port=src_port, dst_port=dst_port)942 stat_after = self.dut_statistics.get_drv_counters()943 k = self.queue_name.format(action_queue)944 assert stat_after[k] - stat_before[k] == num_packets945 finally:946 self.clear_filters()947 def test_rxflow_tcp4_dst_port(self):948 """949 @description: Test rxflow l3l4 tcp4 dst port filter950 @steps:951 1. Configure filter.952 2. Send traffic.953 3. 
Check traffic recived in target queue.954 @result: Check passed.955 @duration: 5 sec.956 """957 try:958 action_queue = 1959 num_packets = 100960 src_ip = "192.168.100.1"961 dst_ip = '192.168.200.1'962 src_port = 6565963 dst_port = 1024964 self.add_filter(flow_type='tcp4', dst_port=dst_port)965 stat_before = self.dut_statistics.get_drv_counters()966 # send tcp967 self.send_packets(ipv4=True, tcp=True, count=num_packets,968 src_ip=src_ip, dst_ip=dst_ip,969 src_port=src_port, dst_port=(dst_port, 4))970 # send udp (backgroud traffic)971 self.send_packets(ipv4=True, udp=True, count=num_packets,972 src_ip=src_ip, dst_ip=dst_ip,973 src_port=src_port, dst_port=dst_port)974 stat_after = self.dut_statistics.get_drv_counters()975 k = self.queue_name.format(action_queue)976 assert stat_after[k] - stat_before[k] == num_packets977 finally:978 self.clear_filters()979 def test_rxflow_tcp4_src_ip(self):980 """981 @description: Test rxflow l3l4 tcp4 src ip filter982 @steps:983 1. Configure filter.984 2. Send traffic.985 3. 
Check traffic recived in target queue.986 @result: Check passed.987 @duration: 5 sec.988 """989 try:990 action_queue = 1991 num_packets = 100992 src_ip = "192.168.100.1"993 dst_ip = '192.168.200.1'994 src_port = 6565995 dst_port = 1024996 self.add_filter(flow_type='tcp4', src_ip=src_ip)997 stat_before = self.dut_statistics.get_drv_counters()998 # send tcp999 self.send_packets(ipv4=True, tcp=True, count=num_packets,1000 src_ip=(src_ip, 4), dst_ip=dst_ip,1001 src_port=src_port, dst_port=dst_port)1002 # send udp (backgroud traffic)1003 self.send_packets(ipv4=True, udp=True, count=num_packets,1004 src_ip=src_ip, dst_ip=dst_ip,1005 src_port=src_port, dst_port=dst_port)1006 stat_after = self.dut_statistics.get_drv_counters()1007 k = self.queue_name.format(action_queue)1008 assert stat_after[k] - stat_before[k] == num_packets1009 finally:1010 self.clear_filters()1011 def test_rxflow_tcp4_dst_ip(self):1012 """1013 @description: Test rxflow l3l4 tcp4 dst ip filter1014 @steps:1015 1. Configure filter.1016 2. Send traffic.1017 3. 
Check traffic recived in target queue.1018 @result: Check passed.1019 @duration: 5 sec.1020 """1021 try:1022 action_queue = 11023 num_packets = 1001024 src_ip = "192.168.100.1"1025 dst_ip = '192.168.200.1'1026 src_port = 65651027 dst_port = 10241028 self.add_filter(flow_type='tcp4', dst_ip=dst_ip)1029 stat_before = self.dut_statistics.get_drv_counters()1030 # send tcp1031 self.send_packets(ipv4=True, tcp=True, count=num_packets,1032 src_ip=src_ip, dst_ip=(dst_ip, 4),1033 src_port=src_port, dst_port=dst_port)1034 # send udp (backgroud traffic)1035 self.send_packets(ipv4=True, udp=True, count=num_packets,1036 src_ip=src_ip, dst_ip=dst_ip,1037 src_port=src_port, dst_port=dst_port)1038 stat_after = self.dut_statistics.get_drv_counters()1039 k = self.queue_name.format(action_queue)1040 assert stat_after[k] - stat_before[k] == num_packets1041 finally:1042 self.clear_filters()1043 def test_rxflow_tcp4_all_multiple_ips(self):1044 """1045 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1046 @steps:1047 1. Configure filter.1048 2. Send traffic.1049 3. 
Check traffic recived in target queue.1050 @result: Check passed.1051 @duration: 5 sec.1052 """1053 try:1054 action_queue = 11055 num_packets = 1001056 src_ip = "192.168.100.1"1057 dst_ip = "192.168.200.1"1058 src_port = 102441059 dst_port = 102431060 if self.dut_fw_card == CARD_ANTIGUA:1061 filters_count = 31062 self.add_filter(flow_type='tcp4',1063 src_ip=src_ip, dst_ip=(dst_ip, 3),1064 src_port=src_port, dst_port=dst_port)1065 else:1066 filters_count = 41067 self.add_filter(flow_type='tcp4',1068 src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1069 src_port=src_port, dst_port=dst_port)1070 stat_before = self.dut_statistics.get_drv_counters()1071 # send tcp1072 self.send_packets(ipv4=True, tcp=True, count=num_packets,1073 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1074 src_port=(src_port, 2), dst_port=(dst_port, 2))1075 # send udp (backgroud traffic)1076 self.send_packets(ipv4=True, udp=True, count=num_packets,1077 src_ip=src_ip, dst_ip=dst_ip,1078 src_port=src_port, dst_port=dst_port)1079 stat_after = self.dut_statistics.get_drv_counters()1080 k = self.queue_name.format(action_queue)1081 assert stat_after[k] - stat_before[k] == num_packets * filters_count1082 finally:1083 self.clear_filters()1084 def test_rxflow_tcp4_all_multiple_ports(self):1085 """1086 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1087 @steps:1088 1. Configure filter.1089 2. Send traffic.1090 3. 
Check traffic recived in target queue.1091 @result: Check passed.1092 @duration: 5 sec.1093 """1094 try:1095 action_queue = 11096 num_packets = 1001097 src_ip = "192.168.100.1"1098 dst_ip = "192.168.200.1"1099 src_port = 102441100 dst_port = 102431101 if self.dut_fw_card == CARD_ANTIGUA:1102 filters_count = 31103 self.add_filter(flow_type='tcp4',1104 src_ip=src_ip, dst_ip=dst_ip,1105 src_port=src_port, dst_port=(dst_port, 3))1106 else:1107 filters_count = 41108 self.add_filter(flow_type='tcp4',1109 src_ip=src_ip, dst_ip=dst_ip,1110 src_port=(src_port, 2), dst_port=(dst_port, 2))1111 stat_before = self.dut_statistics.get_drv_counters()1112 # send tcp1113 self.send_packets(ipv4=True, tcp=True, count=num_packets,1114 src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1115 src_port=(src_port, 4), dst_port=(dst_port, 4))1116 # send udp (backgroud traffic)1117 self.send_packets(ipv4=True, udp=True, count=num_packets,1118 src_ip=src_ip, dst_ip=dst_ip,1119 src_port=src_port, dst_port=dst_port)1120 stat_after = self.dut_statistics.get_drv_counters()1121 k = self.queue_name.format(action_queue)1122 assert stat_after[k] - stat_before[k] == num_packets * filters_count1123 finally:1124 self.clear_filters()1125 def test_rxflow_tcp4_srcport_drop(self):1126 """1127 @description: Test rxflow l3l4 srcport DROP filter1128 @steps:1129 1. Configure filter.1130 2. Send traffic.1131 3. 
Check traffic dropped.1132 @result: Check passed.1133 @duration: 5 sec.1134 """1135 try:1136 action_queue = -11137 num_packets = 1001138 src_ip = "192.168.100.1"1139 dst_ip = "192.168.200.1"1140 src_port = 102441141 dst_port = 102431142 self.add_filter(flow_type='tcp4', src_port=src_port, action=action_queue)1143 stat_before = self.dut_statistics.get_drv_counters()1144 self.send_packets(ipv4=True, tcp=True, count=num_packets,1145 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1146 src_port=src_port, dst_port=(dst_port, 2),1147 backgroud_traffic=False)1148 stat_after = self.dut_statistics.get_drv_counters()1149 for q in range(self.rings):1150 k = self.queue_name.format(q)1151 assert stat_after[k] - stat_before[k] < 201152 finally:1153 self.clear_filters()1154 def test_rxflow_tcp6_src_port(self):1155 """1156 @description: Test rxflow l3l4 tcp6 src port filter1157 @steps:1158 1. Configure filter.1159 2. Send traffic.1160 3. Check traffic recived in target queue.1161 @result: Check passed.1162 @duration: 5 sec.1163 """1164 try:1165 action_queue = 11166 num_packets = 1001167 src_ip = "2001:db8:0001::1"1168 dst_ip = "2001:db8:0002::1"1169 src_port = 65651170 dst_port = 10241171 self.add_filter(flow_type='tcp6', src_port=src_port)1172 stat_before = self.dut_statistics.get_drv_counters()1173 # send tcp61174 self.send_packets(ipv6=True, tcp=True, count=num_packets,1175 src_ip=src_ip, dst_ip=dst_ip,1176 src_port=(src_port, 4), dst_port=dst_port)1177 # send udp6 (background traffic)1178 self.send_packets(ipv6=True, udp=True, count=num_packets,1179 src_ip=src_ip, dst_ip=dst_ip,1180 src_port=src_port, dst_port=dst_port)1181 stat_after = self.dut_statistics.get_drv_counters()1182 k = self.queue_name.format(action_queue)1183 assert stat_after[k] - stat_before[k] == num_packets1184 finally:1185 self.clear_filters()1186 def test_rxflow_tcp6_dst_port(self):1187 """1188 @description: Test rxflow l3l4 tcp6 dst port filter1189 @steps:1190 1. Configure filter.1191 2. Send traffic.1192 3. 
Check traffic recived in target queue.1193 @result: Check passed.1194 @duration: 5 sec.1195 """1196 try:1197 action_queue = 11198 num_packets = 1001199 src_ip = "2001:db8:0001::1"1200 dst_ip = "2001:db8:0002::1"1201 src_port = 65651202 dst_port = 10241203 self.add_filter(flow_type='tcp6', dst_port=dst_port)1204 stat_before = self.dut_statistics.get_drv_counters()1205 # send tcp61206 self.send_packets(ipv6=True, tcp=True, count=num_packets,1207 src_ip=src_ip, dst_ip=dst_ip,1208 src_port=src_port, dst_port=(dst_port, 4))1209 # send udp6 (background traffic)1210 self.send_packets(ipv6=True, udp=True, count=num_packets,1211 src_ip=src_ip, dst_ip=dst_ip,1212 src_port=src_port, dst_port=dst_port)1213 stat_after = self.dut_statistics.get_drv_counters()1214 k = self.queue_name.format(action_queue)1215 assert stat_after[k] - stat_before[k] == num_packets1216 finally:1217 self.clear_filters()1218 def test_rxflow_tcp6_src_ip(self):1219 """1220 @description: Test rxflow l3l4 tcp6 src ip filter1221 @steps:1222 1. Configure filter.1223 2. Send traffic.1224 3. 
Check traffic recived in target queue.1225 @result: Check passed.1226 @duration: 5 sec.1227 """1228 try:1229 action_queue = 11230 num_packets = 1001231 src_ip = "2001:db8:0001::1"1232 dst_ip = "2001:db8:0002::1"1233 src_port = 65651234 dst_port = 10241235 self.add_filter(flow_type='tcp6', src_ip=src_ip)1236 stat_before = self.dut_statistics.get_drv_counters()1237 # send tcp61238 self.send_packets(ipv6=True, tcp=True, count=num_packets,1239 src_ip=(src_ip, 4), dst_ip=dst_ip,1240 src_port=src_port, dst_port=dst_port)1241 # send udp6 (background traffic)1242 self.send_packets(ipv6=True, udp=True, count=num_packets,1243 src_ip=src_ip, dst_ip=dst_ip,1244 src_port=src_port, dst_port=dst_port)1245 stat_after = self.dut_statistics.get_drv_counters()1246 k = self.queue_name.format(action_queue)1247 assert stat_after[k] - stat_before[k] == num_packets1248 finally:1249 self.clear_filters()1250 def test_rxflow_tcp6_dst_ip(self):1251 """1252 @description: Test rxflow l3l4 tcp6 dst ip filter1253 @steps:1254 1. Configure filter.1255 2. Send traffic.1256 3. 
Check traffic recived in target queue.1257 @result: Check passed.1258 @duration: 5 sec.1259 """1260 try:1261 action_queue = 11262 num_packets = 1001263 src_ip = "2001:db8:0001::1"1264 dst_ip = "2001:db8:0002::1"1265 src_port = 65651266 dst_port = 10241267 self.add_filter(flow_type='tcp6', dst_ip=dst_ip)1268 stat_before = self.dut_statistics.get_drv_counters()1269 # send tcp61270 self.send_packets(ipv6=True, tcp=True, count=num_packets,1271 src_ip=src_ip, dst_ip=(dst_ip, 4),1272 src_port=src_port, dst_port=dst_port)1273 # send udp6 (background traffic)1274 self.send_packets(ipv6=True, udp=True, count=num_packets,1275 src_ip=src_ip, dst_ip=dst_ip,1276 src_port=src_port, dst_port=dst_port)1277 stat_after = self.dut_statistics.get_drv_counters()1278 k = self.queue_name.format(action_queue)1279 assert stat_after[k] - stat_before[k] == num_packets1280 finally:1281 self.clear_filters()1282 def test_rxflow_tcp6_all_multiple_ips(self):1283 """1284 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1285 @steps:1286 1. Configure filter.1287 2. Send traffic.1288 3. 
Check traffic recived in target queue.1289 @result: Check passed.1290 @duration: 5 sec.1291 """1292 try:1293 action_queue = 11294 num_packets = 1001295 src_ip = "2001:db8:0001::1"1296 dst_ip = "2001:db8:0002::1"1297 src_ip_2 = "2001:db8:0001::2"1298 dst_ip_2 = "2001:db8:0002::2"1299 src_port = 102441300 dst_port = 102431301 if self.dut_fw_card == CARD_ANTIGUA:1302 filters_count = 31303 self.add_filter(flow_type='tcp6',1304 src_ip=src_ip, dst_ip=(dst_ip, 3),1305 src_port=src_port, dst_port=dst_port)1306 else:1307 filters_count = 21308 self.add_filter(flow_type='tcp6',1309 src_ip=src_ip, dst_ip=dst_ip,1310 src_port=src_port, dst_port=dst_port, loc=32)1311 self.add_filter(flow_type='tcp6',1312 src_ip=src_ip_2, dst_ip=dst_ip_2,1313 src_port=src_port, dst_port=dst_port, loc=36)1314 stat_before = self.dut_statistics.get_drv_counters()1315 # send tcp61316 self.send_packets(ipv6=True, tcp=True, count=num_packets,1317 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1318 src_port=(src_port, 2), dst_port=(dst_port, 2))1319 # send udp6 (background traffic)1320 self.send_packets(ipv6=True, udp=True, count=num_packets,1321 src_ip=src_ip, dst_ip=dst_ip,1322 src_port=src_port, dst_port=dst_port)1323 stat_after = self.dut_statistics.get_drv_counters()1324 k = self.queue_name.format(action_queue)1325 assert stat_after[k] - stat_before[k] == num_packets * filters_count1326 finally:1327 self.clear_filters()1328 def test_rxflow_tcp6_all_multiple_ports(self):1329 """1330 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1331 @steps:1332 1. Configure filter.1333 2. Send traffic.1334 3. 
Check traffic recived in target queue.1335 @result: Check passed.1336 @duration: 5 sec.1337 """1338 try:1339 action_queue = 11340 num_packets = 1001341 src_ip = "2001:db8:0001::1"1342 dst_ip = "2001:db8:0002::1"1343 src_port = 102441344 dst_port = 102431345 src_port_2 = 102461346 dst_port_2 = 102451347 if self.dut_fw_card == CARD_ANTIGUA:1348 filters_count = 31349 self.add_filter(flow_type='tcp6',1350 src_ip=src_ip, dst_ip=dst_ip,1351 src_port=src_port, dst_port=(dst_port, 3))1352 else:1353 filters_count = 21354 self.add_filter(flow_type='tcp6',1355 src_ip=src_ip, dst_ip=dst_ip,1356 src_port=src_port, dst_port=dst_port, loc=32)1357 self.add_filter(flow_type='tcp6',1358 src_ip=src_ip, dst_ip=dst_ip,1359 src_port=src_port_2, dst_port=dst_port_2, loc=36)1360 stat_before = self.dut_statistics.get_drv_counters()1361 # send tcp61362 self.send_packets(ipv6=True, tcp=True, count=num_packets,1363 src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1364 src_port=(src_port, 4), dst_port=(dst_port, 4))1365 # send udp6 (background traffic)1366 self.send_packets(ipv6=True, udp=True, count=num_packets,1367 src_ip=src_ip, dst_ip=dst_ip,1368 src_port=src_port, dst_port=dst_port)1369 stat_after = self.dut_statistics.get_drv_counters()1370 k = self.queue_name.format(action_queue)1371 assert stat_after[k] - stat_before[k] == num_packets * filters_count1372 finally:1373 self.clear_filters()1374 def test_rxflow_tcp6_srcport_drop(self):1375 """1376 @description: Test rxflow l3l4 srcport DROP filter1377 @steps:1378 1. Configure filter.1379 2. Send traffic.1380 3. 
Check traffic dropped.1381 @result: Check passed.1382 @duration: 5 sec.1383 """1384 try:1385 action_queue = -11386 num_packets = 1001387 src_ip = "2001:db8:0001::1"1388 dst_ip = "2001:db8:0002::1"1389 src_port = 102441390 dst_port = 102431391 self.add_filter(flow_type='tcp6', src_port=src_port, action=action_queue)1392 stat_before = self.dut_statistics.get_drv_counters()1393 self.send_packets(ipv6=True, tcp=True, count=num_packets,1394 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1395 src_port=src_port, dst_port=(dst_port, 2),1396 backgroud_traffic=False)1397 stat_after = self.dut_statistics.get_drv_counters()1398 for q in range(self.rings):1399 k = self.queue_name.format(q)1400 assert stat_after[k] - stat_before[k] < 201401 finally:1402 self.clear_filters()1403 def test_rxflow_sctp4_src_port(self):1404 """1405 @description: Test rxflow l3l4 sctp4 src port filter1406 @steps:1407 1. Configure filter.1408 2. Send traffic.1409 3. Check traffic recived in target queue.1410 @result: Check passed.1411 @duration: 5 sec.1412 """1413 try:1414 action_queue = 11415 num_packets = 1001416 src_ip = "192.168.100.1"1417 dst_ip = '192.168.200.1'1418 src_port = 65651419 dst_port = 10241420 self.add_filter(flow_type='sctp4', src_port=src_port)1421 stat_before = self.dut_statistics.get_drv_counters()1422 # send sctp1423 self.send_packets(ipv4=True, sctp=True, count=num_packets,1424 src_ip=src_ip, dst_ip=dst_ip,1425 src_port=(src_port, 4), dst_port=dst_port)1426 # send tcp (background traffic)1427 self.send_packets(ipv4=True, tcp=True, count=num_packets,1428 src_ip=src_ip, dst_ip=dst_ip,1429 src_port=src_port, dst_port=dst_port)1430 stat_after = self.dut_statistics.get_drv_counters()1431 k = self.queue_name.format(action_queue)1432 assert stat_after[k] - stat_before[k] == num_packets1433 finally:1434 self.clear_filters()1435 def test_rxflow_sctp4_dst_port(self):1436 """1437 @description: Test rxflow l3l4 sctp4 dst port filter1438 @steps:1439 1. Configure filter.1440 2. Send traffic.1441 3. 
Check traffic recived in target queue.1442 @result: Check passed.1443 @duration: 5 sec.1444 """1445 try:1446 action_queue = 11447 num_packets = 1001448 src_ip = "192.168.100.1"1449 dst_ip = '192.168.200.1'1450 src_port = 65651451 dst_port = 10241452 self.add_filter(flow_type='sctp4', dst_port=dst_port)1453 stat_before = self.dut_statistics.get_drv_counters()1454 # send sctp1455 self.send_packets(ipv4=True, sctp=True, count=num_packets,1456 src_ip=src_ip, dst_ip=dst_ip,1457 src_port=src_port, dst_port=(dst_port, 4))1458 # send tcp (background traffic)1459 self.send_packets(ipv4=True, tcp=True, count=num_packets,1460 src_ip=src_ip, dst_ip=dst_ip,1461 src_port=src_port, dst_port=dst_port)1462 stat_after = self.dut_statistics.get_drv_counters()1463 k = self.queue_name.format(action_queue)1464 assert stat_after[k] - stat_before[k] == num_packets1465 finally:1466 self.clear_filters()1467 def test_rxflow_sctp4_src_ip(self):1468 """1469 @description: Test rxflow l3l4 sctp4 src ip filter1470 @steps:1471 1. Configure filter.1472 2. Send traffic.1473 3. 
Check traffic recived in target queue.1474 @result: Check passed.1475 @duration: 5 sec.1476 """1477 try:1478 action_queue = 11479 num_packets = 1001480 src_ip = "192.168.100.1"1481 dst_ip = '192.168.200.1'1482 src_port = 65651483 dst_port = 10241484 self.add_filter(flow_type='sctp4', src_ip=src_ip)1485 stat_before = self.dut_statistics.get_drv_counters()1486 # send sctp1487 self.send_packets(ipv4=True, sctp=True, count=num_packets,1488 src_ip=(src_ip, 4), dst_ip=dst_ip,1489 src_port=src_port, dst_port=dst_port)1490 # send tcp (background traffic)1491 self.send_packets(ipv4=True, tcp=True, count=num_packets,1492 src_ip=src_ip, dst_ip=dst_ip,1493 src_port=src_port, dst_port=dst_port)1494 stat_after = self.dut_statistics.get_drv_counters()1495 k = self.queue_name.format(action_queue)1496 assert stat_after[k] - stat_before[k] == num_packets1497 finally:1498 self.clear_filters()1499 def test_rxflow_sctp4_dst_ip(self):1500 """1501 @description: Test rxflow l3l4 sctp4 dst ip filter1502 @steps:1503 1. Configure filter.1504 2. Send traffic.1505 3. 
Check traffic recived in target queue.1506 @result: Check passed.1507 @duration: 5 sec.1508 """1509 try:1510 action_queue = 11511 num_packets = 1001512 src_ip = "192.168.100.1"1513 dst_ip = '192.168.200.1'1514 src_port = 65651515 dst_port = 10241516 self.add_filter(flow_type='sctp4', dst_ip=dst_ip)1517 stat_before = self.dut_statistics.get_drv_counters()1518 # send sctp1519 self.send_packets(ipv4=True, sctp=True, count=num_packets,1520 src_ip=src_ip, dst_ip=(dst_ip, 4),1521 src_port=src_port, dst_port=dst_port)1522 # send tcp (background traffic)1523 self.send_packets(ipv4=True, tcp=True, count=num_packets,1524 src_ip=src_ip, dst_ip=dst_ip,1525 src_port=src_port, dst_port=dst_port)1526 stat_after = self.dut_statistics.get_drv_counters()1527 k = self.queue_name.format(action_queue)1528 assert stat_after[k] - stat_before[k] == num_packets1529 finally:1530 self.clear_filters()1531 def test_rxflow_sctp4_all_multiple_ips(self):1532 """1533 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1534 @steps:1535 1. Configure filter.1536 2. Send traffic.1537 3. 
Check traffic recived in target queue.1538 @result: Check passed.1539 @duration: 5 sec.1540 """1541 try:1542 action_queue = 11543 num_packets = 1001544 src_ip = "192.168.100.1"1545 dst_ip = "192.168.200.1"1546 src_port = 102441547 dst_port = 102431548 if self.dut_fw_card == CARD_ANTIGUA:1549 filters_count = 31550 self.add_filter(flow_type='sctp4',1551 src_ip=(src_ip, 1), dst_ip=(dst_ip, 3),1552 src_port=src_port, dst_port=dst_port)1553 else:1554 filters_count = 41555 self.add_filter(flow_type='sctp4',1556 src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1557 src_port=src_port, dst_port=dst_port)1558 stat_before = self.dut_statistics.get_drv_counters()1559 # send sctp1560 self.send_packets(ipv4=True, sctp=True, count=num_packets,1561 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1562 src_port=(src_port, 2), dst_port=(dst_port, 2))1563 # send tcp (background traffic)1564 self.send_packets(ipv4=True, tcp=True, count=num_packets,1565 src_ip=src_ip, dst_ip=dst_ip,1566 src_port=src_port, dst_port=dst_port)1567 stat_after = self.dut_statistics.get_drv_counters()1568 k = self.queue_name.format(action_queue)1569 assert stat_after[k] - stat_before[k] == num_packets * filters_count1570 finally:1571 self.clear_filters()1572 def test_rxflow_sctp4_all_multiple_ports(self):1573 """1574 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1575 @steps:1576 1. Configure filter.1577 2. Send traffic.1578 3. 
Check traffic recived in target queue.1579 @result: Check passed.1580 @duration: 5 sec.1581 """1582 try:1583 action_queue = 11584 num_packets = 1001585 src_ip = "192.168.100.1"1586 dst_ip = "192.168.200.1"1587 src_port = 102441588 dst_port = 102431589 if self.dut_fw_card == CARD_ANTIGUA:1590 filters_count = 31591 self.add_filter(flow_type='sctp4',1592 src_ip=src_ip, dst_ip=dst_ip,1593 src_port=src_port, dst_port=(dst_port, 3))1594 else:1595 filters_count = 41596 self.add_filter(flow_type='sctp4',1597 src_ip=src_ip, dst_ip=dst_ip,1598 src_port=(src_port, 2), dst_port=(dst_port, 2))1599 stat_before = self.dut_statistics.get_drv_counters()1600 # send sctp1601 self.send_packets(ipv4=True, sctp=True, count=num_packets,1602 src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1603 src_port=(src_port, 4), dst_port=(dst_port, 4))1604 # send tcp (background traffic)1605 self.send_packets(ipv4=True, tcp=True, count=num_packets,1606 src_ip=src_ip, dst_ip=dst_ip,1607 src_port=src_port, dst_port=dst_port)1608 stat_after = self.dut_statistics.get_drv_counters()1609 k = self.queue_name.format(action_queue)1610 assert stat_after[k] - stat_before[k] == num_packets * filters_count1611 finally:1612 self.clear_filters()1613 def test_rxflow_sctp4_srcport_drop(self):1614 """1615 @description: Test rxflow l3l4 srcport DROP filter1616 @steps:1617 1. Configure filter.1618 2. Send traffic.1619 3. 
Check traffic dropped.1620 @result: Check passed.1621 @duration: 5 sec.1622 """1623 try:1624 action_queue = -11625 num_packets = 1001626 src_ip = "192.168.100.1"1627 dst_ip = "192.168.200.1"1628 src_port = 102441629 dst_port = 102431630 self.add_filter(flow_type='sctp4', src_port=src_port, action=action_queue)1631 stat_before = self.dut_statistics.get_drv_counters()1632 self.send_packets(ipv4=True, sctp=True, count=num_packets,1633 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1634 src_port=src_port, dst_port=(dst_port, 2),1635 backgroud_traffic=False)1636 stat_after = self.dut_statistics.get_drv_counters()1637 for q in range(self.rings):1638 k = self.queue_name.format(q)1639 assert stat_after[k] - stat_before[k] < 201640 finally:1641 self.clear_filters()1642 def test_rxflow_sctp6_src_port(self):1643 """1644 @description: Test rxflow l3l4 sctp6 src port filter1645 @steps:1646 1. Configure filter.1647 2. Send traffic.1648 3. Check traffic recived in target queue.1649 @result: Check passed.1650 @duration: 5 sec.1651 """1652 try:1653 action_queue = 11654 num_packets = 1001655 src_ip = "2001:db8:0001::1"1656 dst_ip = "2001:db8:0002::1"1657 src_port = 65651658 dst_port = 10241659 self.add_filter(flow_type='sctp6', src_port=src_port)1660 stat_before = self.dut_statistics.get_drv_counters()1661 # send sctp61662 self.send_packets(ipv6=True, sctp=True, count=num_packets,1663 src_ip=src_ip, dst_ip=dst_ip,1664 src_port=(src_port, 4), dst_port=dst_port)1665 # send tcp6 (background traffic)1666 self.send_packets(ipv6=True, tcp=True, count=num_packets,1667 src_ip=src_ip, dst_ip=dst_ip,1668 src_port=src_port, dst_port=dst_port)1669 stat_after = self.dut_statistics.get_drv_counters()1670 k = self.queue_name.format(action_queue)1671 assert stat_after[k] - stat_before[k] == num_packets1672 finally:1673 self.clear_filters()1674 def test_rxflow_sctp6_dst_port(self):1675 """1676 @description: Test rxflow l3l4 sctp6 dst port filter1677 @steps:1678 1. Configure filter.1679 2. 
Send traffic.1680 3. Check traffic recived in target queue.1681 @result: Check passed.1682 @duration: 5 sec.1683 """1684 try:1685 action_queue = 11686 num_packets = 1001687 src_ip = "2001:db8:0001::1"1688 dst_ip = "2001:db8:0002::1"1689 src_port = 65651690 dst_port = 10241691 self.add_filter(flow_type='sctp6', dst_port=dst_port)1692 stat_before = self.dut_statistics.get_drv_counters()1693 # send sctp61694 self.send_packets(ipv6=True, sctp=True, count=num_packets,1695 src_ip=src_ip, dst_ip=dst_ip,1696 src_port=src_port, dst_port=(dst_port, 4))1697 # send tcp6 (background traffic)1698 self.send_packets(ipv6=True, tcp=True, count=num_packets,1699 src_ip=src_ip, dst_ip=dst_ip,1700 src_port=src_port, dst_port=dst_port)1701 stat_after = self.dut_statistics.get_drv_counters()1702 k = self.queue_name.format(action_queue)1703 assert stat_after[k] - stat_before[k] == num_packets1704 finally:1705 self.clear_filters()1706 def test_rxflow_sctp6_src_ip(self):1707 """1708 @description: Test rxflow l3l4 sctp6 src ip filter1709 @steps:1710 1. Configure filter.1711 2. Send traffic.1712 3. 
Check traffic recived in target queue.1713 @result: Check passed.1714 @duration: 5 sec.1715 """1716 try:1717 action_queue = 11718 num_packets = 1001719 src_ip = "2001:db8:0001::1"1720 dst_ip = "2001:db8:0002::1"1721 src_port = 65651722 dst_port = 10241723 self.add_filter(flow_type='sctp6', src_ip=src_ip)1724 stat_before = self.dut_statistics.get_drv_counters()1725 # send sctp61726 self.send_packets(ipv6=True, sctp=True, count=num_packets,1727 src_ip=(src_ip, 4), dst_ip=dst_ip,1728 src_port=src_port, dst_port=dst_port)1729 # send tcp6 (background traffic)1730 self.send_packets(ipv6=True, tcp=True, count=num_packets,1731 src_ip=src_ip, dst_ip=dst_ip,1732 src_port=src_port, dst_port=dst_port)1733 stat_after = self.dut_statistics.get_drv_counters()1734 k = self.queue_name.format(action_queue)1735 assert stat_after[k] - stat_before[k] == num_packets1736 finally:1737 self.clear_filters()1738 def test_rxflow_sctp6_dst_ip(self):1739 """1740 @description: Test rxflow l3l4 sctp6 dst ip filter1741 @steps:1742 1. Configure filter.1743 2. Send traffic.1744 3. 
Check traffic recived in target queue.1745 @result: Check passed.1746 @duration: 5 sec.1747 """1748 try:1749 action_queue = 11750 num_packets = 1001751 src_ip = "2001:db8:0001::1"1752 dst_ip = "2001:db8:0002::1"1753 src_port = 65651754 dst_port = 10241755 self.add_filter(flow_type='sctp6', dst_ip=dst_ip)1756 stat_before = self.dut_statistics.get_drv_counters()1757 # send sctp61758 self.send_packets(ipv6=True, sctp=True, count=num_packets,1759 src_ip=src_ip, dst_ip=(dst_ip, 4),1760 src_port=src_port, dst_port=dst_port)1761 # send tcp6 (background traffic)1762 self.send_packets(ipv6=True, tcp=True, count=num_packets,1763 src_ip=src_ip, dst_ip=dst_ip,1764 src_port=src_port, dst_port=dst_port)1765 stat_after = self.dut_statistics.get_drv_counters()1766 k = self.queue_name.format(action_queue)1767 assert stat_after[k] - stat_before[k] == num_packets1768 finally:1769 self.clear_filters()1770 def test_rxflow_sctp6_all_multiple_ips(self):1771 """1772 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1773 @steps:1774 1. Configure filter.1775 2. Send traffic.1776 3. 
Check traffic recived in target queue.1777 @result: Check passed.1778 @duration: 5 sec.1779 """1780 try:1781 action_queue = 11782 num_packets = 1001783 src_ip = "2001:db8:0001::1"1784 dst_ip = "2001:db8:0002::1"1785 src_ip_2 = "2001:db8:0001::2"1786 dst_ip_2 = "2001:db8:0002::2"1787 src_port = 102441788 dst_port = 102431789 if self.dut_fw_card == CARD_ANTIGUA:1790 filters_count = 31791 self.add_filter(flow_type='sctp6',1792 src_ip=src_ip, dst_ip=(dst_ip, 3),1793 src_port=src_port, dst_port=dst_port)1794 else:1795 filters_count = 21796 self.add_filter(flow_type='sctp6',1797 src_ip=src_ip, dst_ip=dst_ip,1798 src_port=src_port, dst_port=dst_port, loc=32)1799 self.add_filter(flow_type='sctp6',1800 src_ip=src_ip_2, dst_ip=dst_ip_2,1801 src_port=src_port, dst_port=dst_port, loc=36)1802 stat_before = self.dut_statistics.get_drv_counters()1803 # send sctp61804 self.send_packets(ipv6=True, sctp=True, count=num_packets,1805 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1806 src_port=(src_port, 2), dst_port=(dst_port, 2))1807 # send tcp6 (background traffic)1808 self.send_packets(ipv6=True, tcp=True, count=num_packets,1809 src_ip=src_ip, dst_ip=dst_ip,1810 src_port=src_port, dst_port=dst_port)1811 stat_after = self.dut_statistics.get_drv_counters()1812 k = self.queue_name.format(action_queue)1813 assert stat_after[k] - stat_before[k] == num_packets * filters_count1814 finally:1815 self.clear_filters()1816 def test_rxflow_sctp6_all_multiple_ports(self):1817 """1818 @description: Test rxflow l3l4 dstip srcip dstport srcport 8 filters1819 @steps:1820 1. Configure filter.1821 2. Send traffic.1822 3. 
Check traffic recived in target queue.1823 @result: Check passed.1824 @duration: 5 sec.1825 """1826 try:1827 action_queue = 11828 num_packets = 1001829 src_ip = "2001:db8:0001::1"1830 dst_ip = "2001:db8:0002::1"1831 src_port = 102441832 dst_port = 102431833 src_port_2 = 102461834 dst_port_2 = 102451835 if self.dut_fw_card == CARD_ANTIGUA:1836 filters_count = 31837 self.add_filter(flow_type='sctp6',1838 src_ip=src_ip, dst_ip=dst_ip,1839 src_port=src_port, dst_port=(dst_port, 3))1840 else:1841 filters_count = 21842 self.add_filter(flow_type='sctp6',1843 src_ip=src_ip, dst_ip=dst_ip,1844 src_port=src_port, dst_port=dst_port, loc=32)1845 self.add_filter(flow_type='sctp6',1846 src_ip=src_ip, dst_ip=dst_ip,1847 src_port=src_port_2, dst_port=dst_port_2, loc=36)1848 stat_before = self.dut_statistics.get_drv_counters()1849 # send sctp61850 self.send_packets(ipv6=True, sctp=True, count=num_packets,1851 src_ip=(src_ip, 2), dst_ip=(dst_ip, 2),1852 src_port=(src_port, 4), dst_port=(dst_port, 4))1853 # send tcp6 (background traffic)1854 self.send_packets(ipv6=True, tcp=True, count=num_packets,1855 src_ip=src_ip, dst_ip=dst_ip,1856 src_port=src_port, dst_port=dst_port)1857 stat_after = self.dut_statistics.get_drv_counters()1858 k = self.queue_name.format(action_queue)1859 assert stat_after[k] - stat_before[k] == num_packets * filters_count1860 finally:1861 self.clear_filters()1862 def test_rxflow_sctp6_srcport_drop(self):1863 """1864 @description: Test rxflow l3l4 srcport DROP filter1865 @steps:1866 1. Configure filter.1867 2. Send traffic.1868 3. 
Check traffic dropped.1869 @result: Check passed.1870 @duration: 5 sec.1871 """1872 try:1873 action_queue = -11874 num_packets = 1001875 src_ip = "2001:db8:0001::1"1876 dst_ip = "2001:db8:0002::1"1877 src_port = 102441878 dst_port = 102431879 self.add_filter(flow_type='sctp6', src_port=src_port, action=action_queue)1880 stat_before = self.dut_statistics.get_drv_counters()1881 self.send_packets(ipv6=True, sctp=True, count=num_packets,1882 src_ip=(src_ip, 4), dst_ip=(dst_ip, 4),1883 src_port=src_port, dst_port=(dst_port, 2),1884 backgroud_traffic=False)1885 stat_after = self.dut_statistics.get_drv_counters()1886 for q in range(self.rings):1887 k = self.queue_name.format(q)1888 assert stat_after[k] - stat_before[k] < 201889 finally:1890 self.clear_filters()1891if __name__ == "__main__":...

Full Screen

Full Screen

funcs.py

Source:funcs.py Github

copy

Full Screen

...13 if choosen_timeout is None:14 return sniff(count=choosen_count, lfilter=filter_packets)15 else:16 return sniff(count=choosen_count, lfilter=filter_packets, timeout=choosen_timeout)17def send_packets(data, index=None):18 request_packet = IP(dst=SERVER_IP)/ICMP(type="echo-request")19 if index is None: # If we are sending an ACK packet and not a file-data one20 request_packet = request_packet/data21 else:22 data = str(index).encode() + b'|' + data23 request_packet = request_packet/data24 send(request_packet, verbose=False)25def get_packet_index():26 packets = sniff_packets(1)27 packet = packets[0]28 return int(packet[Raw].load)29def get_ack(packets_window):30 packet_index = get_packet_index()31 while packet_index != WINDOW_SIZE:32 # Re-send missing packets to the server33 for missing_packet in range(packet_index, WINDOW_SIZE):34 send_packets(packets_window[missing_packet], missing_packet)35 packet_index = get_packet_index()36def get_final_ack(packets_window):37 """This method is used to verify that the last 8 or less packets were recieved"""38 packet_index = get_packet_index()39 while packet_index != len(packets_window):40 for missing_packet in range(packet_index, len(packets_window)):41 send_packets(packets_window[missing_packet], missing_packet)42 packet_index = get_packet_index()43def read_in_chunks(file_object):44 """Lazy function (generator) to read a file piece by piece."""45 while True:46 data = file_object.read(CHUNK_SIZE)47 if not data:48 break49 yield data50def send_file(file_path):51 with open(file_path, 'rb') as f:52 packets_window = []53 for piece in read_in_chunks(f):54 if len(packets_window) == WINDOW_SIZE:55 get_ack(packets_window)56 packets_window = []57 # This if statement is made to test the message-ACK algorithm. It is not actually needed.58 if random.randrange(3, 10) != 5:59 send_packets(piece, len(packets_window))60 packets_window.append(piece)61 get_final_ack(packets_window)62 print('All packets have been sent. 
Shutting down.')63def send_syn(file_name, packets_amount):64 """This function simulates a part of TCP's three-way-handshake by sending a SYN message with arguments and waiting for an ack packet in return."""65 send_packets(f'{file_name}|{packets_amount}')66 packets = sniff_packets(1, 10)67 if len(packets) == 1 and packets[0][Raw].load == b'ACK':68 print('Got ACK, starting to send the file.')69 return70 raise Exception('Server is off.')71def launch(file_path, file_name):72 packets_amount = math.ceil(os.path.getsize(file_path) / CHUNK_SIZE)73 send_syn(file_name, packets_amount)...

Full Screen

Full Screen

app.py

Source:app.py Github

copy

Full Screen

# Handle requests to and from facebook API and heroku server
import os
import json
import requests
from flask import Flask, request
from query_sort import clean_text, sort_query

app = Flask(__name__)

POST_PARAMS = {
    "access_token": os.environ["PAGE_ACCESS_TOKEN"]
}
POST_HEADERS = {
    "Content-Type": "application/json"
}
# Upper bound (seconds) for a single outgoing POST so that a stalled
# connection cannot block the request handler indefinitely.
SEND_TIMEOUT_SECONDS = 10


@app.route('/', methods=['GET'])
def verify():
    """Answer a GET on the webhook URL.

    When the query string carries ``hub.mode=subscribe`` together with a
    ``hub.challenge`` value, ``hub.verify_token`` is compared with the
    ``VERIFY_TOKEN`` environment variable: a mismatch yields a 403, a match
    echoes the challenge back with a 200.  Any other GET receives a small
    HTML notice (status 200).
    """
    if request.args.get("hub.mode") == "subscribe" and request.args.get(
            "hub.challenge"):
        if not request.args.get(
                "hub.verify_token") == os.environ["VERIFY_TOKEN"]:
            return "Verification token mismatch", 403
        return request.args["hub.challenge"], 200
    return (
        "Err: 404, you came to the wrong place <br>  "
        "Visit <a href=https://www.facebook.com/ChatbotBOB>BOB's  "
        "facebook page</a>"
    ), 200


@app.route('/', methods=['POST'])
def webhook():
    """Process a POSTed batch of messaging events.

    For every event that carries a postback payload or a text message, the
    text is normalised with ``clean_text``, turned into reply packets by
    ``sort_query`` and handed to ``send_message``.  Events of any other
    shape are skipped.  The handler always answers ``("ok", 200)``.
    """
    data = request.get_json()
    # A missing body or a non-"page" object carries nothing to process.
    if not isinstance(data, dict) or data.get("object") != "page":
        return "ok", 200

    for entry in data.get("entry", []):
        for msg_event in entry.get("messaging", []):
            # Unwrap list-shaped events *before* reading any key from them;
            # indexing a list with a string key would raise TypeError.
            if isinstance(msg_event, list):
                if not msg_event:
                    continue
                msg_event = msg_event[0]
            sender_id = msg_event["sender"]["id"]

            if "postback" in msg_event:
                msg_text = msg_event["postback"]["payload"]
            elif "text" in msg_event.get("message", {}):
                msg_text = msg_event["message"]["text"]
            else:
                # Nothing textual in this event: skip it, but keep going so
                # the remaining events of the batch are still handled.
                continue

            msg_text = clean_text(msg_text)
            send_packets = sort_query(msg_text, sender_id)
            send_message(send_packets)
    return "ok", 200


def send_message(send_packets):
    """POST each packet, JSON-encoded, to the Graph API messages endpoint.

    ``send_packets`` is an iterable of JSON-serialisable objects; a falsy
    value (``None``, empty list) is a no-op.
    """
    if not send_packets:
        return
    for packet in send_packets:
        requests.post(
            "https://graph.facebook.com/v2.6/me/messages",
            params=POST_PARAMS,
            headers=POST_HEADERS,
            data=json.dumps(packet),
            timeout=SEND_TIMEOUT_SECONDS)


if __name__ == '__main__':
    ...  # remainder of the snippet is elided in the source listing

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful