Best Python code snippet using autotest_python
test_ovs.py
Source:test_ovs.py  
...253        exists.return_value = True254        ovs.full_restart()255        service.assert_called_with('start', 'openvswitch-force-reload-kmod')256    @patch('subprocess.check_output')257    def test_port_to_br(self, check_output):258        check_output.return_value = b'br-ex'259        self.assertEqual(ovs.port_to_br('br-lb'),260                         'br-ex')261    @patch('subprocess.check_output')262    def test_port_to_br_not_found(self, check_output):263        check_output.side_effect = subprocess.CalledProcessError(1, 'not found')264        self.assertEqual(ovs.port_to_br('br-lb'), None)265    @patch('subprocess.check_call')266    def test_enable_ipfix_defaults(self, check_call):267        ovs.enable_ipfix('br-int',268                         '10.5.0.10:4739')269        check_call.assert_called_once_with([270            'ovs-vsctl', 'set', 'Bridge', 'br-int', 'ipfix=@i', '--',271            '--id=@i', 'create', 'IPFIX',272            'targets="10.5.0.10:4739"',273            'sampling=64',274            'cache_active_timeout=60',275            'cache_max_flows=128',276        ])277    @patch('subprocess.check_call')278    def test_enable_ipfix_values(self, check_call):...__init__.py
Source:__init__.py  
...79        import netifaces80    # NOTE(jamespage):81    # Older code supported addition of a linuxbridge directly82    # to an OVS bridge; ensure we don't break uses on upgrade83    existing_ovs_bridge = port_to_br(bridge)84    if existing_ovs_bridge is not None:85        log('Linuxbridge {} is already directly in use'86            ' by OVS bridge {}'.format(bridge, existing_ovs_bridge),87            level=INFO)88        return89    # NOTE(jamespage):90    # preserve existing naming because interfaces may already exist.91    ovsbridge_port = "veth-" + name92    linuxbridge_port = "veth-" + bridge93    if (len(ovsbridge_port) > MAX_KERNEL_INTERFACE_NAME_LEN or94            len(linuxbridge_port) > MAX_KERNEL_INTERFACE_NAME_LEN):95        # NOTE(jamespage):96        # use parts of hashed bridgename (openstack style) when97        # a bridge name exceeds 15 chars98        hashed_bridge = hashlib.sha256(bridge.encode('UTF-8')).hexdigest()99        base = '{}-{}'.format(hashed_bridge[:8], hashed_bridge[-2:])100        ovsbridge_port = "cvo{}".format(base)101        linuxbridge_port = "cvb{}".format(base)102    interfaces = netifaces.interfaces()103    for interface in interfaces:104        if interface == ovsbridge_port or interface == linuxbridge_port:105            log('Interface {} already exists'.format(interface), level=INFO)106            return107    log('Adding linuxbridge {} to ovsbridge {}'.format(bridge, name),108        level=INFO)109    check_for_eni_source()110    with open('/etc/network/interfaces.d/{}.cfg'.format(111            linuxbridge_port), 'w') as config:112        config.write(BRIDGE_TEMPLATE.format(linuxbridge_port=linuxbridge_port,113                                            ovsbridge_port=ovsbridge_port,114                                            bridge=bridge))115    subprocess.check_call(["ifup", linuxbridge_port])116    add_bridge_port(name, linuxbridge_port)117def is_linuxbridge_interface(port):118    ''' Check if the interface is a linuxbridge bridge119    :param port: Name of an interface to check whether it is a Linux bridge120    :returns: True if port is a Linux bridge'''121    if os.path.exists('/sys/class/net/' + port + '/bridge'):122        log('Interface {} is a Linux bridge'.format(port), level=DEBUG)123        return True124    else:125        log('Interface {} is not a Linux bridge'.format(port), level=DEBUG)126        return False127def set_manager(manager):128    ''' Set the controller for the local openvswitch '''129    log('Setting manager for local ovs to {}'.format(manager))130    subprocess.check_call(['ovs-vsctl', 'set-manager',131                           'ssl:{}'.format(manager)])132def set_Open_vSwitch_column_value(column_value):133    """134    Calls ovs-vsctl and sets the 'column_value' in the Open_vSwitch table.135    :param column_value:136            See http://www.openvswitch.org//ovs-vswitchd.conf.db.5.pdf for137            details of the relevant values.138    :type str139    :raises CalledProcessException: possibly ovsdb-server is not running140    """141    log('Setting {} in the Open_vSwitch table'.format(column_value))142    subprocess.check_call(['ovs-vsctl', 'set', 'Open_vSwitch', '.', column_value])143CERT_PATH = '/etc/openvswitch/ovsclient-cert.pem'144def get_certificate():145    ''' Read openvswitch certificate from disk '''146    if os.path.exists(CERT_PATH):147        log('Reading ovs certificate from {}'.format(CERT_PATH))148        with open(CERT_PATH, 'r') as cert:149            full_cert = cert.read()150            begin_marker = "-----BEGIN CERTIFICATE-----"151            end_marker = "-----END CERTIFICATE-----"152            begin_index = full_cert.find(begin_marker)153            end_index = full_cert.rfind(end_marker)154            if end_index == -1 or begin_index == -1:155                raise RuntimeError("Certificate does not contain valid begin"156                                   " and end markers.")157            full_cert = full_cert[begin_index:(end_index + len(end_marker))]158            return full_cert159    else:160        log('Certificate not found', level=WARNING)161        return None162def check_for_eni_source():163    ''' Juju removes the source line when setting up interfaces,164    replace if missing '''165    with open('/etc/network/interfaces', 'r') as eni:166        for line in eni:167            if line == 'source /etc/network/interfaces.d/*':168                return169    with open('/etc/network/interfaces', 'a') as eni:170        eni.write('\nsource /etc/network/interfaces.d/*')171def full_restart():172    ''' Full restart and reload of openvswitch '''173    if os.path.exists('/etc/init/openvswitch-force-reload-kmod.conf'):174        service('start', 'openvswitch-force-reload-kmod')175    else:176        service('force-reload-kmod', 'openvswitch-switch')177def enable_ipfix(bridge, target):178    '''Enable IPfix on bridge to target.179    :param bridge: Bridge to monitor180    :param target: IPfix remote endpoint181    '''182    cmd = ['ovs-vsctl', 'set', 'Bridge', bridge, 'ipfix=@i', '--',183           '--id=@i', 'create', 'IPFIX', 'targets="{}"'.format(target)]184    log('Enabling IPfix on {}.'.format(bridge))185    subprocess.check_call(cmd)186def disable_ipfix(bridge):187    '''Diable IPfix on target bridge.188    :param bridge: Bridge to modify189    '''190    cmd = ['ovs-vsctl', 'clear', 'Bridge', bridge, 'ipfix']191    subprocess.check_call(cmd)192def port_to_br(port):193    '''Determine the bridge that contains a port194    :param port: Name of port to check for195    :returns str: OVS bridge containing port or None if not found196    '''197    try:198        return subprocess.check_output(199            ['ovs-vsctl', 'port-to-br', port]200        ).decode('UTF-8').strip()201    except subprocess.CalledProcessError:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
