How to use wlan_ip method in ATX

Best Python code snippet using ATX

tap_wlan_interface.py

Source:tap_wlan_interface.py Github

copy

Full Screen

1import os2import sys3import signal4from optparse import OptionParser5from threading import Thread, current_thread6import netifaces as ni7import socket8# import ipaddress9import time10import subprocess11'''12multicast cases:13 1. packet from tap interface14 2. config number = 1,2,515unicast cases:16 1. packet from tap interface17 2. config number = 0,3,4,6,718'''19multicast_config = ['1','2','5', '17', '18']20unicast_config = ['0','3','4','6','7']21# triggers = []22device_map = {} # ipv6 -> wlan(ipv4)23riot_ip = ''24tap_ip = ''25wlan_ip = ''26blacklist = []27parser = OptionParser()28def pprint(arg):29 name = current_thread().getName()30 if name == 'Thread-1':31 print('\033[94m'+str(arg)+'\033[00m')32 elif name == 'Thread-2':33 print('\033[92m'+str(arg)+'\033[00m')34 else:35 print(str(arg))36def ip_validate(addr, ipv4):37 if ipv4:38 addr = addr.split('.')39 if len(addr) == 4:40 for value in addr:41 if not value.isdigit() or int(value) < 0 or int(value) > 255:42 return False43 else:44 return False45 else:46 if addr == '::1' or addr == '::':47 return True48 addr = addr.split('%')[0].split(':')49 if len(addr) <= 8 and len(addr) >=3:50 for value in addr:51 try:52 v = int(value, 16)53 if v < 0 or v > 65535:54 return False55 except ValueError:56 if value == '':57 continue58 return False59 else:60 return False61 return True62def multicast(data, port=8809, ipv4=True):63 pprint("multicast():")64 pprint("data -> "+data)65 pprint("port -> "+str(port))66 pprint("is_ipv4 -> "+str(ipv4))67 global device_map68 global wlan_ip69 flag = False70 if ipv4:71 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)72 sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)73 sock.bind((wlan_ip, 0))74 else:75 # this part will never be used in our case76 pprint("[-]Anomally detected in multicast function!!!")77 sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)78 sock.sendto(data, ('<broadcast>', port))79 sock.close()80 pprint("[+]sent")81 return True82def unicast(data, address, ipv4):83 pprint("unicast():")84 pprint("data -> "+data)85 pprint("address -> "+str(address))86 pprint("is_ipv4 ->"+str(ipv4))87 global parser88 options, args = parser.parse_args()89 command = 'echo'90 ip = address[0]91 if ipv4:92 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)93 if not ip_validate(ip, True):94 pprint("[-]Invalid ip")95 return False96 else:97 sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)98 if not ip_validate(ip, False):99 pprint("[-]Invalid ip")100 return False101 sock.sendto(data, address)102 pprint("[+]sent")103 sock.close()104 return True105def createTap(iface):106 pprint("createTap():")107 pprint("iface -> "+iface)108 os.system('sudo ip tuntap add '+iface+' mode tap user ${USER}')109 os.system('sudo ip link set '+iface+' up')110def startUDPserver(port, ipv4):111 pprint("\033[00m")112 pprint("startUDPserver():")113 pprint("port -> "+str(port))114 pprint("is_ipv4 -> "+str(ipv4))115 if ipv4:116 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)117 server_address = ('0.0.0.0', int(port))118 else:119 sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)120 server_address = ("::", int(port))121 sock.bind(server_address)122 return sock123def listen(sock, lis):124 pprint("listen():")125 pprint("sock -> "+str(sock))126 pprint("lis -> "+lis)127 global multicast128 global unicast129 # global triggers130 global device_map131 global riot_ip132 global tap_ip133 global wlan_ip134 while(True):135 data, address = sock.recvfrom(4096)136 pprint("while():")137 pprint("[+]Received {} from {}".format(data, address))138 if data and len(data.split(' '))>=3:139 values = data.split(' ')140 config = values[0]141 ip = values[-2]142 if config in multicast_config and lis=='local' and ip == "::":143 pprint("[+]multicast local")144 riot_ip = address145 pprint("[+]riot_ip-> "+str(riot_ip))146 multicast(data)147 elif config in unicast_config and lis=='local':148 pprint("[+]unicast local")149 try:150 unicast(data, (device_map[ip], 8809), True)151 except Exception, e:152 pprint(e)153 elif lis=='remote':154 # check if it is a broadcast packet from my own ip155 if config in multicast_config and address[0] == wlan_ip:156 pprint("[+]Packet discarded")157 continue158 # end159 pprint("[+]remote")160 remote_ip = values[-1]161 pprint("[+]Source of Data: "+str(remote_ip))162 if ip_validate(remote_ip, True) or ip_validate(remote_ip, False):163 '''164 if remote_ip not in triggers:165 triggers.append(address[0])166 '''167 if remote_ip not in device_map.keys():168 device_map[remote_ip] = address[0]169 else:170 pprint("[-]Invalid ip")171 continue172 pprint("[+]riot_ip-> "+str(riot_ip))173 unicast(data, riot_ip, False)174 else:175 pass176 pprint("[+]Device map:")177 pprint(device_map)178 # pprint("[+]Triggers:")179 # pprint(triggers)180 else:181 pprint("[-]Invalid data format!!!")182if __name__ == "__main__":183 global parser184 parser.add_option("-t", "--tap", dest="tap", help="Specify the name of the tap interface to create")185 parser.add_option("-w", "--wireless", dest="wlan", help="Specify the name of the wireless interface")186 parser.add_option("-p", "--path", dest="path", help="Specify the path of riot application")187 parser.add_option("-4", action="store_true", dest="ipv4", help="Use for ipv4 addressing", default=False) 188 (options, args) = parser.parse_args()189 pprint('[+]Creating tap interface '+options.tap)190 createTap(options.tap)191 pprint('[+]Assigning ip to tap interface')192 if os.getenv('RIOT_APP'):193 command = 'bash -c \"cd '+os.getenv('RIOT_APP')+' && sudo make term\"'194 else:195 command = 'bash -c \"cd '+options.path+' && sudo make term\"'196 p = subprocess.call("timeout 5 xterm -e "+command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)197 pprint('\n\nConnect to wifi and press Enter!!!\n\n')198 raw_input()199 global tap_ip200 global riot_ip201 global wlan_ip202 interfaces = ni.interfaces()203 if options.tap in interfaces and options.wlan in interfaces:204 try:205 tap_ip = ni.ifaddresses(options.tap)[10][0]['addr'].split('%')[0]206 wlan_ip = ni.ifaddresses(options.wlan)[2][0]['addr']207 # riot_ip = str(ipaddress.IPv6Address(unicode(tap_ip))+1)208 except Exception, e:209 pprint(e)210 pprint("[+]IP addresses are not properly alloted to interfaces")211 sys.exit(0)212 else:213 pprint('[+]Error Occurred: one or two interfaces is not present')214 sys.exit(0)215 pprint("tap_ip->"+tap_ip)216 pprint("wlan_ip->"+wlan_ip)217 # used for local packet from tap218 pprint("[+]Starting UDP listening server on port 8808 (Listening on tap interface)")219 t_local = Thread(target=listen, args=(startUDPserver(8808, ipv4=options.ipv4), 'local'))220 t_local.daemon = True221 t_local.start()222 # used for remote packet from wifi223 pprint("[+]Starting UDP listening server on port 8809 (Listening on wireless interface)")224 t_remote = Thread(target=listen, args=(startUDPserver(8809, ipv4=True), 'remote'))225 t_remote.daemon = True226 t_remote.start()227 pprint("\n\nStart riot instance!!!\n\n")228 # keeping the main thread running229 while True:...

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

1#!/usr/bin/env python2import time3import _thread4import pycom5import machine6from machine import RTC7from network import WLAN8from mqtt import MQTTClient9from network import Bluetooth10# Sensor import11from pycoproc_2 import Pycoproc12from LIS2HH12 import LIS2HH1213from SI7006A20 import SI7006A2014from LTR329ALS01 import LTR329ALS0115from MPL3115A2 import MPL3115A2,ALTITUDE,PRESSURE16# Configuration17import config18# Globals19py = Pycoproc()20# Returns height in meters. Mode may also be set to PRESSURE, returning a value in Pascals21mp = MPL3115A2(py,mode=ALTITUDE)22si = SI7006A20(py)23lt = LTR329ALS01(py)24li = LIS2HH12(py)25# Returns pressure in Pa. Mode may also be set to ALTITUDE, returning a value in meters26mpp = MPL3115A2(py,mode=PRESSURE)27wlan = WLAN()28bt = Bluetooth()29def init():30 """ Init function """31 pycom.heartbeat(False)32 # Turn off BlueTooth to save energy33 bt.deinit()34 print("Connect to WiFi")35 wlan.deinit()36 time.sleep(1)37 wlan.init(mode=WLAN.STA)38 time.sleep(1)39 wlan.ifconfig(id=0, config='dhcp')40 time.sleep(1)41 wlan.connect(ssid=config.WIFI_SSID, auth=(WLAN.WPA2, config.WIFI_PASS))42 pycom.rgbled(0x200000)43 while not wlan.isconnected():44 pycom.rgbled(0x205020)45 time.sleep(1)46 pycom.rgbled(0x502050)47 time.sleep(2)48 print("WiFi connected succesfully")49 wlan_connected = wlan.ifconfig()50 wlan_ip = wlan_connected[0]51 print('wlan connected '+str(wlan_connected))52 pycom.rgbled(0x00A000)53def clock_sync():54 """ Sync RTC time with NTP server for correct timestamps on Events """55 rtc = RTC()56 rtc.ntp_sync(server="pool.ntp.org")57 print('The RTC is synced via NTP')58def mqtt_callback(topic, msg):59 """ Mqtt callback """60 timestamp = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(*time.gmtime()[:6])61 print("{timestamp} - {topic} : {msg}".format(timestamp=timestamp,topic=topic,msg=msg))62def loop(client):63 """ RunLoop """64 # Get interface addr.65 wlan_ip = wlan.ifconfig()[0]66 # Light channels (ch0 ~450nm violet-blue and ch1 ~770nm red67 (data0, data1) = lt.light()68 measurement = config.THING_ID69 id = machine.unique_id()70 unit="fipy/{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}".format(id[0], id[1], id[2], id[3], id[4], id[5])71 humidity = str(si.humidity())72 dew = str(si.dew_point())73 temp_mp = str(mp.temperature())74 temp_si = str(si.temperature())75 battery = str(py.read_battery_voltage())76 pressure = str(mpp.pressure())77 light_450 = str(data0)78 light_770 = str(data1)79 lux = str(lt.lux())80 timestamp = str(time.time())81 # InfluxDB line-protocol82 #<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]83 fmt = ("{measurement},unit={unit},ip={wlan_ip} "84 "humidity={humidity},dewPoint={dew},tempMp={temp_mp},tempSi={temp_si},battery={battery},"85 "pressure={pressure},light_450={light_450},light_770={light_770},lux={lux}"86 " {timestamp}")87 payload = fmt.format(measurement=measurement,unit=unit,wlan_ip=wlan_ip,humidity=humidity,dew=dew,88 temp_mp=temp_mp,temp_si=temp_si,battery=battery,pressure=pressure,89 light_450=light_450,light_770=light_770,lux=lux,timestamp=timestamp)90 print('Sending data:', payload)91 92 # Publish InfluxDB line-protocol to MQTT93 client.publish(topic=config.MQTT_TOPIC, msg=payload, qos=0)94 pycom.rgbled(0x001000)95 time.sleep(config.SLEEP_TIME)96 pycom.rgbled(0x00A000)97 # Check for MQTT msg98 client.check_msg()99def main():100 """ main app """101 print("Connect to mqtt")102 client = MQTTClient(103 config.THING_ID, config.MQTT_BROKER, 104 port=config.MQTT_PORT, 105 keepalive=int(config.MQTT_KEEPALIVE)106 )107 client.set_callback(mqtt_callback)108 resp = client.connect(clean_session=True)109 print("mqttClient connect call returned: {}".format(resp))110 if (resp == 0):111 print("Successfully connected to MQTT Broker")112 client.subscribe(config.MQTT_TOPIC)113 114 while True:115 try:116 loop(client)117 machine.idle()118 # Handle exception119 except KeyboardInterrupt:120 sys.exit(retval=0)121 except Exception as ex:122 sys.print_exception(ex)123 sys.exit(retval=-1)124# The App125print('Starting App')126init()127clock_sync()...

Full Screen

Full Screen

myipcheck.py

Source:myipcheck.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2import socket3import os45def change_IP_in_HTML():6 # input: INPUT_DIR_PATH7 # IP 주소 구하기, IP 주소는 str형태로 되어 있음8 # Local IP, gethostname() 이랑 getfqdn() 같음9 local_IP = socket.gethostbyname(socket.gethostname())1011 # WLAN IP12 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)13 s.connect(("8.8.8.8", 80))14 WLAN_IP = s.getsockname()[0]15 print("Local IP : " + local_IP)16 print("WLAN IP : " + WLAN_IP)1718 # ----------------------------------------------------------------19 # html 파일 경로 잡기20 # ----------------------------------------------------------------21 # INPUT_DIR_PATH ==> ex) web\\templates22 # DIR_PATH = os.path.join('web','templates') 도 가능23 # ----------------------------------------------------------------24 # DIR_PATH = os.path.join(os.getcwd(), INPUT_DIR_PATH)2526 DIR_PATH = os.path.join('web','templates')27 FILE_DIR_NAMES = os.listdir(DIR_PATH) # audio 폴더안에 있는 파일/폴더 이름 불러오기28 HTML_NAMES = [] # HTML 파일 이름만 저장할 리스트29 for paths in FILE_DIR_NAMES:30 if paths.find('.html') != -1:31 HTML_NAMES.append(paths)3233 print("[바꾼 html 파일 리스트]")34 for html in HTML_NAMES:35 print(os.path.join(DIR_PATH, html))36 with open(os.path.join(DIR_PATH, html), 'r', encoding='UTF-8') as html_file:37 lines = html_file.readlines()3839 for i in range(len(lines)):40 ip_pos = lines[i].find('<a href="http://1')41 if ip_pos != -1:42 ip_pos += 1643 end_pos = lines[i].find(':', ip_pos)44 left_substr = lines[i][:ip_pos]45 right_substr = lines[i][end_pos:]46 lines[i] = left_substr + WLAN_IP + right_substr # Local_IP 와 WLAN_IP 중 선택해서 사용4748 with open(os.path.join(DIR_PATH, html), 'w', encoding='UTF-8') as html_file:49 # print(lines)50 html_file.writelines(lines)51 52 ##main.js의 ip 수정53 #print(os.path.join('web\static\js', 'main.js'))54 #with open(os.path.join('web\static\js', 'main.js'), 'r', encoding='UTF-8') as js_file:55 # lines = js_file.readlines()5657 # for i in range(len(lines)):58 # ip_pos = lines[i].find('var myipcheck')59 # if ip_pos != -1:60 # ip_pos += 1761 # end_pos = lines[i].find("'", ip_pos)62 # end_pos = lines[i].find("'", end_pos)63 # left_substr = lines[i][:ip_pos]64 # right_substr = lines[i][end_pos:]65 # lines[i] = left_substr + WLAN_IP + right_substr # Local_IP 와 WLAN_IP 중 선택해서 사용6667 #with open(os.path.join('web\\static\\js', 'main.js'), 'w', encoding='UTF-8') as js_file:68 # # print(lines)69 # js_file.writelines(lines) ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful