Best Python code snippet using ATX
tap_wlan_interface.py
Source:tap_wlan_interface.py  
"""Bridge packets between a RIOT tap interface and a wireless LAN over UDP.

Two UDP listeners run in daemon threads:

* ``local`` (port 8808) receives packets coming from the tap interface.
  A packet whose config number is in ``multicast_config`` and whose address
  field is ``::`` is broadcast on the wireless interface; a packet whose
  config number is in ``unicast_config`` is sent to the wireless peer
  recorded in ``device_map`` for that address.
* ``remote`` (port 8809) receives packets coming from the wireless
  interface, records the sender in ``device_map`` and forwards the packet to
  the RIOT instance.

A packet is a space-separated text record with at least three fields: the
first is the config number, the second-to-last is an address and the last is
the address of the packet's source.
"""
import os
import sys
import signal
from optparse import OptionParser
from threading import Thread, current_thread
import socket
# import ipaddress
import time
import subprocess

# Config numbers that are broadcast / sent to a single peer when they arrive
# from the tap interface.
multicast_config = ['1', '2', '5', '17', '18']
unicast_config = ['0', '3', '4', '6', '7']
# triggers = []
device_map = {}  # ipv6 -> wlan(ipv4)
riot_ip = ''
tap_ip = ''
wlan_ip = ''
blacklist = []
parser = OptionParser()

# ANSI colour prefix per listener thread (names are set explicitly in main()).
_THREAD_COLOURS = {'Thread-1': '\033[94m', 'Thread-2': '\033[92m'}
_COLOUR_RESET = '\033[00m'


def _to_text(data):
    """Return *data* as text; bytes are decoded as UTF-8 (lossy on bad input)."""
    if isinstance(data, str):
        return data
    return data.decode('utf-8', 'replace')


def _to_bytes(data):
    """Return *data* as bytes suitable for ``socket.sendto``."""
    if isinstance(data, bytes):
        return data
    return data.encode('utf-8')


def pprint(arg):
    """Print *arg*, coloured according to the calling listener thread.

    Output from ``Thread-1`` is blue, from ``Thread-2`` green, and from any
    other thread uncoloured.
    """
    colour = _THREAD_COLOURS.get(current_thread().name)
    if colour is None:
        print(str(arg))
    else:
        print(colour + str(arg) + _COLOUR_RESET)


def ip_validate(addr, ipv4):
    """Loosely check that *addr* looks like an IP address.

    Args:
        addr: Address string. For IPv6 an optional ``%scope`` suffix is
            ignored.
        ipv4: ``True`` to validate as dotted-quad IPv4, ``False`` for IPv6.

    Returns:
        ``True`` if the address passes the syntactic check, else ``False``.
        The IPv6 check only verifies the number of colon-separated groups
        (3 to 8) and that every non-empty group is a hex value up to 0xFFFF.
    """
    if ipv4:
        octets = addr.split('.')
        if len(octets) != 4:
            return False
        for value in octets:
            if not value.isdigit():
                return False
            try:
                if int(value) > 255:
                    return False
            except ValueError:
                # isdigit() accepts some Unicode digits that int() rejects.
                return False
        return True

    if addr in ('::1', '::'):
        return True
    groups = addr.split('%')[0].split(':')
    if not 3 <= len(groups) <= 8:
        return False
    for value in groups:
        if value == '':
            # Empty groups come from the "::" abbreviation.
            continue
        try:
            number = int(value, 16)
        except ValueError:
            return False
        if number < 0 or number > 65535:
            return False
    return True


def multicast(data, port=8809, ipv4=True):
    """Broadcast *data* over UDP from the wireless interface.

    Args:
        data: Payload (bytes or text).
        port: Destination UDP port.
        ipv4: ``True`` to broadcast over IPv4 from ``wlan_ip``. The IPv6
            branch is kept from the original code, which notes it is never
            used.

    Returns:
        ``True`` once the datagram has been sent.
    """
    pprint("multicast():")
    pprint("data -> " + _to_text(data))
    pprint("port -> " + str(port))
    pprint("is_ipv4 -> " + str(ipv4))
    if ipv4:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    else:
        # this part will never be used in our case
        pprint("[-]Anomally detected in multicast function!!!")
        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
    try:
        if ipv4:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            # Bind to the wireless address so the broadcast leaves via wlan.
            sock.bind((wlan_ip, 0))
        sock.sendto(_to_bytes(data), ('<broadcast>', port))
    finally:
        sock.close()
    pprint("[+]sent")
    return True


def unicast(data, address, ipv4):
    """Send *data* over UDP to a single *address*.

    Args:
        data: Payload (bytes or text).
        address: Socket address tuple whose first element is the host IP.
        ipv4: ``True`` for an IPv4 destination, ``False`` for IPv6.

    Returns:
        ``True`` if the datagram was sent, ``False`` if the IP is invalid.
    """
    pprint("unicast():")
    pprint("data -> " + _to_text(data))
    pprint("address -> " + str(address))
    pprint("is_ipv4 ->" + str(ipv4))
    # Validate before creating the socket so nothing leaks on rejection.
    if not ip_validate(address[0], bool(ipv4)):
        pprint("[-]Invalid ip")
        return False
    family = socket.AF_INET if ipv4 else socket.AF_INET6
    sock = socket.socket(family, socket.SOCK_DGRAM)
    try:
        sock.sendto(_to_bytes(data), address)
        pprint("[+]sent")
    finally:
        sock.close()
    return True


def createTap(iface):
    """Create the tap interface *iface* for the current user and bring it up."""
    pprint("createTap():")
    pprint("iface -> " + iface)
    # Argument lists avoid passing the interface name through a shell.
    subprocess.call(['sudo', 'ip', 'tuntap', 'add', iface, 'mode', 'tap',
                     'user', os.environ.get('USER', '')])
    subprocess.call(['sudo', 'ip', 'link', 'set', iface, 'up'])


def startUDPserver(port, ipv4):
    """Return a UDP socket bound to *port* on all addresses.

    Args:
        port: Port number (anything ``int()`` accepts).
        ipv4: ``True`` to bind ``0.0.0.0``, ``False`` to bind ``::``.
    """
    pprint("\033[00m")
    pprint("startUDPserver():")
    pprint("port -> " + str(port))
    pprint("is_ipv4 -> " + str(ipv4))
    if ipv4:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        server_address = ('0.0.0.0', int(port))
    else:
        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
        server_address = ("::", int(port))
    sock.bind(server_address)
    return sock


def listen(sock, lis):
    """Receive packets on *sock* forever and route them.

    Args:
        sock: Bound UDP socket returned by ``startUDPserver``.
        lis: ``'local'`` for the tap-side listener, ``'remote'`` for the
            wireless-side listener.
    """
    global riot_ip
    pprint("listen():")
    pprint("sock -> " + str(sock))
    pprint("lis -> " + lis)
    while True:
        data, address = sock.recvfrom(4096)
        pprint("while():")
        pprint("[+]Received {} from {}".format(_to_text(data), address))
        values = _to_text(data).split(' ') if data else []
        if len(values) < 3:
            pprint("[-]Invalid data format!!!")
            continue

        config = values[0]
        ip = values[-2]
        if lis == 'local' and config in multicast_config and ip == "::":
            pprint("[+]multicast local")
            # Remember where the RIOT instance talks from so that remote
            # packets can be forwarded back to it.
            riot_ip = address
            pprint("[+]riot_ip-> " + str(riot_ip))
            multicast(data)
        elif lis == 'local' and config in unicast_config:
            pprint("[+]unicast local")
            try:
                unicast(data, (device_map[ip], 8809), True)
            except Exception as e:
                # Best effort: an unknown peer or a send error is only logged.
                pprint(e)
        elif lis == 'remote':
            # check if it is a broadcast packet from my own ip
            if config in multicast_config and address[0] == wlan_ip:
                pprint("[+]Packet discarded")
                continue
            pprint("[+]remote")
            remote_ip = values[-1]
            pprint("[+]Source of Data: " + str(remote_ip))
            if not (ip_validate(remote_ip, True)
                    or ip_validate(remote_ip, False)):
                pprint("[-]Invalid ip")
                continue
            if remote_ip not in device_map:
                device_map[remote_ip] = address[0]
            pprint("[+]riot_ip-> " + str(riot_ip))
            if not riot_ip:
                # No local packet has been seen yet, so there is nowhere to
                # forward to; previously this crashed the listener thread.
                pprint("[-]riot_ip not known yet, packet dropped")
                continue
            # NOTE(review): forwarded with ipv4=False; if the local server
            # was started with -4, riot_ip is an IPv4 address and will be
            # rejected by unicast() - confirm the intended behaviour.
            unicast(data, riot_ip, False)

        pprint("[+]Device map:")
        pprint(device_map)
        # pprint("[+]Triggers:")
        # pprint(triggers)


def main():
    """Parse options, set up the tap interface and start both listeners."""
    global tap_ip, wlan_ip
    # Imported here because only the script entry point needs netifaces;
    # the helper functions stay importable without the third-party package.
    import netifaces as ni

    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input  # Python 3

    parser.add_option("-t", "--tap", dest="tap", help="Specify the name of the tap interface to create")
    parser.add_option("-w", "--wireless", dest="wlan", help="Specify the name of the wireless interface")
    parser.add_option("-p", "--path", dest="path", help="Specify the path of riot application")
    parser.add_option("-4", action="store_true", dest="ipv4", help="Use for ipv4 addressing", default=False)
    (options, args) = parser.parse_args()

    # The RIOT_APP environment variable takes precedence over --path.
    riot_dir = os.getenv('RIOT_APP') or options.path
    if not options.tap or not options.wlan:
        parser.error("both --tap and --wireless are required")
    if not riot_dir:
        parser.error("set RIOT_APP or pass --path")

    pprint('[+]Creating tap interface ' + options.tap)
    createTap(options.tap)
    pprint('[+]Assigning ip to tap interface')
    try:
        # Output goes to the null device: a PIPE nobody reads can block the
        # child once it fills up.
        with open(os.devnull, 'w') as devnull:
            subprocess.call(
                ['timeout', '5', 'xterm', '-e', 'sudo', 'make', 'term'],
                cwd=os.path.expanduser(riot_dir),
                stdout=devnull, stderr=subprocess.STDOUT)
    except OSError as e:
        pprint(e)
        pprint('[-]Could not start the riot terminal in ' + riot_dir)
        sys.exit(1)

    pprint('\n\nConnect to wifi and press Enter!!!\n\n')
    read_line()

    interfaces = ni.interfaces()
    if options.tap not in interfaces or options.wlan not in interfaces:
        pprint('[+]Error Occurred: one or two interfaces is not present')
        sys.exit(1)
    try:
        tap_ip = ni.ifaddresses(options.tap)[ni.AF_INET6][0]['addr'].split('%')[0]
        wlan_ip = ni.ifaddresses(options.wlan)[ni.AF_INET][0]['addr']
        # riot_ip = str(ipaddress.IPv6Address(unicode(tap_ip))+1)
    except (KeyError, IndexError, ValueError) as e:
        pprint(e)
        pprint("[+]IP addresses are not properly alloted to interfaces")
        sys.exit(1)
    pprint("tap_ip->" + tap_ip)
    pprint("wlan_ip->" + wlan_ip)

    # used for local packet from tap
    pprint("[+]Starting UDP listening server on port 8808 (Listening on tap interface)")
    # Explicit names keep pprint()'s colouring independent of the
    # interpreter's default thread naming scheme.
    t_local = Thread(target=listen, name='Thread-1',
                     args=(startUDPserver(8808, ipv4=options.ipv4), 'local'))
    t_local.daemon = True
    t_local.start()

    # used for remote packet from wifi
    pprint("[+]Starting UDP listening server on port 8809 (Listening on wireless interface)")
    t_remote = Thread(target=listen, name='Thread-2',
                      args=(startUDPserver(8809, ipv4=True), 'remote'))
    t_remote.daemon = True
    t_remote.start()

    pprint("\n\nStart riot instance!!!\n\n")
    # keeping the main thread running
    # NOTE(review): the loop body is elided ("...") in the listing this was
    # recovered from; sleeping is a minimal keep-alive - confirm against the
    # original file.
    while True:
        time.sleep(1)


if __name__ == "__main__":
    main()
Source:main.py  
#!/usr/bin/env python
"""Pycom sensor node: read the on-board sensors and publish them over MQTT.

On start-up the board joins the configured WiFi network and syncs its RTC via
NTP. It then publishes one InfluxDB line-protocol record per cycle to the
configured MQTT topic.
"""
import sys
import time
import _thread
import pycom
import machine
from machine import RTC
from network import WLAN
from mqtt import MQTTClient
from network import Bluetooth
# Sensor import
from pycoproc_2 import Pycoproc
from LIS2HH12 import LIS2HH12
from SI7006A20 import SI7006A20
from LTR329ALS01 import LTR329ALS01
from MPL3115A2 import MPL3115A2, ALTITUDE, PRESSURE
# Configuration
import config

# Globals
py = Pycoproc()
# Returns height in meters. Mode may also be set to PRESSURE, returning a value in Pascals
mp = MPL3115A2(py, mode=ALTITUDE)
si = SI7006A20(py)
lt = LTR329ALS01(py)
li = LIS2HH12(py)
# Returns pressure in Pa. Mode may also be set to ALTITUDE, returning a value in meters
mpp = MPL3115A2(py, mode=PRESSURE)
wlan = WLAN()
bt = Bluetooth()


def init():
    """Disable Bluetooth and block until the WiFi connection is up.

    The RGB LED blinks while connecting and is set to 0x00A000 once the
    connection is established.
    """
    pycom.heartbeat(False)
    # Turn off BlueTooth to save energy
    bt.deinit()
    print("Connect to WiFi")
    wlan.deinit()
    time.sleep(1)
    wlan.init(mode=WLAN.STA)
    time.sleep(1)
    wlan.ifconfig(id=0, config='dhcp')
    time.sleep(1)
    wlan.connect(ssid=config.WIFI_SSID, auth=(WLAN.WPA2, config.WIFI_PASS))
    pycom.rgbled(0x200000)
    while not wlan.isconnected():
        pycom.rgbled(0x205020)
        time.sleep(1)
        pycom.rgbled(0x502050)
        time.sleep(2)
    print("WiFi connected succesfully")
    wlan_connected = wlan.ifconfig()
    print('wlan connected ' + str(wlan_connected))
    pycom.rgbled(0x00A000)


def clock_sync():
    """Sync RTC time with NTP server for correct timestamps on Events."""
    rtc = RTC()
    rtc.ntp_sync(server="pool.ntp.org")
    print('The RTC is synced via NTP')


def mqtt_callback(topic, msg):
    """Print a received MQTT message prefixed with the current GMT time."""
    timestamp = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(*time.gmtime()[:6])
    print("{timestamp} - {topic} : {msg}".format(timestamp=timestamp, topic=topic, msg=msg))


def loop(client):
    """Run one cycle: read the sensors, publish, sleep, poll for messages.

    Args:
        client: Connected ``MQTTClient`` used to publish and to check for
            incoming messages.
    """
    # Get interface addr.
    wlan_ip = wlan.ifconfig()[0]
    # Light channels (ch0 ~450nm violet-blue and ch1 ~770nm red)
    (data0, data1) = lt.light()
    measurement = config.THING_ID
    # The first six bytes of the board's unique id, colon-separated hex.
    uid = machine.unique_id()
    unit = "fipy/" + ":".join("{:02x}".format(uid[i]) for i in range(6))
    humidity = str(si.humidity())
    dew = str(si.dew_point())
    temp_mp = str(mp.temperature())
    temp_si = str(si.temperature())
    battery = str(py.read_battery_voltage())
    pressure = str(mpp.pressure())
    light_450 = str(data0)
    light_770 = str(data1)
    lux = str(lt.lux())
    timestamp = str(time.time())
    # InfluxDB line-protocol
    # <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
    fmt = ("{measurement},unit={unit},ip={wlan_ip} "
           "humidity={humidity},dewPoint={dew},tempMp={temp_mp},tempSi={temp_si},battery={battery},"
           "pressure={pressure},light_450={light_450},light_770={light_770},lux={lux}"
           " {timestamp}")
    payload = fmt.format(
        measurement=measurement, unit=unit, wlan_ip=wlan_ip,
        humidity=humidity, dew=dew, temp_mp=temp_mp, temp_si=temp_si,
        battery=battery, pressure=pressure, light_450=light_450,
        light_770=light_770, lux=lux, timestamp=timestamp)
    print('Sending data:', payload)

    # Publish InfluxDB line-protocol to MQTT
    client.publish(topic=config.MQTT_TOPIC, msg=payload, qos=0)
    pycom.rgbled(0x001000)
    time.sleep(config.SLEEP_TIME)
    pycom.rgbled(0x00A000)
    # Check for MQTT msg
    client.check_msg()


def main():
    """Connect to the MQTT broker, subscribe, and run ``loop`` forever.

    Exits with status 0 on ``KeyboardInterrupt`` and -1 on any other
    exception (after printing its traceback).
    """
    print("Connect to mqtt")
    client = MQTTClient(
        config.THING_ID, config.MQTT_BROKER,
        port=config.MQTT_PORT,
        keepalive=int(config.MQTT_KEEPALIVE)
    )
    client.set_callback(mqtt_callback)
    resp = client.connect(clean_session=True)
    print("mqttClient connect call returned: {}".format(resp))
    if resp == 0:
        print("Successfully connected to MQTT Broker")
    client.subscribe(config.MQTT_TOPIC)

    while True:
        try:
            loop(client)
            machine.idle()
        # Handle exception
        except KeyboardInterrupt:
            # sys.exit() takes its status positionally, not as a keyword.
            sys.exit(0)
        except Exception as ex:
            sys.print_exception(ex)
            sys.exit(-1)


# The App
print('Starting App')
init()
clock_sync()
# NOTE(review): the listing this was recovered from is truncated ("...")
# after clock_sync(); main() is the only remaining entry point defined above
# and is presumably invoked here - confirm against the original file.
main()
Source:myipcheck.py  
# -*- coding: utf-8 -*-
import socket
import os

# Prefix that precedes the host part of the links to rewrite.
_HREF_PREFIX = '<a href="http://'


def _replace_href_ip(line, new_ip):
    """Return *line* with the host of its ``<a href="http://1…:`` link replaced.

    Only the first link whose host starts with ``1`` is considered, and the
    host is taken to end at the next ``:`` (the port separator). A line
    without such a link, or whose link has no ``:`` after the host, is
    returned unchanged.
    """
    start = line.find(_HREF_PREFIX + '1')
    if start == -1:
        return line
    host_start = start + len(_HREF_PREFIX)
    host_end = line.find(':', host_start)
    if host_end == -1:
        # No port separator: slicing with -1 would throw away the rest of
        # the line, so leave it alone.
        return line
    return line[:host_start] + new_ip + line[host_end:]


def change_IP_in_HTML():
    """Rewrite the IP in the links of every HTML template to this host's IP.

    Every file in ``web/templates`` whose name contains ``.html`` is read,
    each ``<a href="http://1…:port`` link gets its host replaced by the
    address of the interface used for outbound traffic, and the file is
    written back in place. The local and WLAN addresses and the processed
    file paths are printed.
    """
    # input: INPUT_DIR_PATH
    # Get the IP addresses; they are held as str.
    # Local IP; gethostname() or getfqdn() give the same result.
    local_IP = socket.gethostbyname(socket.gethostname())

    # WLAN IP: connecting a UDP socket selects the outgoing interface, whose
    # address is then read back from the socket.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(("8.8.8.8", 80))
        WLAN_IP = s.getsockname()[0]
    print("Local IP : " + local_IP)
    print("WLAN IP : " + WLAN_IP)

    # ----------------------------------------------------------------
    # Locate the html files
    # ----------------------------------------------------------------
    # INPUT_DIR_PATH ==> ex) web\\templates
    # DIR_PATH = os.path.join('web','templates') also works
    # ----------------------------------------------------------------
    # DIR_PATH = os.path.join(os.getcwd(), INPUT_DIR_PATH)

    DIR_PATH = os.path.join('web', 'templates')
    # Names of the files/folders inside the directory.
    FILE_DIR_NAMES = os.listdir(DIR_PATH)
    # Keep only the HTML file names.
    HTML_NAMES = [name for name in FILE_DIR_NAMES if '.html' in name]

    # Runtime message restored from its garbled encoding
    # ("[list of changed html files]").
    print("[바꾼 html 파일 리스트]")
    for html in HTML_NAMES:
        html_path = os.path.join(DIR_PATH, html)
        print(html_path)
        with open(html_path, 'r', encoding='UTF-8') as html_file:
            # Use either local_IP or WLAN_IP here, as needed.
            lines = [_replace_href_ip(line, WLAN_IP) for line in html_file]

        with open(html_path, 'w', encoding='UTF-8') as html_file:
            # print(lines)
            html_file.writelines(lines)

    ## Update the ip in main.js
    #print(os.path.join('web\static\js', 'main.js'))
    #with open(os.path.join('web\static\js', 'main.js'), 'r', encoding='UTF-8') as js_file:
    #        lines = js_file.readlines()

    #        for i in range(len(lines)):
    #            ip_pos = lines[i].find('var myipcheck')
    #            if ip_pos != -1:
    #                ip_pos += 17
    #                end_pos = lines[i].find("'", ip_pos)
    #                end_pos = lines[i].find("'", end_pos)
    #                left_substr = lines[i][:ip_pos]
    #                right_substr = lines[i][end_pos:]
    #                lines[i] = left_substr + WLAN_IP + right_substr  # use either Local_IP or WLAN_IP

    #with open(os.path.join('web\\static\\js', 'main.js'), 'w', encoding='UTF-8') as js_file:
    #        # print(lines)
    #        js_file.writelines(lines)
...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
