Best Python code snippet using autotest_python
net_utils.py
Source:net_utils.py  
# ... (excerpt from net_utils.py begins mid-method; earlier lines are not shown)
#             return 'enabled' in output
#         return False
#
# NOTE(review): the definitions below take `self` and read `self._name`, so
# they appear to be methods of an interface class whose header lies outside
# this excerpt; they are shown here without the class-level indent.


def enable_promisc(self):
    """Run ``ifconfig <name> promisc`` for the interface named by ``self._name``."""
    bin_utils.system('ifconfig %s promisc' % self._name)


def disable_promisc(self):
    """Run ``ifconfig <name> -promisc`` for the interface named by ``self._name``."""
    bin_utils.system('ifconfig %s -promisc' % self._name)


def get_hwaddr(self):
    """Return the stripped contents of ``/sys/class/net/<name>/address``."""
    # The context manager closes the file even if the read raises.
    with open('/sys/class/net/%s/address' % self._name) as f:
        return f.read().strip()


def set_hwaddr(self, hwaddr):
    """Run ``ifconfig <name> hw ether <hwaddr>`` for this interface."""
    bin_utils.system('ifconfig %s hw ether %s' % (self._name, hwaddr))


def add_maddr(self, maddr):
    """Run ``ip maddr add <maddr> dev <name>`` for this interface."""
    bin_utils.system('ip maddr add %s dev %s' % (maddr, self._name))


def del_maddr(self, maddr):
    """Run ``ip maddr del <maddr> dev <name>`` for this interface."""
    bin_utils.system('ip maddr del %s dev %s' % (maddr, self._name))


def get_ipaddr(self):
    ipaddr = "0.0.0.0"
    ...  # the remainder of this method is cut off in the excerpt


# testing.py
Source:testing.py  
# ... (excerpt from testing.py begins mid-function; earlier lines are not shown)
#     return


def enable_promisc(interface):
    """Enable promiscuous listening mode on the given interface via ifconfig."""
    os.system(f'ifconfig {interface} promisc')


def disable_promisc(interface):
    """Disable promiscuous listening mode on the given interface via ifconfig."""
    os.system(f'ifconfig {interface} -promisc')


def format_packet_data(packet):
    """Serialize selected fields of a captured packet to a JSON string.

    The layers to export are detected by substring match on
    ``str(packet.layers)``. The result always has a ``timestamp`` key and,
    depending on the layers present, ``eth``, ``ip``, ``tcp``, ``udp`` and
    ``arp`` entries.
    """
    # The layer list does not change while formatting; stringify it once.
    layer_names = str(packet.layers)
    packet_data = {'timestamp': packet.sniff_timestamp}

    if 'ETH' in layer_names:
        packet_data['eth'] = {
            'src': packet.eth.src,
            'dst': packet.eth.dst,
        }
    # A TCP/UDP packet does not necessarily carry an ``ip`` layer (e.g. when
    # transported over IPv6), so only export it when it is actually present
    # instead of raising AttributeError.
    if ('TCP' in layer_names or 'UDP' in layer_names) and hasattr(packet, 'ip'):
        packet_data['ip'] = {
            'src': packet.ip.src,
            'dst': packet.ip.dst,
        }
    if 'TCP' in layer_names:
        packet_data['tcp'] = {
            'src': packet.tcp.srcport,
            'dst': packet.tcp.dstport,
        }
    if 'UDP' in layer_names:
        packet_data['udp'] = {
            'src': packet.udp.srcport,
            'dst': packet.udp.dstport,
        }
    if 'ARP' in layer_names:
        packet_data['arp'] = {
            'mac': {
                'src': packet.arp.src_hw_mac,
                'dst': packet.arp.dst_hw_mac,
            },
            'ip': {
                'src': packet.arp.src_proto_ipv4,
                'dst': packet.arp.dst_proto_ipv4,
            },
            'code': packet.arp.opcode,
        }
    return json.dumps(packet_data)


def live_capture(interface=None):
    """Capture packets on ``interface`` and append them to ``./pcap_<interface>.log``.

    Each packet is written as one JSON line. Promiscuous mode is enabled for
    the duration of the capture and disabled again when the capture ends,
    including when it ends with an exception.
    """
    print(f'waiting for packets on {interface}')
    enable_promisc(interface)
    try:
        capture = pyshark.LiveCapture(interface=interface, only_summaries=False)
        for packet in capture.sniff_continuously():
            # The log is reopened per packet so every line is closed out to
            # disk immediately (presumably for a concurrent reader of the log
            # -- TODO confirm).
            with open(f'./pcap_{interface}.log', 'a') as log:
                log.write(format_packet_data(packet) + '\n')
    finally:
        disable_promisc(interface)


def create_captures():
    """Start one ``live_capture`` process per interface and return the processes.

    Relies on ``find_interfaces()`` and the module-level ``interfaces``
    collection, both defined outside this excerpt.
    """
    captures = []
    find_interfaces()
    for interface in interfaces:
        p = multiprocessing.Process(target=live_capture, args=(interface,))
        captures.append(p)
        p.start()
    return captures


def add_new_data_from(logfile):
    new_packets = []
    with open(logfile, 'r') as log:
        for packet in log:
            ...  # the remainder of this function is cut off in the excerpt


# test-net-dev.py
Source:test-net-dev.py  
#!/usr/bin/env python3
import os
import re
import shutil
import sys
import time

from pwn import *

DIRECTORY = os.environ.get('VMM_DIRECTORY', '/app')
DEBUG_CONSOLE_PATH = os.environ.get('VMM_DEBUG_CONSOLE_PATH', '/app/ooowsserial-debug.py')
DISK_IMAGE_PATH = os.environ.get('VMM_DISK_IMAGE_PATH', '/app/disk')

OS_BOOTED = b"OOO OS BOOTED"

# Commands sent to the guest console.
PREPARE_NET = b"prepare_net"
SET_DRIVER_FEATURES = b"net_set_driver_features"
ENABLE_CHKSUM_TX = b"net_enable_chksum_tx"
DISABLE_CHKSUM_TX = b"net_disable_chksum_tx"
ENABLE_ETH_CRC_TX = b"net_enable_tx_eth"
DISABLE_ETH_CRC_TX = b"net_disable_tx_eth"
ENABLE_PROMISC = b"net_enable_promisc"
DISABLE_PROMISC = b"net_disable_promisc"
TX_NORMAL_PACKET = b"tx_pkt"
TX_IPV4_PACKET = b"tx_ipv4_pkt"
RX_PACKET = b"rx_pkt"

# Helper commands that receive / publish exactly one message on topic "VPC".
LISTEN_ONE_MSG_CMD = ["mosquitto_sub", "-C", "1", "-N", "-t", "VPC"]
SEND_ONE_MSG_CMD = ["mosquitto_pub", "-s", "-t", "VPC"]

# Expected packet payloads.
NORMAL_PACKET = b"DSTMACSRCMACLN\x61TLIDFOTP11IPIPDTDTDTDTAAA"
NORMAL_PACKET_CRC = b"\x4d\x1a\x85\xa2"
IPV4_NORMAL_PACKET = b"DSTMACSRCMACLN\x45GTLIDFOTP__IPIPDTDTDTDTAA"
IPV4_NORMAL_PACKET_CRC = b"\x7b\xa2\x62\xfa"
IPV4_CHKSUM_PACKET = b"DSTMACSRCMACLN\x45GTLIDFOTP<tIPIPDTDTDTDTAA"
BROADCAST_PACKET =b"TAAG\xff\xff\xff\xff\xff\xffSRCMACBodyOfPacketYo"
NON_BROADCAST_PACKET =b"TAAG\x11\x11\x11\x11\x11\xffSRCMACBodyOfPacketYo"

# context.log_level = 'debug'


def check_pkt_tx(p, cmd, correct):
    """Send ``cmd`` to the guest and verify the packet it transmits.

    A one-shot listener is started first, then ``cmd`` is sent on ``p``. The
    first 4 bytes of the received message are skipped (presumably a header
    prefix -- TODO confirm) and the rest must equal ``correct``. On mismatch a
    message is printed and the script exits with status -1.
    """
    # Start listening before triggering the transmit so the packet is not missed.
    listen = process(LISTEN_ONE_MSG_CMD)
    try:
        p.sendline(cmd)
        received_packet = listen.readall()
        assert(len(received_packet) > 4)
        pkt_data = received_packet[4:]
        if len(pkt_data) != len(correct):
            print("PUBLIC: packet tx fail: incorrect size")
            sys.exit(-1)
        if pkt_data != correct:
            print("PUBLIC: packet tx fail: incorrect data received")
            sys.exit(-1)
    finally:
        # Also runs on the failure paths (sys.exit raises SystemExit), so the
        # listener is never left behind.
        listen.kill()


def check_pkt_rx(p, cmd, pkt, expected):
    """Send ``cmd`` to the guest, publish ``pkt`` and verify what the guest reports.

    The guest is expected to print the received size as a hexadecimal number
    followed by a newline, then the raw packet bytes, which must equal
    ``expected``. On mismatch a message is printed and the script exits with
    status -1.
    """
    p.sendline(cmd)
    p.recvuntil(cmd)
    # give it time to call into the driver
    time.sleep(10)

    send = process(SEND_ONE_MSG_CMD)
    try:
        send.send(pkt)
        send.shutdown()
        # Hex digits run through 'f'; the previous class stopped at 'b' and
        # could not match sizes containing c-f.
        hex_size_regex = rb"([0-9a-fA-F]+)\n"
        result = p.recvregex(hex_size_regex)
        match = re.search(hex_size_regex, result)
        size_hex = match.group(1)
        size = int(size_hex, 16)
        if size != len(expected):
            print("PUBLIC: packet rx fail: incorrect size")
            sys.exit(-1)
        raw_pkt = p.recvn(size)
        if raw_pkt != expected:
            print("PUBLIC: packet rx fail: incorrect data received")
            sys.exit(-1)
    finally:
        # Clean up the publisher on success and on failure alike.
        send.kill()


def main():
    """Boot the VMM and run the network TX/RX checks in sequence.

    Starts a mosquitto broker unless ``DO_NOT_START_MOSQUITTO`` is set in the
    environment, installs the debug console device, boots the guest, and then
    alternates guest commands with packet checks. Every spawned process is
    killed on exit, whether the checks pass or fail.
    """
    os.chdir(DIRECTORY)
    # Check if we need to run mosquitto server
    mosquitto = None
    if 'DO_NOT_START_MOSQUITTO' not in os.environ:
        mosquitto = process(["/usr/sbin/mosquitto", "-c", "/etc/mosquitto/mosquitto.conf"])
    p = None
    try:
        shutil.copyfile(DEBUG_CONSOLE_PATH, "./devices-bin/ooowsserial.py")
        p = process(["./vmm", "test", DISK_IMAGE_PATH, "1", "devices.config"])
        p.recvuntil(OS_BOOTED)
        # important, need to initialize the structures
        p.recvuntil(b'$')
        p.sendline(PREPARE_NET)
        p.recvuntil(b'$')
        p.sendline(SET_DRIVER_FEATURES)
        p.recvuntil(b'$')
        check_pkt_tx(p, TX_NORMAL_PACKET, NORMAL_PACKET)
        p.recvuntil(b'$')
        check_pkt_tx(p, TX_IPV4_PACKET, IPV4_NORMAL_PACKET)
        p.recvuntil(b'$')
        p.sendline(ENABLE_CHKSUM_TX)
        p.recvuntil(b'$')
        check_pkt_tx(p, TX_IPV4_PACKET, IPV4_CHKSUM_PACKET)
        # Verify that non IPv4 packets are not affected
        p.recvuntil(b'$')
        check_pkt_tx(p, TX_NORMAL_PACKET, NORMAL_PACKET)
        p.recvuntil(b'$')
        p.sendline(DISABLE_CHKSUM_TX)
        p.recvuntil(b'$')
        p.sendline(ENABLE_ETH_CRC_TX)
        p.recvuntil(b'$')
        check_pkt_tx(p, TX_NORMAL_PACKET, NORMAL_PACKET + NORMAL_PACKET_CRC)
        p.recvuntil(b'$')
        check_pkt_tx(p, TX_IPV4_PACKET, IPV4_NORMAL_PACKET + IPV4_NORMAL_PACKET_CRC)
        p.recvuntil(b'$')
        check_pkt_rx(p, RX_PACKET, BROADCAST_PACKET, BROADCAST_PACKET[4:])
        p.recvuntil(b'$')
        p.sendline(ENABLE_PROMISC)
        p.recvuntil(b'$')
        check_pkt_rx(p, RX_PACKET, NON_BROADCAST_PACKET, NON_BROADCAST_PACKET[4:])
    finally:
        # Tear down the VMM and the broker even when a check exits early.
        if p is not None:
            p.kill()
        if mosquitto:
            mosquitto.kill()


if __name__ == '__main__':
    ...  # the excerpt is cut off here; presumably this calls main()


# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you
# become proficient with different test automation frameworks, e.g. Selenium,
# Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing for FREE!!
