How to use block_host method in autotest

Best Python code snippet using autotest_python

firewall.py

Source:firewall.py Github

copy

Full Screen

...41 if event.connection.dpid not in self.nodes:42 self.nodes.append(event.connection.dpid)4344 "Install a flow rule to drop packets"45 def block_host(self, ip=None, mac=None, port=None):46 msg = of.ofp_flow_mod()47 msg.priority = 65535 #higher priority48 if ip: #if it was received an IP address then we will block by IP address49 msg.match.dl_type = ethernet.IP_TYPE50 msg.match.nw_src = IPAddr(ip)51 elif mac: #otherwise, the block is by MAC address52 msg.match.dl_src = EthAddr(mac)53 if port:54 msg.match.tp_dst = port55 msg.actions.append(of.ofp_action_output(port=of.OFPP_CONTROLLER))56 for node in self.nodes:57 try:58 core.openflow.getConnection(node).send(msg)59 except:60 continue6162 '''63 This method handle packets sent by switches when they do not have a rule specifying how to handle the packet.64 In this case we are going to use this method to block when the first packets are sent.65 '''66 def _handle_PacketIn(self, event):67 "Parse the packet"68 packet = event.parsed69 "Check its type"70 ipv4_pkt = packet.find('ipv4')71 arp_pkt = packet.find('arp')72 if ipv4_pkt:73 src_ip = ipv4_pkt.srcip74 elif arp_pkt:75 src_ip = arp_pkt.protosrc76 src_mac = arp_pkt.hwsrc7778 for address in self.black_list:79 if "." in address:80 self.block_host(ip=address)81 elif ":" in address:82 self.block_host(mac=address)8384 '''85 This method handle flow statistics periodically requested to the devices. 
The period is defined in the timer declared at the '__init__' (in our case 1 second).86 We will use the method to obtain flow's statiscs and block users based on a few rules created as example.87 '''88 def _handle_FlowStatsReceived(self,event):8990 "Get the node providing flow statiscs"91 dpid = event.connection.dpid9293 def bytes_to_mbps(bytes):94 return bytes/1024.0/1024.0*89596 for stat in event.stats:9798 "See the match structure in: https://www.opennetworking.org/images/stories/downloads/sdn-resources/onf-specifications/openflow/openflow-spec-v1.0.0.pdf"99 match = stat.match100 src_ip = match.nw_src101102 "Skip LLDP (Link Layer Discovery) packets. The 'continue' skip this loop interaction"103 if match.dl_type == ethernet.LLDP_TYPE:104 continue105 "Initiate 'prev_stats_flows' in its first usage"106 if not self.prev_stats_flows[match][dpid]:107 self.prev_stats_flows[match][dpid] = 0108 "Get the previous byte_count"109 prev_byte_count = self.prev_stats_flows[match][dpid]110 "Store the current counter for bytes. 
This is done in a per-flow basis which means that we are obtaining the bandwidth usage of each flow-rule installed in the node"111 self.prev_stats_flows[match][dpid] = stat.byte_count112 "Calculate the mbps"113 mbps = bytes_to_mbps(stat.byte_count - prev_byte_count)114115 "Firewall rules"116117 "1 - if the protocol is ICMP and its bandwidth exceeds a certain bandwidth threshold"118 if match.nw_proto == ipv4.ICMP_PROTOCOL and mbps > self.icmp_threshold:119 self.block_host(ip=src_ip)120121 "2 - if the protocol is TCP and the destination port is in the blocked list"122 if match.nw_proto == ipv4.TCP_PROTOCOL and match.tp_dst in self.blocked_tcp_ports:123 self.block_host(ip=src_ip)124125 "3 - if the protocol is UDP and the destination port is in the blocked list"126 if match.nw_proto == ipv4.UDP_PROTOCOL and match.tp_dst in self.blocked_udp_ports:127 self.block_host(ip=src_ip)128129130"Method executed when the controller is started"131def launch():132 "Try to clean anything using 6633/tcp before starting the controller"133 try:134 import os135 os.system("sudo fuser -k 6633/tcp")136 except:137 pass138139 "Log and colors"140 import pox.log.color141 pox.log.color.launch() ...

Full Screen

Full Screen

status.py

Source:status.py Github

copy

Full Screen

#!/usr/bin/env python3
# @Time    : 2020-02-14
# @Author  : caicai
# @File    : status.py
import threading
import time
import traceback

from myscan.lib.core.common import getredis
from myscan.lib.core.data import logger, cmd_line_options, paths, others, conn, count
from myscan.lib.core.common_reverse import query_reverse
from myscan.config import scan_set


def count_status():
    """Periodically log a one-line progress summary read from redis.

    Loops forever: sleeps ``status_flush_time`` seconds (default 30), reads
    the queue lengths and the ``count_all`` counters from redis, queries the
    reverse-connection total and logs a STATUS line whose layout depends on
    ``cmd_line_options.command`` ("hostscan" or "webscan").  Any error in a
    cycle is logged and the loop carries on with the next cycle.
    """
    red = getredis()
    while True:
        try:
            time.sleep(int(scan_set.get("status_flush_time", 30)))
            burpdata_undo = red.llen("burpdata")
            # Pending work is counted from a set in random_test mode and
            # from a list otherwise.
            if scan_set.get("random_test", False):
                unactive = red.scard("work_data_py_set")
            else:
                unactive = red.llen("work_data_py")
            vuln = red.llen("vuln_all")
            data = red.hmget("count_all", "doned", "request", "block_host", "request_fail", "active")
            # hmget yields None for a field that does not exist yet; show
            # such counters as "0" instead of failing the whole cycle on
            # None.decode().
            burpdata_doned, request, block_host, request_fail, active = [
                "0" if raw is None else raw.decode() for raw in data
            ]
            reverse_count = 0
            res, resdata = query_reverse("myscan_total")
            if res:
                reverse_count = int(resdata.get("total"))
            if cmd_line_options.command == "hostscan":
                logger.warning(
                    "do/undo/active/unactive:{}/{}/{}/{} vuln:{}/reverse:{}".format(
                        burpdata_doned, burpdata_undo, active, unactive,
                        vuln, reverse_count), text="STATUS")
            elif cmd_line_options.command == "webscan":
                if cmd_line_options.allow_plugin:
                    undoplugin = red.llen("plugin_data_py")
                    logger.warning(
                        "do/undo/active/unactive/undoplugin:{}/{}/{}/{}/{} req_total/fail:{}/{} blockhost:{} vuln:{}/reverse:{}".format(
                            burpdata_doned, burpdata_undo, active, unactive, undoplugin, request,
                            request_fail,
                            block_host, vuln, reverse_count), text="STATUS")
                else:
                    logger.warning(
                        "do/undo/active/unactive:{}/{}/{}/{} req_total/fail:{}/{} blockhost:{} vuln:{}/reverse:{}".format(
                            burpdata_doned, burpdata_undo, active, unactive, request,
                            request_fail,
                            block_host, vuln, reverse_count), text="STATUS")
        except KeyboardInterrupt:
            logger.warning("Ctrl+C was pressed ,aborted program")
        except Exception as ex:
            logger.warning("Count stat moudle get error:{}".format(ex))
            traceback.print_exc()


def start_count_status():
    if cmd_line_options.verbose < 3:
        t = threading.Thread(target=count_status)
        t.daemon = True
        # ... (the excerpt of start_count_status() is truncated here)

Full Screen

Full Screen

Recv_React.py

Source:Recv_React.py Github

copy

Full Screen

...10ANS_PORT = 300611BUFFER_SIZE = 25612def find_host_from_pktid(pkt_id):13 return H014def block_host(host):15 global FLOW_BLOCKED16 if not FLOW_BLOCKED:17 print("=============== Flows before blocking: ===============")18 subprocess.call('ovs-ofctl dump-flows s1', shell=True)19 cmd = "ovs-ofctl add-flow s1 priority=11,dl_type=0x0800,nw_src=" + str(host) + ",action=drop"20 subprocess.call(cmd, shell=True)21 print("=============== Flows after blocking: ===============")22 subprocess.call('ovs-ofctl dump-flows s1', shell=True)23 FLOW_BLOCKED = True24 else:25 print str(host) + " currently blocked"26def restore_host(host):27 global FLOW_BLOCKED28 if FLOW_BLOCKED:29 print("=============== Flows before restoring: ===============")30 subprocess.call('ovs-ofctl dump-flows s1', shell=True)31 cmd = 'ovs-ofctl --strict del-flows s1 priority=11,dl_type=0x0800,nw_src=' + str(host)32 subprocess.call(cmd, shell=True)33 print("=============== Flows after restoring: ===============")34 subprocess.call('ovs-ofctl dump-flows s1', shell=True)35 36 FLOW_BLOCKED = False37soc_recv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)38# to handle Python [Errno 98] Address already in use39soc_recv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)40soc_recv.bind((TCP_IP, ANS_PORT))41soc_recv.listen(5)42conn, addr = soc_recv.accept()43print 'Connection address:', addr44while 1:45 data = conn.recv(BUFFER_SIZE)46 print("======================================================")47 result = simplejson.loads(data)48 print result49 """ result looks something like this:50 {"pkt_id": "23.4567", "allow": True}51 """52 if not result["allow"]:53 host_to_block = find_host_from_pktid(result["pkt_id"])54 block_host(host_to_block)55 56 # try restoring again after BLOCK_TIME seconds57 threading.Timer(BLOCK_TIME, restore_host, [H0]).start()58 else:59 print "normal traffic"60soc_recv.close()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful