Best Python code snippet using lisa_python
dpdksuite.py
Source:dpdksuite.py  
...135                    "OVS repoted that DPDK EAL failed to initialize."136                ),137            )138        finally:139            ovs.stop_ovs()140    @TestCaseMetadata(141        description="""142           Install and run ci test for NFF-Go on ubuntu143        """,144        priority=4,145        requirement=simple_requirement(146            min_core_count=8,147            min_nic_count=2,148            network_interface=Sriov(),149            unsupported_features=[Gpu, Infiniband],150            supported_features=[IsolatedResource],151        ),152    )153    def verify_dpdk_nff_go(...harmless_manager.py
Source:harmless_manager.py  
# Copyright 2017 Mark Szalay
"""Configure a hardware switch and local virtual switches for HARMLESS.

The script reads an INI-style configuration file, connects to the hardware
device through napalm, replaces the device's VLAN setup with one access VLAN
per listed port plus trunk ports, and finally runs the helper shell scripts
that start the software switches.
"""
import copy
import getopt
import math
import os
import subprocess
import sys
import time

try:
    import ConfigParser
except ImportError:  # Python 3 renamed the module
    import configparser as ConfigParser

try:
    import napalm
except ImportError:  # reported by load_driver() once a driver is needed
    napalm = None

# Section of the configuration file that describes the hardware switch.
_SECTION = 'Hardware device'

_HASH_RULE = '###########################################################################################################'
_DASH_RULE = '------------------------------------------------------------------------------------------------------------'


def _find_block_end(text, body_start):
    """Return the index just past the "}" that closes a braced block.

    Scanning starts at ``body_start``, which must point just after the
    block's opening "{".  Nested "{ ... }" pairs are skipped.  Returns
    ``None`` when the text ends before the block is closed.
    """
    depth = 0
    for pos in range(body_start, len(text)):
        char = text[pos]
        if char == "{":
            depth += 1
        elif char == "}":
            if depth == 0:
                return pos + 1
            depth -= 1
    return None


def delete_vlans_from_juniper(conf_str):
    """Write a copy of a Junos configuration with its VLAN setup removed.

    The first ``vlans { ... }`` stanza, every ``vlan { ... }`` stanza and
    every ``port-mode trunk;`` statement are removed, then the first three
    lines are dropped (presumably a header emitted by the device - confirm).

    :param conf_str: running configuration as one string.
    :return: name of the file that was written, ``junos_without_vlan.conf``.
    """
    # Only the first "vlans {" stanza is removed, and only if it is closed.
    header = "vlans {"
    start = conf_str.find(header)
    if start != -1:
        end = _find_block_end(conf_str, start + len(header))
        if end is not None:
            conf_str = conf_str[:start] + conf_str[end:]

    header = "vlan {"
    while header in conf_str:
        start = conf_str.index(header)
        end = _find_block_end(conf_str, start + len(header))
        if end is None:
            # Unbalanced braces: nothing sensible can be cut out, and
            # retrying would never terminate.
            break
        conf_str = conf_str[:start] + conf_str[end:]

    token = "port-mode trunk;"
    while token in conf_str:
        conf_str = conf_str.replace(token, "")

    # Drop the first three lines of what is left.
    for _ in range(3):
        conf_str = conf_str.partition("\n")[2]

    temp_cfg_name = 'junos_without_vlan.conf'
    with open(temp_cfg_name, 'w') as f:
        f.write(conf_str)
    return temp_cfg_name


def delete_vlans_from_arista(running_config):
    """Write a copy of an Arista configuration file without VLAN lines.

    Lines containing ``vlan``, ``Vlan`` or ``trunk`` are removed, then the
    first four remaining lines are dropped.

    :param running_config: path of the file holding the running config.
    :return: name of the file that was written, ``arista_without_vlan.conf``.
    """
    with open(running_config) as source:
        kept = [line for line in source
                if "vlan" not in line
                and "trunk" not in line
                and "Vlan" not in line]
    temp_cfg_name = 'arista_without_vlan.conf'
    with open(temp_cfg_name, 'w') as target:
        target.writelines(kept[4:])
    return temp_cfg_name


def create_cfgfile_for_juniper(vlan_if, trunks_if, handled_ports_count):
    """Create ``junos_with_vlan.cfg`` with access VLANs and trunk ports.

    Interface number ``n`` (counting from 1) becomes a member of ``vlan<n>``
    with VLAN id ``100 + n``.  The VLANs are handed out to the trunk
    interfaces in order, ``handled_ports_count`` per trunk.

    :param vlan_if: interface names that each get their own VLAN.
    :param trunks_if: interface names configured as trunks.
    :param handled_ports_count: number of VLANs carried by one trunk.
    :return: name of the file that was written.
    """
    try:
        per_trunk = int(handled_ports_count)
    except (TypeError, ValueError, OverflowError):
        # Unusable count: slicing with None hands every VLAN to every trunk.
        per_trunk = None

    vlans = [{'interface': interface,
              'vlan_name': "vlan" + str(number),
              'vlan_id': str(100 + number)}
             for number, interface in enumerate(vlan_if, 1)]

    parts = ["interfaces {"]
    for vlan in vlans:
        parts.append(
            "\n    " + vlan['interface'] + " {\n"
            "        unit 0 {\n"
            "            family ethernet-switching {\n"
            "                vlan {\n"
            "                    members " + vlan['vlan_name'] + ";\n"
            "                }\n"
            "            }\n"
            "        }\n"
            "    }")

    remaining = copy.copy(vlans)
    for trunk in trunks_if:
        trunk_vlans = remaining[:per_trunk]
        remaining = remaining[per_trunk:]
        members = "".join(" " + vlan['vlan_name'] + " " for vlan in trunk_vlans)
        parts.append(
            "\n    " + trunk + " {\n"
            "        unit 0 {\n"
            "            family ethernet-switching {\n"
            "                port-mode trunk;\n"
            "                vlan {\n"
            "                    members [ " + members + " ];\n"
            "                }\n"
            "            }\n"
            "        }\n"
            "    }\n")

    parts.append("\n}\nvlans {")
    for vlan in vlans:
        parts.append(
            "\n    " + vlan['vlan_name'] + " {\n"
            "        vlan-id " + vlan['vlan_id'] + ";\n"
            "        interface {\n"
            "            " + vlan['interface'] + ".0;\n"
            "        }\n"
            "    }")
    parts.append("\n}")

    with open("junos_with_vlan.cfg", 'w') as f:
        f.write("".join(parts))
    return "junos_with_vlan.cfg"


def create_cfgfile_for_arista(vlans_if, trunks_if, handled_ports_count):
    """Create ``arista.conf`` with access VLANs and trunk ports.

    VLAN ids start at 101, one per entry of ``vlans_if``.  Each trunk is
    allowed a consecutive range of ``handled_ports_count`` VLAN ids.

    :param vlans_if: interface names that each get their own access VLAN.
    :param trunks_if: interface names configured as trunks.
    :param handled_ports_count: number of VLANs carried by one trunk.
    :return: name of the file that was written.
    """
    print('\nCreate new configuration file...')
    # The caller may pass a float (math.ceil on Python 2); keep the VLAN
    # arithmetic in integers so ids are never rendered as "105.0".
    per_trunk = int(handled_ports_count)
    first_vlan = 101

    parts = ["!\nvlan " + str(first_vlan) + "-"
             + str(first_vlan + len(vlans_if) - 1) + "\n!\n"]
    for offset, port in enumerate(vlans_if):
        parts.append("interface " + str(port) + "\n")
        parts.append("    switchport access vlan "
                     + str(first_vlan + offset) + "\n!\n")

    range_start = first_vlan
    for trunk in trunks_if:
        parts.append("interface " + str(trunk) + "\n")
        parts.append("    switchport trunk allowed vlan " + str(range_start)
                     + "-" + str(range_start + per_trunk - 1) + "\n")
        parts.append("    switchport mode trunk" + "\n!\n")
        range_start += per_trunk

    conf_name = "arista.conf"
    with open(conf_name, 'w') as f:
        f.write("".join(parts))
    return conf_name


def reset_device(device, orig_cfg):
    """Replace the device configuration with the file ``orig_cfg``.

    The file is deleted afterwards, whether or not the upload worked.
    Exits the process with status 1 when the upload fails.

    :param device: open napalm device.
    :param orig_cfg: path of the configuration file to load.
    """
    print("Reset HW device to " + orig_cfg + " state")
    try:
        device.load_replace_candidate(filename=orig_cfg)
        device.commit_config()
    except Exception as e:
        print("ERROR: " + str(e))
        sys.exit(1)
    finally:
        os.remove(orig_cfg)
    print("DONE")


def load_driver(config):
    """Look up the napalm driver named in the configuration.

    Exits the process with status 1 when napalm is missing or the driver
    cannot be loaded.

    :param config: parsed configuration (``ConfigParser`` instance).
    :return: tuple ``(driver, driver_name)``.
    """
    if napalm is None:
        print("ERROR: the 'napalm' package is not installed.")
        sys.exit(1)
    # Bound up front so the error message below can always be built.
    driver_name = None
    try:
        driver_name = config.get(_SECTION, 'Driver')
        driver = napalm.get_network_driver(driver_name)
    except Exception as e:
        print("ERROR: " + str(e))
        print("ERROR: Probably napalm does not support the given '" + str(driver_name) + "' driver. Check this website: http://napalm.readthedocs.io/en/latest/support/")
        sys.exit(1)
    return driver, driver_name


def delete_vlans(device, driver_name):
    """Back up the running config and remove existing VLANs from the device.

    The running configuration is saved to
    ``<driver_name>_<UTC timestamp>_original.cfg``; a VLAN-free variant of it
    is then loaded onto the device and committed.  Exits the process with
    status 1 on any failure or for a driver other than "junos" / "eos".

    :param device: open napalm device.
    :param driver_name: napalm driver name.
    :return: name of the backup file holding the original configuration.
    """
    try:
        device_config = device.get_config()
        old_config_name = str(driver_name) + "_" + time.strftime("%Y-%m-%d_%H:%M:%S", time.gmtime()) + "_original.cfg"
        with open(old_config_name, 'w') as f:
            f.write(device_config['running'])
        print("Currently running config was saved. Configuration file name: " + old_config_name)
    except Exception as e:
        print("ERROR: " + str(e))
        sys.exit(1)

    # TODO: create for more driver
    if driver_name == "junos":
        temp_cfg_name = delete_vlans_from_juniper(device_config['running'])
    elif driver_name == "eos":
        temp_cfg_name = delete_vlans_from_arista(old_config_name)
    else:
        print("ERROR: Unsupported driver '" + str(driver_name) + "'")
        sys.exit(1)

    print("Remove already existing vlans...")
    try:
        device.load_replace_candidate(filename=temp_cfg_name)
        device.commit_config()
    except Exception as e:
        print("ERROR: " + str(e))
        sys.exit(1)
    finally:
        os.remove(temp_cfg_name)
    return old_config_name


def upload_new_config(config, driver_name, device, backup_config_name):
    """Generate the HARMLESS VLAN/trunk configuration and merge it in.

    On a failed commit the device is reset to ``backup_config_name`` and the
    process exits with status 1.

    :param config: parsed configuration (``ConfigParser`` instance).
    :param driver_name: napalm driver name ("junos" or "eos").
    :param device: open napalm device.
    :param backup_config_name: file to restore from when the commit fails.
    """
    # Information about vlan and trunk ports
    vlan_if = [port.strip() for port in
               config.get(_SECTION, 'Used_ports_for_vlan').split(',')]
    trunks_if = [port.strip() for port in
                 config.get(_SECTION, 'Used_ports_for_trunk').split(',')]
    vlan_ports_count = len(vlan_if)
    print("\nUsed vlan ports: " + str(vlan_ports_count))
    trunk_ports_count = len(trunks_if)
    print("Used trunk ports: " + str(trunk_ports_count))
    handled_ports_count = int(math.ceil(float(vlan_ports_count) / float(trunk_ports_count)))
    print("Number of ports for one trunk: " + str(handled_ports_count))

    # Create vlan and trunk interfaces
    # TODO: create for more driver
    print("\nCreate new configuration file")
    if driver_name == "junos":
        file_name = create_cfgfile_for_juniper(vlan_if, trunks_if, handled_ports_count)
    elif driver_name == "eos":
        file_name = create_cfgfile_for_arista(vlan_if, trunks_if, handled_ports_count)
    else:
        print("ERROR: Unsupported driver '" + str(driver_name) + "'")
        reset_device(device, backup_config_name)
        sys.exit(1)

    print("Trying to commit...")
    try:
        device.load_merge_candidate(filename=file_name)
        device.commit_config()
    except Exception as e:
        print("ERROR: " + str(e))
        os.remove(file_name)
        reset_device(device, backup_config_name)
        sys.exit(1)
    print("Commit was successfull :)")
    os.remove(file_name)


def connect_to_device(config, driver):
    """Instantiate ``driver`` from the configuration and open the session.

    The ``Port`` option is forwarded as ``optional_args={'port': ...}`` only
    when it is not empty.  Exits the process with status 1 when the
    connection cannot be opened.

    :param config: parsed configuration (``ConfigParser`` instance).
    :param driver: napalm driver class.
    :return: the opened device.
    """
    arguments = {
        'hostname': config.get(_SECTION, 'Host_IP'),
        'username': config.get(_SECTION, 'Username'),
        'password': config.get(_SECTION, 'Password'),
    }
    port = config.get(_SECTION, 'Port')
    if port != "":
        arguments['optional_args'] = {'port': port}
    device = driver(**arguments)

    print('Connecting to the Hardware Device...')
    try:
        device.open()
    except Exception as e:
        print("ERROR: " + str(e))
        sys.exit(1)
    return device


def online_mode(config=None):
    """Show the device's interfaces and ask which ports HARMLESS should use.

    Unfinished (see TODO): the selection is only collected and returned.

    :param config: parsed configuration (``ConfigParser`` instance).
    :return: list of the interface names typed by the user.
    :raises ValueError: when no configuration is supplied.
    """
    # TODO
    if config is None:
        raise ValueError("online_mode() needs the parsed configuration")
    try:
        read_line = raw_input
    except NameError:  # Python 3
        read_line = input

    driver, _ = load_driver(config)
    device = connect_to_device(config, driver)
    try:
        # Get interfaces
        device_facts = device.get_facts()
        print("Device: ")
        print("\t Vendor: " + device_facts['vendor'])
        print("\t Model: " + device_facts['model'])
        print("\t OS version : " + device_facts['os_version'])
        print("\nInterfaces: ")
        for interface in device_facts['interface_list']:
            print("\t" + interface)
        print("\nWhat ports do you want to use to harmless?")
        vlan_if = []
        input_if = read_line('')
        while input_if != "":
            vlan_if.append(input_if)
            input_if = read_line('')
    finally:
        device.close()
    return vlan_if


def backup_mode(new_cfg, config):
    """Upload a backup configuration to the hardware device.

    Note that ``reset_device`` deletes ``new_cfg`` after uploading it.

    :param new_cfg: path of the configuration file to restore.
    :param config: parsed configuration (``ConfigParser`` instance).
    """
    # Init used driver:
    driver, driver_name = load_driver(config)
    # Connecting to the device:
    device = connect_to_device(config, driver)
    # Reset device with an older cfg
    reset_device(device, new_cfg)
    device.close()
    print('Reset Hardware Device to earlier state was successfull!')
    print(_DASH_RULE)


def offline_mode(config):
    """Upload the HARMLESS configuration to the hardware device.

    :param config: parsed configuration (``ConfigParser`` instance).
    """
    # Init used driver:
    driver, driver_name = load_driver(config)
    # Connecting to the device:
    device = connect_to_device(config, driver)
    # Deleting existing vlans
    backup_config_name = delete_vlans(device, driver_name)
    # Creating and uploading new configuration
    upload_new_config(config, driver_name, device, backup_config_name)
    # close the session with the device.
    device.close()
    print('Configuring Hardware Device was successfull!')
    print(_DASH_RULE)


def start_virtual_switches(patch_port_num, trunk_port_num, dpdk):
    """Stop any running virtual switch and start a fresh one.

    Runs ``./stop_ovs.sh`` and then either ``./virtual_switch_starter_U16.sh``
    (when ``dpdk`` is "false", case-insensitive) or
    ``./virtual_switch_starter_dpdk.sh``; each script receives the two port
    counts as arguments.

    :param patch_port_num: number of patch ports.
    :param trunk_port_num: number of trunk ports.
    :param dpdk: the "DPDK" configuration value as a string.
    """
    # Both values are rendered with str() from integer counts computed by
    # the caller, so building a shell command line from them is safe here.
    port_args = " " + str(patch_port_num) + " " + str(trunk_port_num)

    print("Stop previously started virtual switch if it's alive")
    # stderr is left inherited: the former stderr=False was interpreted by
    # subprocess as file descriptor 0, i.e. stderr was wired to stdin.
    subprocess.call("./stop_ovs.sh" + port_args, shell=True)
    print(_DASH_RULE)

    print("Create and start virtual switches")
    if dpdk.lower() == "false":
        starter = "./virtual_switch_starter_U16.sh"
    else:
        starter = "./virtual_switch_starter_dpdk.sh"
    subprocess.call(starter + port_args, shell=True)
    # TODO: Create a picture about the connections (wiring)


def main(argv):
    """Parse the command line and run the selected mode.

    :param argv: command-line arguments without the program name.
    """
    running_mode = None
    config_file = None
    new_cfg = None
    try:
        opts, args = getopt.getopt(
            argv, "",
            ["help", "configuration-file=", "upload-cfg=", "online-mode"])
    except getopt.GetoptError:
        print("Bad parameter!\nUse 'python harmless_manager.py --help'")
        sys.exit(1)

    # getopt reports long options without the trailing "=".
    for opt, arg in opts:
        if opt == "--help":
            print("Usage: python harmless_manager.py <params>\nParams:\n\t--configuration-file=<config file>\n\t--upload-cfg=<config_file>")
            sys.exit(1)
        elif opt == "--configuration-file":
            config_file = arg
        elif opt == "--upload-cfg":
            new_cfg = arg
            running_mode = "backup_device"
        elif opt == "--online-mode":
            running_mode = "online"
        else:
            print('Bad parameters! Use python harmless_manager.py --help')
            sys.exit(1)

    config = ConfigParser.ConfigParser()
    problem = None
    try:
        # read() silently skips files it cannot open and returns the list
        # of files it did parse, so an empty result means nothing loaded.
        if config_file is None or not config.read(config_file):
            problem = "no configuration file could be read"
    except Exception as e:
        problem = str(e)
    if problem is not None:
        print("ERROR: " + problem)
        print("ERROR: Probably the configuration file name '" + str(
            config_file) + "' is not correct! Please use: python harmless_manager.py --help")
        sys.exit(1)

    if running_mode == "online":
        print("Online mode is not implemented yet.")
        sys.exit(1)
    elif running_mode == "backup_device":
        backup_mode(new_cfg, config)
    else:
        print('################################################')
        print("### Configuring Hardware device for HARMLESS ###")
        print(_HASH_RULE)
        offline_mode(config)
        print('###############################################')
        print("### Creating Software switches for HARMLESS ###")
        print(_HASH_RULE)
        patch_port_num = len(config.get(_SECTION, 'Used_ports_for_vlan').split(','))
        trunk_port_num = len(config.get(_SECTION, 'Used_ports_for_trunk').split(','))
        start_virtual_switches(patch_port_num, trunk_port_num, config.get('HARMLESS', 'DPDK'))
        print(_HASH_RULE)
        print("###                  Configuring HW Switch and Creating Software switches: Done                         ###")
        print(_HASH_RULE)


if __name__ == '__main__':
    # NOTE(review): the listing is truncated right after this guard; calling
    # main() with the command-line arguments is the presumed body - confirm.
    main(sys.argv[1:])
Source:dpdkovs.py  
...220        # set interface UP221        ip = node.tools[Ip]222        ip.up(self.OVS_BRIDGE_NAME)223        self.teardown_state = self.INTERFACE_UP224    def stop_ovs(self) -> None:225        # teardown based on the state that was reached during setup_ovs226        # this allows use in a 'finally' block227        # leave the node in a usable state for the next test.228        node = self.node229        ip = node.tools[Ip]230        modprobe = node.tools[Modprobe]231        if self.teardown_state == self.INTERFACE_UP:232            ip.down(self.OVS_BRIDGE_NAME)233        if self.teardown_state >= self.PORT_ADD:234            node.execute(235                f"ovs-vsctl del-port {self.OVS_BRIDGE_NAME} p1",236                sudo=True,237                expected_exit_code=0,238                expected_exit_code_failure_message=(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
