Best Python code snippet using lisa_python
get_interfaces.py
Source:get_interfaces.py  
...56                if "local-address" in r1:57                    tun["local_address"] = r1["local-address"]58                tun["remote_address"] = r1["remote-address"]59                return60    def get_mtu(self, r):61        if is_int(r.get("mtu")):62            return int(r["mtu"])63        if is_int(r.get("actual-mtu")):64            return int(r["actual-mtu"])65        return None66    def execute_cli(self):67        ifaces = {}68        misc = {}69        # Get ifIndex70        n_ifindex = {}  # number -> ifIndex71        for n, f, r in self.cli_detail("/interface print oid without-paging"):72            n_ifindex[n] = int(r["name"].rsplit(".", 1)[-1])73        time.sleep(1)74        # Fill interfaces75        a = self.cli_detail("/interface print detail without-paging")76        for n, f, r in a:77            if r["type"] in self.ignored_types:78                continue79            if not r["type"] in "vlan":  # TODO: Check other types80                ifaces[r["name"]] = {81                    "name": r["name"],82                    "type": self.type_map[r["type"]],83                    "admin_status": "X" not in f,84                    "oper_status": "R" in f,85                    "enabled_protocols": [],86                    "subinterfaces": [],87                }88                if "mac-address" in r:89                    ifaces[r["name"]]["mac"] = r["mac-address"]90                if "mac" in r:91                    ifaces[r["name"]]["mac"] = r["mac"]92                misc[r["name"]] = {"type": r["type"]}93                if n in n_ifindex:94                    ifaces[r["name"]]["snmp_ifindex"] = n_ifindex[n]95                if "default-name" in r:96                    ifaces[r["name"]]["default_name"] = r["default-name"]97                if (98                    r["type"].startswith("ipip-")99                    or r["type"].startswith("eoip-")100                    or r["type"].startswith("gre-")101                ):102                    self.si = {103                        "name": r["name"],104                        "admin_status": "X" not in f,105                        "oper_status": "R" in f,106                        "enabled_afi": ["IPv4"],107                        "enabled_protocols": [],108                    }109                    if self.get_mtu(r) is not None:110                        self.si["mtu"] = self.get_mtu(r)111                    if r["type"].startswith("ipip-"):112                        self.get_tunnel("IPIP", "R", "IPv4", ifaces)113                    if r["type"].startswith("eoip-"):114                        self.get_tunnel("EOIP", "R", "IPv4", ifaces)115                    if r["type"].startswith("gre-"):116                        self.get_tunnel("GRE", "R", "IPv4", ifaces)117                    ifaces[r["name"]]["subinterfaces"] += [self.si]118        time.sleep(1)119        # Attach `vlan` subinterfaces to parent120        for n, f, r in self.cli_detail("/interface vlan print detail without-paging"):121            if r["interface"] in ifaces:122                i = ifaces[r["interface"]]123                self.si = {124                    "name": r["name"],125                    "mac": r.get("mac-address") or r.get("mac"),126                    "admin_status": "X" not in f,127                    "oper_status": "R" in f,128                    "enabled_afi": [],129                    "vlan_ids": [int(r["vlan-id"])],130                    "enabled_protocols": [],131                }132                if self.get_mtu(r) is not None:133                    self.si["mtu"] = self.get_mtu(r)134                i["subinterfaces"] += [self.si]135        # process internal `switch` ports and vlans136        vlan_tags = {}137        # "RB532", "x86", CCR1009 not support internal switch port138        try:139            v = self.cli_detail("/interface ethernet switch port print detail without-paging")140            for n, f, r in v:141                if "vlan-mode" not in r:142                    continue143                if (144                    r["vlan-mode"] in ["check", "secure"]145                    and r["vlan-header"] in ["add-if-missing", "leave-as-is"]146                    and r["default-vlan-id"] != "auto"147                    and int(r["default-vlan-id"]) != 0148                ):149                    vlan_tags[r["name"]] = True150                else:151                    vlan_tags[r["name"]] = False152        except self.CLISyntaxError:153            pass154        # "RB532", "x86", CCR1009 not support internal switch port155        try:156            # Attach subinterfaces with `BRIDGE` AFI to parent157            v = self.cli_detail(158                "/interface ethernet switch vlan print detail without-paging", cached=True159            )160            for n, f, r in v:161                # vlan-id=auto ? Need more testing162                if not is_int(r["vlan-id"]):163                    continue164                vlan_id = int(r["vlan-id"])165                ports = r["ports"].split(",")166                if not ports:167                    continue168                for p in ports:169                    if p not in ifaces:170                        continue171                    i = ifaces[p]172                    self.si = {173                        "name": p,174                        "mac": i.get("mac"),175                        "admin_status": i.get("admin_status"),176                        "oper_status": i.get("oper_status"),177                        "enabled_afi": ["BRIDGE"],178                        "enabled_protocols": [],179                        "tagged_vlans": [],180                    }181                    if self.get_mtu(i) is not None:182                        self.si["mtu"] = self.get_mtu(i)183                    if p in vlan_tags:184                        if vlan_tags[p]:185                            self.si["tagged_vlans"] += [vlan_id]186                        else:187                            self.si["untagged_vlan"] = vlan_id if vlan_id > 0 else 1188                    # Try to find in already created subinterfaces189                    found = False190                    for sub in i["subinterfaces"]:191                        if sub["name"] == p:192                            if p in vlan_tags:193                                if vlan_tags[p]:194                                    sub["tagged_vlans"] += [vlan_id]195                                else:196                                    sub["utagged_vlan"] = vlan_id197                                found = True198                    if not found:199                        i["subinterfaces"] += [self.si]200        except self.CLISyntaxError:201            pass202        # Vlans on bridge203        try:204            v = self.cli_detail("/interface bridge vlan print detail without-paging", cached=True)205            for n, f, d in v:206                if "X" in f or "D" in f:207                    continue208                vlans = self.expand_rangelist(d["vlan-ids"])209                for vlan_id in vlans:210                    untagged = d["untagged"].split(",")211                    for p in untagged:212                        if p not in ifaces:213                            continue214                        i = ifaces[p]215                        self.si = {216                            "name": p,217                            "mac": i.get("mac"),218                            "admin_status": i.get("admin_status"),219                            "oper_status": i.get("oper_status"),220                            "enabled_afi": ["BRIDGE"],221                            "enabled_protocols": [],222                            "utagged_vlans": vlan_id,223                        }224                        if self.get_mtu(i) is not None:225                            self.si["mtu"] = self.get_mtu(i)226                        # Try to find in already created subinterfaces227                        for sub in i["subinterfaces"]:228                            if sub["name"] == p:229                                sub["utagged_vlan"] = vlan_id230                                break231                        else:232                            i["subinterfaces"] += [self.si]233                    tagged = d["tagged"].split(",")234                    for p in tagged:235                        if p not in ifaces:236                            continue237                        i = ifaces[p]238                        self.si = {239                            "name": p,240                            "mac": i.get("mac"),241                            "admin_status": i.get("admin_status"),242                            "oper_status": i.get("oper_status"),243                            "enabled_afi": ["BRIDGE"],244                            "enabled_protocols": [],245                            "tagged_vlans": [vlan_id],246                        }247                        if self.get_mtu(i) is not None:248                            self.si["mtu"] = self.get_mtu(i)249                        # Try to find in already created subinterfaces250                        for sub in i["subinterfaces"]:251                            if sub["name"] == p:252                                if "tagged_vlans" in sub and vlan_id not in sub["tagged_vlans"]:253                                    sub["tagged_vlans"] += [vlan_id]254                                else:255                                    sub["tagged_vlans"] = [vlan_id]256                                break257                        else:258                            i["subinterfaces"] += [self.si]259        except self.CLISyntaxError:260            pass261        # Refine ip addresses262        for n, f, r in self.cli_detail("/ip address print detail without-paging"):263            if "X" in f:264                continue265            self.si = {}266            if r["interface"] in ifaces:267                i = ifaces[r["interface"]]268                t = misc[r["interface"]]269                if not i["subinterfaces"]:270                    self.si = {271                        "name": r["interface"],272                        "enabled_afi": [],273                        # XXX Workaround274                        "admin_status": i["admin_status"],275                        "oper_status": i["oper_status"],276                        "enabled_protocols": [],277                    }278                    if "mac" in i:279                        self.si["mac"] = i["mac"]280                    i["subinterfaces"] += [self.si]281                else:282                    for sub in i["subinterfaces"]:283                        if sub["name"] == r["interface"]:284                            self.logger.debug(285                                "\nError: subinterfaces already exists in interface \n%s\n" % i286                            )287                            break288                    else:289                        self.si = {290                            "name": r["interface"],291                            "enabled_afi": [],292                            # XXX Workaround293                            "admin_status": i["admin_status"],294                            "oper_status": i["oper_status"],295                            "enabled_protocols": [],296                        }297                        if "mac" in i:298                            self.si["mac"] = i["mac"]299                        i["subinterfaces"] += [self.si]300            else:301                for i in ifaces:302                    iface = ifaces[i]303                    for s in iface["subinterfaces"]:304                        if s["name"] == r["interface"]:305                            self.si = s306                            break307                    if self.si:308                        t = misc[i]309                        break310            if not self.si:311                self.logger.debug("Error: Interface name not found!!!")312                continue313            afi = "IPv6" if ":" in r["address"] else "IPv4"314            if afi not in self.si["enabled_afi"]:315                self.si["enabled_afi"] += [afi]316            if afi == "IPv4":317                a = self.si.get("ipv4_addresses", [])318                a += [r["address"]]319                self.si["ipv4_addresses"] = a320            else:321                a = self.si.get("ipv6_addresses", [])322                a += [r["address"]]323                self.si["ipv6_addresses"] = a324            # Tunnel types325            # XXX /ip address print detail do not print tunnels !!!326            # Need reworks !!!327            if t["type"].startswith("ppp-"):328                self.get_tunnel("PPP", f, afi, r)329            if t["type"].startswith("pppoe-"):330                self.get_tunnel("PPPOE", f, afi, r)331            if t["type"].startswith("ovpn-"):332                self.si["tunnel"] = {}333            if t["type"].startswith("l2tp-"):334                self.get_tunnel("L2TP", f, afi, r)335            if t["type"].startswith("pptp-"):336                self.get_tunnel("PPTP", f, afi, r)337            if t["type"].startswith("ovpn-"):338                self.get_tunnel("PPP", f, afi, r)339            if t["type"].startswith("sstp-"):340                self.get_tunnel("SSTP", f, afi, r)341        # bridge342        for n, f, r in self.cli_detail("/interface bridge print detail without-paging"):343            self.si = {}344            if r["name"] in ifaces:345                i = ifaces[r["name"]]346                if not i["subinterfaces"]:347                    self.si = {348                        "name": r["name"],349                        "enabled_afi": ["BRIDGE"],350                        # XXX Workaround351                        "admin_status": i["admin_status"],352                        "oper_status": i["oper_status"],353                        "mac": r["mac-address"],354                        "enabled_protocols": [],355                    }356                    if self.get_mtu(i) is not None:357                        self.si["mtu"] = self.get_mtu(i)358                    i["subinterfaces"] += [self.si]359                else:360                    i["subinterfaces"][0]["enabled_afi"] += ["BRIDGE"]361                if r["protocol-mode"] in ["stp", "rstp"]:362                    i["enabled_protocols"] += ["STP"]363        # bonding364        for n, f, r in self.cli_detail("/interface bonding print detail without-paging"):365            self.si = {}366            if r["name"] in ifaces:367                i = ifaces[r["name"]]368                if not i["subinterfaces"]:369                    self.si = {370                        "name": r["name"],371                        "enabled_afi": [],372                        # XXX Workaround373                        "admin_status": i["admin_status"],374                        "oper_status": i["oper_status"],375                        "mac": r["mac-address"],376                        "enabled_protocols": [],377                    }378                    if self.get_mtu(i) is not None:379                        self.si["mtu"] = self.get_mtu(i)380                    i["subinterfaces"] += [self.si]381                if r["mode"] in ["802.3ad"]:382                    i["enabled_protocols"] += ["LACP"]383                if r["slaves"]:384                    slaves = r["slaves"].split(",")385                    for s in slaves:386                        if s in ifaces:387                            ifaces[s]["aggregated_interface"] = r["name"]388        # OSPF389        try:390            ospf_result = self.cli_detail("/routing ospf interface print detail without-paging")391            for n, f, r in ospf_result:392                self.si = {}393                if r["interface"] in ifaces:...wifi_tx_udp_file.py
Source:wifi_tx_udp_file.py  
1from scapy.all import IP,UDP,LLC,SNAP,repr_hex2import socket3import time4import sys5import os6#################################################7############ Default FISSURE Header #############8#################################################9def getArguments():10    udp_source_ip = "192.168.1.200"11    udp_dest_ip = "192.168.1.5"12    udp_source_port = "14321"13    udp_dest_port = "55555"14    filepath = ''15    interval = .116    mtu = 100017    wifi_tx_udp_port = 5200118    wifi_tx_ip_address = "127.0.0.1"19    notes = 'Reads in bytes of data and puts it in a UDP payload. Messages are placed in UDP data and sent to a wifi_tx.py UDP port.'20    arg_names = ['udp_source_ip','udp_dest_ip','udp_source_port','udp_dest_port','filepath','interval','mtu','wifi_tx_udp_port','wifi_tx_ip_address','notes']21    arg_values = [udp_source_ip, udp_dest_ip, udp_source_port, udp_dest_port, filepath, interval, mtu, wifi_tx_udp_port, wifi_tx_ip_address, notes]22    return (arg_names,arg_values)23if __name__ == "__main__":24    # Default Values25    get_udp_source_ip = "192.168.1.200"26    get_udp_dest_ip = "192.168.1.5"27    get_udp_source_port = "14321"28    get_udp_dest_port = "55555"29    get_filepath = ''30    get_interval = .131    get_mtu = 100032    get_wifi_tx_udp_port = 5200133    get_wifi_tx_ip_address = "127.0.0.1"34    # Accept Command Line Arguments35    try:36        get_udp_source_ip = sys.argv[1]37        get_udp_dest_ip = sys.argv[2]38        get_udp_source_port = sys.argv[3]39        get_udp_dest_port = sys.argv[4]40        get_filepath = sys.argv[5]41        get_interval = float(sys.argv[6])42        get_mtu = int(sys.argv[7])43        get_wifi_tx_udp_port = int(sys.argv[8])44        get_wifi_tx_ip_address = sys.argv[9]45    except:46        pass47#################################################48    if len(get_filepath) > 0:49        # LLC/SNAP/IP/UDP Message in UDP Data -> "wifi_tx.py" Port50        udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)51        # LLC/SNAP/IP/UDP52        pip_bytes = LLC()/SNAP()/IP()/UDP()53        pip_bytes[IP].src = get_udp_source_ip54        pip_bytes[IP].dst = get_udp_dest_ip55        pip_bytes[IP].id = 0x000056        pip_bytes[IP].flags = 257        pip_bytes[UDP].sport = int(get_udp_source_port)58        pip_bytes[UDP].dport = int(get_udp_dest_port)59        60        number_of_bytes = os.path.getsize(get_filepath)61        file = open(get_filepath,"rb")                          # Open the file    62        63        # Send File Data64        for n in range(0, number_of_bytes, get_mtu):65            file.seek(n)66            transfer_data = file.read(get_mtu)  67            final_bytes = str(pip_bytes/transfer_data)  68            udp_socket.sendto(final_bytes,(get_wifi_tx_ip_address, get_wifi_tx_udp_port))69            time.sleep(get_interval)70        # Send Last File Read71        file_remainder = number_of_bytes % get_mtu72        transfer_data = file.read(file_remainder)73        final_bytes = str(pip_bytes/transfer_data)74        udp_socket.sendto(final_bytes,(get_wifi_tx_ip_address, get_wifi_tx_udp_port))75        file.close()76        print "File Relay Complete"77    78    else:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
