Best Python code snippet using avocado_python
bonding.py
Source:bonding.py  
...86        if 'setup' in str(self.name.name):87            for ipaddr, interface in zip(self.ipaddr, self.host_interfaces):88                networkinterface = NetworkInterface(interface, self.localhost)89                try:90                    networkinterface.flush_ipaddr()91                    networkinterface.add_ipaddr(ipaddr, self.netmask)92                    networkinterface.save(ipaddr, self.netmask)93                except Exception:94                    networkinterface.save(ipaddr, self.netmask)95                networkinterface.bring_up()96            for ipaddr, interface in zip(self.peer_first_ipinterface,97                                         self.peer_interfaces):98                if self.peer_bond_needed:99                    self.remotehost = RemoteHost(100                                    self.peer_public_ip,101                                    self.user, password=self.password)102                    peer_networkinterface = NetworkInterface(interface,103                                                             self.remotehost)104                    try:105                        peer_networkinterface.flush_ipaddr()106                        peer_networkinterface.add_ipaddr(ipaddr, self.netmask)107                        peer_networkinterface.save(ipaddr, self.netmask)108                    except Exception:109                        peer_networkinterface.save(ipaddr, self.netmask)110                    networkinterface.bring_up()111        self.miimon = self.params.get("miimon", default="100")112        self.fail_over_mac = self.params.get("fail_over_mac",113                                             default="2")114        self.downdelay = self.params.get("downdelay", default="0")115        self.bond_name = self.params.get("bond_name", default="tempbond")116        self.net_path = "/sys/class/net/"117        self.bond_status = "/proc/net/bonding/%s" % self.bond_name118        self.bond_dir = os.path.join(self.net_path, 
self.bond_name)119        self.bonding_slave_file = "%s/bonding/slaves" % self.bond_dir120        self.bonding_masters_file = "%s/bonding_masters" % self.net_path121        self.peer_bond_needed = self.params.get("peer_bond_needed",122                                                default=False)123        self.peer_wait_time = self.params.get("peer_wait_time", default=20)124        self.sleep_time = int(self.params.get("sleep_time", default=10))125        self.peer_wait_time = self.params.get("peer_wait_time", default=5)126        self.sleep_time = int(self.params.get("sleep_time", default=5))127        self.mtu = self.params.get("mtu", default=1500)128        for root, dirct, files in os.walk("/root/.ssh"):129            for file in files:130                if file.startswith("avocado-master-"):131                    path = os.path.join(root, file)132                    os.remove(path)133        self.ib = False134        if self.host_interface[0:2] == 'ib':135            self.ib = True136        self.log.info("Bond Test on IB Interface? = %s", self.ib)137        '''138        An individual interface, that has a LACP PF, cannot communicate without139        being bonded. So the test uses the public ip address to create an SSH140        session instead of the private one when setting up a bonding interface.141        '''142        if self.mode == "4" and "setup" in str(self.name.name):143            self.session = Session(self.peer_public_ip, user=self.user,144                                   password=self.password)145        else:146            self.session = Session(self.peer_first_ipinterface[0], user=self.user,147                                   password=self.password)148        if not self.session.connect():149            '''150            LACP bond interface takes some time to get it to ping peer after it151            is setup. 
This code block tries at most 5 times to get it to connect152            to the peer.153            '''154            if self.mode == "4":155                connect = False156                for _ in range(5):157                    if self.session.connect():158                        connect = True159                        self.log.info("Was able to connect to peer.")160                        break161                    time.sleep(5)162                if not connect:163                    self.cancel("failed connecting to peer")164            else:165                self.cancel("failed connecting to peer")166        self.setup_ip()167        self.err = []168        if self.mode == "4" and "setup" in str(self.name.name):169            self.remotehost = RemoteHost(self.peer_public_ip, self.user,170                                         password=self.password)171        else:172            self.remotehost = RemoteHost(self.peer_first_ipinterface[0], self.user,173                                         password=self.password)174        if 'setup' in str(self.name.name):175            for interface in self.peer_interfaces:176                peer_networkinterface = NetworkInterface(interface, self.remotehost)177                if peer_networkinterface.set_mtu(self.mtu) is not None:178                    self.cancel("Failed to set mtu in peer")179            for host_interface in self.host_interfaces:180                self.networkinterface = NetworkInterface(host_interface, self.localhost)181                if self.networkinterface.set_mtu(self.mtu) is not None:182                    self.cancel("Failed to set mtu in host")183    def bond_ib_conf(self, bond_name, arg1, arg2):184        '''185        configure slaves for IB cards186        '''187        cmd = 'ip link set %s up;' % (bond_name)188        if process.system(cmd, shell=True, ignore_status=True) != 0:189            self.fail("unable to bring Bond interface %s up" % bond_name)190        if arg2 == 
"ATTACH":191            cmd = 'ifenslave %s %s -f;' % (bond_name, arg1)192        else:193            cmd = 'ifenslave %s -d %s ;' % (bond_name, arg1)194        if process.system(cmd, shell=True, ignore_status=True) != 0:195            self.fail("unable to %s IB interface " % arg2)196    def setup_ip(self):197        '''198        set up the IP config199        '''200        if 'setup' in str(self.name):201            interface = self.host_interfaces[0]202        else:203            interface = self.bond_name204        cmd = "ip addr show  | grep %s" % self.peer_first_ipinterface[0]205        output = self.session.cmd(cmd)206        result = ""207        result = result.join(output.stdout.decode("utf-8"))208        self.peer_first_interface = result.split()[-1]209        if self.peer_first_interface == "":210            self.fail("test failed because peer interface can not retrieved")211        self.peer_ips = [self.peer_first_ipinterface]212        self.local_ip = netifaces.ifaddresses(interface)[2][0]['addr']213        self.net_mask = []214        stf = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)215        for val1, val2 in zip([interface], [self.local_ip]):216            mask = ""217            if val2:218                tmp = fcntl.ioctl(stf.fileno(), 0x891b,219                                  struct.pack('256s', val1.encode()))220                mask = socket.inet_ntoa(tmp[20:24]).strip('\n')221            self.net_mask.append(mask)222        cmd = "route -n | grep %s | grep -w UG | awk "\223              "'{ print $2 }'" % interface224        self.gateway = process.system_output(225            '%s' % cmd, shell=True)226    def bond_remove(self, arg1):227        '''228        bond_remove229        '''230        if arg1 == "local":231            self.log.info("Removing Bonding configuration on local machine")232            self.log.info("------------------------------------------------")233            for ifs in self.host_interfaces:234                cmd 
= "ip link set %s down" % ifs235                if process.system(cmd, shell=True, ignore_status=True) != 0:236                    self.log.info("unable to bring down the interface")237                if self.ib:238                    self.bond_ib_conf(self.bond_name, ifs, "REMOVE")239                else:240                    genio.write_file(self.bonding_slave_file, "-%s" % ifs)241            genio.write_file(self.bonding_masters_file, "-%s" % self.bond_name)242            self.log.info("Removing bonding module")243            linux_modules.unload_module("bonding")244            time.sleep(self.sleep_time)245        else:246            self.log.info("Removing Bonding configuration on Peer machine")247            self.log.info("------------------------------------------------")248            cmd = ''249            cmd += 'ip link set %s down;' % self.bond_name250            for val in self.peer_interfaces:251                cmd += 'ip link set %s down;' % val252            for val in self.peer_interfaces:253                cmd += 'ip addr flush dev %s;' % val254            for val in self.peer_interfaces:255                if self.ib:256                    self.bond_ib_conf(self.bond_name, val, "REMOVE")257                else:258                    cmd += 'echo "-%s" > %s;' % (val, self.bonding_slave_file)259            cmd += 'echo "-%s" > %s;' % (self.bond_name,260                                         self.bonding_masters_file)261            cmd += 'rmmod bonding;'262            cmd += 'ip addr add %s/%s dev %s;ip link set %s up;sleep 5;'\263                   % (self.peer_first_ipinterface[0], self.net_mask[0],264                      self.peer_interfaces[0], self.peer_interfaces[0])265            output = self.session.cmd(cmd)266            if not output.exit_status == 0:267                self.log.info("bond removing command failed in peer machine")268    def ping_check(self):269        '''270        ping check271        '''272        # need some time 
for specific interface before ping273        time.sleep(10)274        cmd = "ping -I %s %s -c 5"\275              % (self.bond_name, self.peer_first_ipinterface[0])276        if process.system(cmd, shell=True, ignore_status=True) != 0:277            return False278        return True279    def is_vnic(self):280        '''281        check if slave interface is vnic282        '''283        for interface in self.host_interfaces:284            cmd = "lsdevinfo -q name=%s" % interface285            if 'type="IBM,vnic"' in process.system_output(cmd, shell=True).decode("utf-8"):286                return True287        return False288    def bond_fail(self, arg1):289        '''290        bond fail291        '''292        if len(self.host_interfaces) > 1:293            for interface in self.host_interfaces:294                self.log.info("Failing interface %s for mode %s",295                              interface, arg1)296                cmd = "ip link set %s down" % interface297                if process.system(cmd, shell=True, ignore_status=True) != 0:298                    self.fail("bonding not working when trying to down the\299                               interface %s " % interface)300                time.sleep(self.sleep_time)301                if self.ping_check():302                    self.log.info("Ping passed for Mode %s", arg1)303                else:304                    error_str = "Ping fail in Mode %s when interface %s down"\305                        % (arg1, interface)306                    self.log.debug(error_str)307                    self.err.append(error_str)308                self.log.info(genio.read_file(self.bond_status))309                cmd = "ip link set %s up" % interface310                time.sleep(self.sleep_time)311                if process.system(cmd, shell=True, ignore_status=True) != 0:312                    self.fail("Not able to bring up the slave\313                                    interface %s" % interface)314              
  time.sleep(self.sleep_time)315        else:316            self.log.debug("Need a min of 2 host interfaces to test\317                         slave failover in Bonding")318        self.log.info("\n----------------------------------------")319        self.log.info("Failing all interfaces for mode %s", arg1)320        self.log.info("----------------------------------------")321        for interface in self.host_interfaces:322            cmd = "ip link set %s down" % interface323            if process.system(cmd, shell=True, ignore_status=True) != 0:324                self.fail("Could not bring down the interface %s " % interface)325            time.sleep(self.sleep_time)326        if not self.ping_check():327            self.log.info("Ping to Bond interface failed. This is expected")328        self.log.info(genio.read_file(self.bond_status))329        for interface in self.host_interfaces:330            cmd = "ip link set %s up" % interface331            time.sleep(self.sleep_time)332            if process.system(cmd, shell=True, ignore_status=True) != 0:333                self.fail("Not able to bring up the slave\334                                interface %s" % interface)335            time.sleep(self.sleep_time)336        bond_mtu = ['2000', '3000', '4000', '5000', '6000', '7000',337                    '8000', '9000']338        if self.is_vnic():339            bond_mtu = ['9000']340        for mtu in bond_mtu:341            self.bond_networkinterface = NetworkInterface(self.bond_name,342                                                          self.localhost)343            if self.bond_networkinterface.set_mtu(mtu) is not None:344                self.cancel("Failed to set mtu in host")345            for interface in self.peer_interfaces:346                peer_networkinterface = NetworkInterface(interface,347                                                         self.remotehost)348                if peer_networkinterface.set_mtu(mtu) is not None:349           
         self.cancel("Failed to set mtu in peer")350            if not self.ping_check():351                self.fail("Ping fail in mode %s after MTU change to %s" % (self.mode, mtu))352            else:353                self.log.info("Ping success for mode %s bond with  MTU %s" % (self.mode, mtu))354            if self.bond_networkinterface.set_mtu('1500'):355                self.cancel("Failed to set mtu back to 1500 in host")356            for interface in self.peer_interfaces:357                peer_networkinterface = NetworkInterface(interface,358                                                         self.remotehost)359                if peer_networkinterface.set_mtu('1500') is not None:360                    self.cancel("Failed to set mtu back to 1500 in peer")361    def bond_setup(self, arg1, arg2):362        '''363        bond setup364        '''365        if arg1 == "local":366            self.log.info("Configuring Bonding on Local machine")367            self.log.info("--------------------------------------")368            for ifs in self.host_interfaces:369                cmd = "ip addr flush dev %s" % ifs370                process.system(cmd, shell=True, ignore_status=True)371            for ifs in self.host_interfaces:372                cmd = "ip link set %s down" % ifs373                process.system(cmd, shell=True, ignore_status=True)374            linux_modules.load_module("bonding")375            genio.write_file(self.bonding_masters_file, "+%s" % self.bond_name)376            genio.write_file("%s/bonding/mode" % self.bond_dir, arg2)377            genio.write_file("%s/bonding/miimon" % self.bond_dir,378                             self.miimon)379            genio.write_file("%s/bonding/fail_over_mac" % self.bond_dir,380                             self.fail_over_mac)381            genio.write_file("%s/bonding/downdelay" % self.bond_dir,382                             self.downdelay)383            dict = {'0': ['packets_per_slave', 
'resend_igmp'],384                    '1': ['num_unsol_na', 'primary', 'primary_reselect',385                          'resend_igmp'],386                    '2': ['xmit_hash_policy'],387                    '4': ['lacp_rate', 'xmit_hash_policy'],388                    '5': ['tlb_dynamic_lb', 'primary', 'primary_reselect',389                          'resend_igmp', 'xmit_hash_policy', 'lp_interval'],390                    '6': ['primary', 'primary_reselect', 'resend_igmp',391                          'lp_interval']}392            if self.mode in dict.keys():393                for param in dict[self.mode]:394                    param_value = self.params.get(param, default='')395                    if param_value:396                        genio.write_file("%s/bonding/%s"397                                         % (self.bond_dir, param), param_value)398            for val in self.host_interfaces:399                if self.ib:400                    self.bond_ib_conf(self.bond_name, val, "ATTACH")401                else:402                    genio.write_file(self.bonding_slave_file, "+%s" % val)403                time.sleep(2)404            bond_name_val = ''405            for line in genio.read_file(self.bond_status).splitlines():406                if 'Bonding Mode' in line:407                    bond_name_val = line.split(':')[1]408            self.log.info("Trying bond mode %s [ %s ]", arg2, bond_name_val)409            for ifs in self.host_interfaces:410                cmd = "ip link set %s up" % ifs411                if process.system(cmd, shell=True, ignore_status=True) != 0:412                    self.fail("unable to interface up")413            cmd = "ip addr add %s/%s dev %s;ip link set %s up"\414                  % (self.local_ip, self.net_mask[0],415                     self.bond_name, self.bond_name)416            process.system(cmd, shell=True, ignore_status=True)417            for _ in range(0, 600, 60):418                if 'state UP' in 
process.system_output("ip link \419                     show %s" % self.bond_name, shell=True).decode("utf-8"):420                    self.log.info("Bonding setup is successful on\421                                  local machine")422                    break423                time.sleep(60)424            else:425                self.fail("Bonding setup on local machine has failed")426            if self.gateway:427                cmd = 'ip route add default via %s dev %s' % \428                    (self.gateway, self.bond_name)429                process.system(cmd, shell=True, ignore_status=True)430        else:431            self.log.info("Configuring Bonding on Peer machine")432            self.log.info("------------------------------------------")433            cmd = ''434            for val in self.peer_interfaces:435                cmd += 'ip addr flush dev %s;' % val436            for val in self.peer_interfaces:437                cmd += 'ip link set %s down;' % val438            cmd += 'modprobe bonding;'439            cmd += 'echo +%s > %s;'\440                   % (self.bond_name, self.bonding_masters_file)441            cmd += 'echo 0 > %s/bonding/mode;'\442                   % self.bond_dir443            cmd += 'echo 100 > %s/bonding/miimon;'\444                   % self.bond_dir445            cmd += 'echo 2 > %s/bonding/fail_over_mac;'\446                   % self.bond_dir447            for val in self.peer_interfaces:448                if self.ib:449                    self.bond_ib_conf(self.bond_name, val, "ATTACH")450                else:451                    cmd += 'echo "+%s" > %s;' % (val, self.bonding_slave_file)452            for val in self.peer_interfaces:453                cmd += 'ip link set %s up;' % val454            cmd += 'ip addr add %s/%s dev %s;ip link set %s up;sleep 5;'\455                   % (self.peer_first_ipinterface[0], self.net_mask[0],456                      self.bond_name, self.bond_name)457            output = 
self.session.cmd(cmd)458            if not output.exit_status == 0:459                self.fail("bond setup command failed in peer machine")460    def test_setup(self):461        '''462        bonding the interfaces463        work for multiple interfaces on both host and peer464        '''465        cmd = "[ -d %s ]" % self.bond_dir466        output = self.session.cmd(cmd)467        if output.exit_status == 0:468            self.fail("bond name already exists on peer machine")469        if os.path.isdir(self.bond_dir):470            self.fail("bond name already exists on local machine")471        if self.peer_bond_needed:472            self.bond_setup("peer", "")473        self.bond_setup("local", self.mode)474        self.log.info(genio.read_file(self.bond_status))475        self.ping_check()476        self.error_check()477    def test_run(self):478        self.bond_fail(self.mode)479        self.log.info("Mode %s OK", self.mode)480        self.error_check()481        # need few sec for interface to not lost the connection to peer482        time.sleep(5)483    def test_cleanup(self):484        '''485        clean up the interface config486        '''487        self.bond_remove("local")488        if self.gateway:489            cmd = 'ip route add default via %s' % \490                (self.gateway)491            process.system(cmd, shell=True, ignore_status=True)492        for ipaddr, host_interface in zip(self.ipaddr, self.host_interfaces):493            networkinterface = NetworkInterface(host_interface, self.localhost)494            try:495                networkinterface.flush_ipaddr()496                networkinterface.add_ipaddr(ipaddr, self.netmask)497                networkinterface.bring_up()498            except Exception:499                self.fail("Interface is taking long time to link up")500            if networkinterface.set_mtu("1500") is not None:501                self.cancel("Failed to set mtu in host")502            try:503                
networkinterface.restore_from_backup()504            except Exception:505                self.log.info("backup file not availbale, could not restore file.")506        if self.peer_bond_needed:507            self.bond_remove("peer")508            for ipaddr, interface in zip(self.peer_first_ipinterface,509                                         self.peer_interfaces):510                self.remotehost = RemoteHost(511                                self.peer_public_ip, self.user,512                                password=self.password)513                peer_networkinterface = NetworkInterface(interface,514                                                         self.remotehost)515                try:516                    peer_networkinterface.flush_ipaddr()517                    peer_networkinterface.add_ipaddr(ipaddr, self.netmask)518                    peer_networkinterface.bring_up()519                except Exception:520                    peer_networkinterface.save(ipaddr, self.netmask)521                time.sleep(self.sleep_time)522        self.error_check()523        524        detected_distro = distro.detect()525        if detected_distro.name == "rhel":526            cmd = "systemctl restart NetworkManager.service"527        elif detected_distro.name == "Ubuntu":528            cmd = "systemctl restart networking"529        else:530            cmd = "systemctl restart network"...interfaces.py
Source:interfaces.py  
...392            run_command(cmd, self.host, sudo=True)393        except Exception as ex:394            msg = f"Failed to remove ipaddr. {ex}"395            raise NWException(msg)396    def flush_ipaddr(self):397        """Flush all the IP address for this interface.398        This method will try to flush the ip address from this interface399        and if fails it will raise a NWException. Be careful, you can400        lost connection.401        You must have sudo permissions to run this method on a host.402        """403        cmd = f"ip addr flush dev {self.name}"404        try:405            run_command(cmd, self.host, sudo=True)406        except Exception as ex:407            msg = f"Failed to flush ipaddr. {ex}"408            raise NWException(msg)409    def remove_link(self):410        """Deletes virtual interface link....net-cfg.py
Source:net-cfg.py  
1import shlex,os,json,subprocess2OS_RELEASE_PATH = "/etc/os-release"3NETWORK_JSON_PATH = "/root/network.json"4RESOLV_BASE= "/etc/resolvconf/resolv.conf.d/base"5VERSION = 1.186class OS_RELEASE:7    def __init__(self):8        self.vars = {}9        if os.path.isfile(OS_RELEASE_PATH):10            with open(OS_RELEASE_PATH) as inf:11                for line in inf:12                    name, var = line.partition("=")[::2]13                    self.vars[name.strip()] = var14        return15    def name(self):16        if self.vars.has_key('NAME'):17            return self.vars['NAME']18        else:19            return None20    def version(self):21        if self.vars.has_key('VERSION'):22            return self.vars['VERSION']23        else:24            return ''25    def id(self):26        if self.vars.has_key('ID'):27            return self.vars['ID']28        else:29            return ''30    def isUbuntu(self):31        if self.id().lower().strip()=='ubuntu':32            return True33        return False34    def isRedhat(self):35        if self.id().lower().strip()=='"centos"':36            return True37        return False38    def get(self,key):39        if self.vars.has_key(key):40            return self.vars[key]41        else:42            return ''43class NET:44    def __init__(self,cfg):45        self.cfg = cfg46        self.__name = ''47        self.__iname = ''48        return49    def write_dns(self, dns):50        is_file = os.path.isfile(RESOLV_BASE)51        if not is_file:52            os.system('mkdir -p '+ RESOLV_BASE)53        with open(RESOLV_BASE, "a+") as outf:54            outf.write('nameserver ' + dns + '\n')55        os.system('resolvconf -u')56    def filename(self):57        if self.name() == '':58            return ''59        ret = 'ifcfg-'+self.name()60        if self.cfg['type'] == 'vlan':61            ret += '.'+self.cfg['vlan_id']62        return ret63    def name(self):64        if self.__name == '':65            if 
self.cfg.has_key('name'):66                self.__name = self.cfg['name']67            elif self.cfg.has_key('mac'):68                self.__name = get_iface_name(self.cfg['mac'])69            else:70                self.__name = ''71        return self.__name72    def iname(self):73        if self.__iname == '':74            if self.cfg['type'] == 'eth':75                self.__iname = self.name()76            elif self.cfg['type'] == 'vlan':77                self.__iname = self.name()+'.'+self.cfg['vlan_id']78            elif self.cfg['type'] == 'bond':79                self.__iname = self.name()80            else:81                self.__iname = self.name()82        return self.__iname83    def get_script(self):84        ret = ''85        cfg = self.cfg86#        if cfg['type'] == 'eth':87        if 'eth' in self.name():88            ret += 'DEVICE='+self.name()+'\n'89            if cfg['mode'].lower().strip() == 'static':90                ret += 'BOOTPROTO=none'+'\n'91                if cfg.has_key('address'):92                    ret += 'IPADDR='+cfg['address']+'\n'93                if cfg.has_key('netmask'):94                    ret += 'NETMASK='+cfg['netmask']+'\n'95                if cfg.has_key('gateway'):96                    ret += 'GATEWAY='+cfg['gateway']+'\n'                    97                if cfg.has_key('metric'):98                    ret += 'METRIC '+cfg['metric']+'\n'                                    99                if cfg.has_key('dns1'):100                    ret += 'DNS1='+cfg['dns1']+'\n'                101                if cfg.has_key('dns2'):102                    ret += 'DNS2='+cfg['dns2']+'\n'                103                if cfg.has_key('dns3'):104                    ret += 'DNS3='+cfg['dns3']+'\n'                                105                    106                ret += 'USERCTL=no'+'\n'107            elif cfg['mode'].lower().strip() == 'dhcp':108                ret += 'BOOTPROTO=dhcp'+'\n'109                if 
cfg.has_key('metric'):110                    ret += 'METRIC '+cfg['metric']+'\n'                                    111            ret += 'ONBOOT=yes'+'\n'112        elif cfg['type'] == 'vlan':113#        elif 'vlan' in cfg['type']:114            ret += 'DEVICE='+self.name()+'.'+self.cfg['vlan_id']+'\n'115            if cfg['mode'].lower().strip() == 'static':116                ret += 'BOOTPROTO=none'+'\n'117                if cfg.has_key('address'):118                    ret += 'IPADDR='+cfg['address']+'\n'119                if cfg.has_key('netmask'):120                    ret += 'NETMASK='+cfg['netmask']+'\n'121                if cfg.has_key('gateway'):122                    ret += 'GATEWAY='+cfg['gateway']+'\n'                    123                if cfg.has_key('metric'):124                    ret += 'METRIC '+cfg['metric']+'\n'                                    125                ret += 'USERCTL=no'+'\n'126            elif cfg['mode'].lower().strip() == 'dhcp':127                ret += 'BOOTPROTO=dhcp'+'\n'128                if cfg.has_key('metric'):129                    ret += 'METRIC '+cfg['metric']+'\n'                130            ret += 'ONBOOT=yes'+'\n'131            ret += 'VLAN=yes'+'\n'132        elif cfg['type'] == 'bond':133#        elif 'bond' in self.name():134            ret += 'DEVICE='+self.name()+'\n'135            ret += 'TYPE=Bond'+'\n'136            ret += 'BONDING_MASTER=yes'+'\n'137            if cfg['mode'].lower().strip() == 'static':138                ret += 'BOOTPROTO=none'+'\n'139                if cfg.has_key('address'):140                    ret += 'IPADDR='+cfg['address']+'\n'141                if cfg.has_key('netmask'):142                    ret += 'NETMASK='+cfg['netmask']+'\n'143                if cfg.has_key('gateway'):144                    ret += 'GATEWAY='+cfg['gateway']+'\n'                    145                if cfg.has_key('metric'):146                    ret += 'METRIC '+cfg['metric']+'\n'                          
          147                ret += 'USERCTL=no'+'\n'148            elif cfg['mode'].lower().strip() == 'dhcp':149                ret += 'BOOTPROTO=dhcp'+'\n'150                if cfg.has_key('metric'):151                    ret += 'METRIC '+cfg['metric']+'\n'                                    152            ret += 'ONBOOT=yes'+'\n'153            ret += 'BONDING_OPTS=""'+'\n'154            ret += 'NM_CONTROLLED="no"'+'\n'155        elif 'ens' in self.name():156# TODO enpsxxx157#        elif cfg['type'] == 'ens':158            ret += 'DEVICE='+self.name()+'\n'159            if cfg['mode'].lower().strip() == 'static':160                ret += 'BOOTPROTO=none'+'\n'161                if cfg.has_key('address'):162                    ret += 'IPADDR='+cfg['address']+'\n'163                if cfg.has_key('netmask'):164                    ret += 'NETMASK='+cfg['netmask']+'\n'165                if cfg.has_key('gateway'):166                    ret += 'GATEWAY='+cfg['gateway']+'\n'167                if cfg.has_key('metric'):168                    ret += 'METRIC '+cfg['metric']+'\n'169                if cfg.has_key('dns1'):170                    ret += 'DNS1='+cfg['dns1']+'\n'171                if cfg.has_key('dns2'):172                    ret += 'DNS2='+cfg['dns2']+'\n'173                if cfg.has_key('dns3'):174                    ret += 'DNS3='+cfg['dns3']+'\n'175                ret += 'USERCTL=no'+'\n'176        return ret177    def get_iface(self):178        ret = ''179        cfg = self.cfg180        if 'eth' in self.name():181            self.cfg['type'] = 'eth'182            ret += 'auto '+self.name()+'\n'183            ret += 'iface '+self.name()+' inet '+cfg['mode']+'\n'184            if cfg.has_key('address'):185                ret += 'address '+cfg['address']+'\n'186            if cfg.has_key('netmask'):187                ret += 'netmask '+cfg['netmask']+'\n'188            if cfg.has_key('gateway'):189                ret += 'gateway '+cfg['gateway']+'\n'           
     190            if cfg.has_key('metric'):191                ret += 'metric '+cfg['metric']+'\n'                192            if cfg.has_key('dns1'):193                ret += 'dns-nameservers '+cfg['dns1']+'\n'194                self.write_dns(cfg['dns1']);                195            if cfg.has_key('dns2'):196                ret += 'dns-nameservers '+cfg['dns2']+'\n'197                self.write_dns(cfg['dns2']);            198            if cfg.has_key('dns3'):199                ret += 'dns-nameservers '+cfg['dns3']+'\n'                                200                self.write_dns(cfg['dns3']);201            if cfg.has_key('bond-master'):202                ret += 'bond-master '+cfg['bond-master']+'\n'                203        if 'vlan' in cfg['type']:204            self.cfg['type'] = 'vlan'205            ifname =self.name()+'.'+cfg['vlan_id']206            ret += 'auto '+ifname+'\n'207            ret += 'iface '+ifname+' inet '+cfg['mode']+'\n'208            if cfg.has_key('address'):209                ret += 'address '+cfg['address']+'\n'210            if cfg.has_key('netmask'):211                ret += 'netmask '+cfg['netmask']+'\n'212            if cfg.has_key('gateway'):213                ret += 'gateway '+cfg['gateway']+'\n'214            if cfg.has_key('metric'):215                ret += 'metric '+cfg['metric']+'\n'                216            ret += 'vlan-raw-device '+self.name()+'\n'217        if 'bond' in self.name():218            self.cfg['type'] = 'bond'219            ret += 'auto '+self.name()+'\n'220            ret += 'iface '+self.name()+' inet '+cfg['mode']+'\n'221            if cfg.has_key('address'):222                ret += 'address '+cfg['address']+'\n'223            if cfg.has_key('netmask'):224                ret += 'netmask '+cfg['netmask']+'\n'225            if cfg.has_key('gateway'):226                ret += 'gateway '+cfg['gateway']+'\n'227            if cfg.has_key('metric'):228                ret += 'metric 
'+cfg['metric']+'\n'                                229            if cfg.has_key('bond-mode'):230                ret += 'bond-mode '+cfg['bond-mode']+'\n'231            if cfg.has_key('bond-miimon'):232                ret += 'bond-miimon '+cfg['bond-miimon']+'\n'233            if cfg.has_key('bond-slaves'):234                ret += 'bond-slaves '+cfg['bond-slaves']+'\n'235        elif cfg['type'] != 'vlan':236            237            ret += 'auto '+self.name()+'\n'238            ret += 'iface '+self.name()+' inet '+cfg['mode']+'\n'239            if cfg.has_key('address'):240                ret += 'address '+cfg['address']+'\n'241            if cfg.has_key('netmask'):242                ret += 'netmask '+cfg['netmask']+'\n'243            if cfg.has_key('gateway'):244                ret += 'gateway '+cfg['gateway']+'\n'245            if cfg.has_key('metric'):246                ret += 'metric '+cfg['metric']+'\n'247            if cfg.has_key('dns1'):248                ret += 'dns-nameservers '+cfg['dns1']+'\n'249                self.write_dns(cfg['dns1']);250            if cfg.has_key('dns2'):251                ret += 'dns-nameservers '+cfg['dns2']+'\n'252                self.write_dns(cfg['dns2']);253            if cfg.has_key('dns3'):254                ret += 'dns-nameservers '+cfg['dns3']+'\n'255                self.write_dns(cfg['dns3']);256            if cfg.has_key('bond-master'):257                ret += 'bond-master '+cfg['bond-master']+'\n'258        return ret259class NET_CFG:260    def __init__(self,cfgfile):261        self.cfgfile = cfgfile262        self.osr = OS_RELEASE()263        return264    def backup(self):265        os.system('mkdir -p ./backup')266        if self.osr.isUbuntu():267            if os.path.isfile('/etc/network/interfaces'):268                os.system('rm -f ./backup/interfaces')269                os.system('cp -f /etc/network/interfaces ./backup/')270        elif self.osr.isRedhat():271            if 
os.path.isdir('/etc/sysconfig/network-scripts'):272                os.system('rm -fr ./backup/network-scripts')273                os.system('cp -fr /etc/sysconfig/network-scripts/ ./backup/')274        return275    def apply(self):276        if self.osr.isUbuntu():277            intfs,cfgfiles = self.toInterfaces()278        elif self.osr.isRedhat():279            intfs,cfgfiles = self.toScripts()280        for cfgfile in cfgfiles:281            print cfgfile['filename']+'\n'282            print cfgfile['path']+'\n'283            print cfgfile['data']+'\n'284            os.system('mkdir -p '+cfgfile['path'])285            with open(cfgfile['path']+cfgfile['filename'], "wb") as outf:286                outf.write(cfgfile['data'])287        if self.osr.isUbuntu():288            os.system('modprobe 8021q')289            for intf in intfs:290                #flush the original ip in dev291                self.flush_dev(intf)292                os.system('ifdown '+intf)293                os.system('ifup '+intf)294        elif self.osr.isRedhat():295            os.system('modprobe 8021q')296            for intf in intfs:297                os.system('ifdown '+intf)298                os.system('ifup '+intf)299            #os.system('systemctl restart network')300        return301    def parse(self):302        self.configures = {}303        try:304            if os.path.isfile(self.cfgfile):305                with open(self.cfgfile, 'r') as inf:306                    self.configures = json.load(inf)['network']307            else:308                return False309        except Exception as e:310            print "error: " 311            print e312            return False313        return True314    def toInterfaces(self):315        ret = []316        networks = []317        intfs = []318        for cfg in self.configures:319            net = NET(cfg)320            networks.append(net)321        data = ''322        for net in networks:323            data += 
net.get_iface()+'\n'324            intfs.append(net.iname())325        ret.append({'filename':'interfaces','path':'/etc/network/','data':data})326        return intfs,ret327    def toScripts(self):328        ret = []329        networks = []330        intfs = []331        for cfg in self.configures:332            net = NET(cfg)333            networks.append(net)334        for net in networks:335            ret.append({'filename':net.filename(),'path':'/etc/sysconfig/network-scripts/','data':net.get_script()})336            intfs.append(net.iname())337            print net.filename()338            print net.get_script()339        return intfs,ret340    def show(self):341        return342    def flush_dev(self, if_name):343        flush_ipaddr = 'ip addr flush dev ' + if_name + ' ;'344        os.system(flush_ipaddr);345def get_iface_name(mac):346    cmd = shlex.split('find /sys/class/net -mindepth 1 -maxdepth 1 ! -name lo -printf "%P= " -execdir cat {}/address \;')347    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)348    out, err = process.communicate()349    for line in out.splitlines():350        name, addr = line.partition("=")[::2]351        if mac.lower() == addr.lower().strip():352            return name.lower().strip()353    return ''354if __name__ == "__main__":355    netcfg = NET_CFG(NETWORK_JSON_PATH)356    if netcfg.parse():357        netcfg.backup()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
