Best Python code snippet using autotest_python
firewall.py
Source:firewall.py  
...41		if event.connection.dpid not in self.nodes:42			self.nodes.append(event.connection.dpid)4344	"Install a flow rule to drop packets"45	def block_host(self, ip=None, mac=None, port=None):46		msg = of.ofp_flow_mod()47		msg.priority = 65535 #higher priority48		if ip: #if it was received an IP address then we will block by IP address49			msg.match.dl_type = ethernet.IP_TYPE50			msg.match.nw_src = IPAddr(ip)51		elif mac: #otherwise, the block is by MAC address52			msg.match.dl_src = EthAddr(mac)53		if port:54			msg.match.tp_dst = port55		msg.actions.append(of.ofp_action_output(port=of.OFPP_CONTROLLER))56		for node in self.nodes:57			try:58				core.openflow.getConnection(node).send(msg)59			except:60				continue6162	'''63	This method handle packets sent by switches when they do not have a rule specifying how to handle the packet.64	In this case we are going to use this method to block when the first packets are sent.65	'''66	def _handle_PacketIn(self, event):67		"Parse the packet"68		packet = event.parsed69		"Check its type"70		ipv4_pkt = packet.find('ipv4')71		arp_pkt = packet.find('arp')72		if ipv4_pkt:73			src_ip = ipv4_pkt.srcip74		elif arp_pkt:75			src_ip = arp_pkt.protosrc76			src_mac = arp_pkt.hwsrc7778		for address in self.black_list:79			if "." in address:80				self.block_host(ip=address)81			elif ":" in address:82				self.block_host(mac=address)8384	'''85	This method handle flow statistics periodically requested to the devices. The period is defined in the timer declared at the '__init__' (in our case 1 second).86	We will use the method to obtain flow's statiscs and block users based on a few rules created as example.87	'''88	def _handle_FlowStatsReceived(self,event):8990		"Get the node providing flow statiscs"91		dpid = event.connection.dpid9293		def bytes_to_mbps(bytes):94			return bytes/1024.0/1024.0*89596		for stat in event.stats:9798			"See the match structure in: https://www.opennetworking.org/images/stories/downloads/sdn-resources/onf-specifications/openflow/openflow-spec-v1.0.0.pdf"99			match = stat.match100			src_ip = match.nw_src101102			"Skip LLDP (Link Layer Discovery) packets. The 'continue' skip this loop interaction"103			if match.dl_type == ethernet.LLDP_TYPE:104				continue105			"Initiate 'prev_stats_flows' in its first usage"106			if not self.prev_stats_flows[match][dpid]:107				self.prev_stats_flows[match][dpid] = 0108			"Get the previous byte_count"109			prev_byte_count = self.prev_stats_flows[match][dpid]110			"Store the current counter for bytes. This is done in a per-flow basis which means that we are obtaining the bandwidth usage of each flow-rule installed in the node"111			self.prev_stats_flows[match][dpid] = stat.byte_count112			"Calculate the mbps"113			mbps = bytes_to_mbps(stat.byte_count - prev_byte_count)114115			"Firewall rules"116117			"1 - if the protocol is ICMP and its bandwidth exceeds a certain bandwidth threshold"118			if match.nw_proto == ipv4.ICMP_PROTOCOL and mbps > self.icmp_threshold:119				self.block_host(ip=src_ip)120121			"2 - if the protocol is TCP and the destination port is in the blocked list"122			if match.nw_proto == ipv4.TCP_PROTOCOL and match.tp_dst in self.blocked_tcp_ports:123				self.block_host(ip=src_ip)124125			"3 - if the protocol is UDP and the destination port is in the blocked list"126			if match.nw_proto == ipv4.UDP_PROTOCOL and match.tp_dst in self.blocked_udp_ports:127				self.block_host(ip=src_ip)128129130"Method executed when the controller is started"131def launch():132	"Try to clean anything using 6633/tcp before starting the controller"133	try:134		import os135		os.system("sudo fuser -k 6633/tcp")136	except:137		pass138139	"Log and colors"140	import pox.log.color141	pox.log.color.launch()
...status.py
Source:status.py  
1#!/usr/bin/env python32# @Time    : 2020-02-143# @Author  : caicai4# @File    : status.py5import time6from myscan.lib.core.common import getredis7from myscan.lib.core.data import logger, cmd_line_options, paths, others, conn, count8from myscan.lib.core.common_reverse import query_reverse9from myscan.config import scan_set10import threading11import traceback12def count_status():13    red = getredis()14    while True:15        try:16            time.sleep(int(scan_set.get("status_flush_time", 30)))17            burpdata_undo = red.llen("burpdata")18            if scan_set.get("random_test", False):19                # workdata = red.spop("work_data_py_set")20                unactive = red.scard("work_data_py_set")21            else:22                # red.lpush("work_data_py", pickledata)23                # workdata = red.lpop("work_data_py")24                unactive = red.llen("work_data_py")25            vuln = red.llen("vuln_all")26            data = red.hmget("count_all", "doned", "request", "block_host", "request_fail", "active")27            burpdata_doned, request, block_host, request_fail, active = list(28                map(lambda x: x.decode(), data))29            reverse_count = 030            res, resdata = query_reverse("myscan_total")31            if res:32                reverse_count = int(resdata.get("total"))33            if cmd_line_options.command == "hostscan":34                logger.warning(35                    "do/undo/active/unactive:{}/{}/{}/{}  vuln:{}/reverse:{}".format(36                        burpdata_doned, burpdata_undo, active, unactive,37                        vuln, reverse_count), text="STATUS")38            elif cmd_line_options.command == "webscan":39                if cmd_line_options.allow_plugin:40                    undoplugin = red.llen("plugin_data_py")41                    logger.warning(42                        "do/undo/active/unactive/undoplugin:{}/{}/{}/{}/{} req_total/fail:{}/{} blockhost:{} vuln:{}/reverse:{}".format(43                            burpdata_doned, burpdata_undo, active, unactive, undoplugin, request,44                            request_fail,45                            block_host, vuln, reverse_count), text="STATUS")46                else:47                    logger.warning(48                        "do/undo/active/unactive:{}/{}/{}/{} req_total/fail:{}/{} blockhost:{} vuln:{}/reverse:{}".format(49                            burpdata_doned, burpdata_undo, active, unactive, request,50                            request_fail,51                            block_host, vuln, reverse_count), text="STATUS")52        except KeyboardInterrupt as ex:53            logger.warning("Ctrl+C was pressed ,aborted program")54        except Exception as ex:55            logger.warning("Count stat moudle get error:{}".format(ex))56            traceback.print_exc()57            pass58def start_count_status():59    if cmd_line_options.verbose < 3:60        t = threading.Thread(target=count_status)61        t.daemon = True...Recv_React.py
Source:Recv_React.py  
...10ANS_PORT = 300611BUFFER_SIZE = 25612def find_host_from_pktid(pkt_id):13  return H014def block_host(host):15  global FLOW_BLOCKED16  if not FLOW_BLOCKED:17    print("=============== Flows before blocking: ===============")18    subprocess.call('ovs-ofctl dump-flows s1', shell=True)19    cmd = "ovs-ofctl add-flow s1 priority=11,dl_type=0x0800,nw_src=" + str(host) + ",action=drop"20    subprocess.call(cmd, shell=True)21    print("=============== Flows after blocking: ===============")22    subprocess.call('ovs-ofctl dump-flows s1', shell=True)23    FLOW_BLOCKED = True24  else:25    print str(host) + " currently blocked"26def restore_host(host):27  global FLOW_BLOCKED28  if FLOW_BLOCKED:29    print("=============== Flows before restoring: ===============")30    subprocess.call('ovs-ofctl dump-flows s1', shell=True)31    cmd = 'ovs-ofctl --strict del-flows s1 priority=11,dl_type=0x0800,nw_src=' + str(host)32    subprocess.call(cmd, shell=True)33    print("=============== Flows after restoring: ===============")34    subprocess.call('ovs-ofctl dump-flows s1', shell=True)35    36    FLOW_BLOCKED = False37soc_recv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)38# to handle Python [Errno 98] Address already in use39soc_recv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)40soc_recv.bind((TCP_IP, ANS_PORT))41soc_recv.listen(5)42conn, addr = soc_recv.accept()43print 'Connection address:', addr44while 1:45  data = conn.recv(BUFFER_SIZE)46  print("======================================================")47  result = simplejson.loads(data)48  print result49  """ result looks something like this:50  {"pkt_id": "23.4567", "allow": True}51  """52  if not result["allow"]:53    host_to_block = find_host_from_pktid(result["pkt_id"])54    block_host(host_to_block)55    56    # try restoring again after BLOCK_TIME seconds57    threading.Timer(BLOCK_TIME, restore_host, [H0]).start()58  else:59    print "normal traffic"60soc_recv.close()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
