How to use ovs_vsctl method in autotest

Best Python code snippet using autotest_python

bridge.py

Source: bridge.py (GitHub)

copy

Full Screen

# Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
# Copyright (C) 2012 Isaku Yamahata <yamahata at private email ne jp>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
slimmed down version of OVSBridge in quantum agent
"""

import functools
import logging

from ryu import cfg
import ryu.exception as ryu_exc
import ryu.lib.dpid as dpid_lib
import ryu.lib.ovs.vsctl as ovs_vsctl

LOG = logging.getLogger(__name__)

CONF = cfg.CONF
CONF.register_opts([
    cfg.IntOpt('ovsdb-timeout', default=2, help='ovsdb timeout')
])


class OVSBridgeNotFound(ryu_exc.RyuException):
    """Raised when no single bridge matches the requested datapath id."""
    message = 'no bridge for datapath_id %(datapath_id)s'


class VifPort(object):
    """Plain record describing a VIF port attached to a bridge.

    ``switch`` is the owning bridge object; ``__str__`` reads its
    ``br_name`` attribute.
    """

    def __init__(self, port_name, ofport, vif_id, vif_mac, switch):
        super(VifPort, self).__init__()
        self.port_name = port_name
        self.ofport = ofport
        self.vif_id = vif_id
        self.vif_mac = vif_mac
        self.switch = switch

    def __str__(self):
        # NOTE: 'ofport=%d' requires a numeric ofport.
        return ('iface-id=%s, '
                'vif_mac=%s, '
                'port_name=%s, '
                'ofport=%d, '
                'bridge_name=%s' % (self.vif_id,
                                    self.vif_mac,
                                    self.port_name,
                                    self.ofport,
                                    self.switch.br_name))


class TunnelPort(object):
    """Plain record describing a tunnel port; compares by value."""

    def __init__(self, port_name, ofport, tunnel_type, local_ip, remote_ip):
        super(TunnelPort, self).__init__()
        self.port_name = port_name
        self.ofport = ofport
        self.tunnel_type = tunnel_type
        self.local_ip = local_ip
        self.remote_ip = remote_ip

    def __eq__(self, other):
        return (self.port_name == other.port_name and
                self.ofport == other.ofport and
                self.tunnel_type == other.tunnel_type and
                self.local_ip == other.local_ip and
                self.remote_ip == other.remote_ip)

    def __ne__(self, other):
        # Python 2 does not derive '!=' from __eq__; without this, two
        # equal TunnelPort objects would still compare as "not equal".
        return not self.__eq__(other)

    def __str__(self):
        return ('port_name=%s, '
                'ofport=%s, '
                'type=%s, '
                'local_ip=%s, '
                'remote_ip=%s' % (self.port_name,
                                  self.ofport,
                                  self.tunnel_type,
                                  self.local_ip,
                                  self.remote_ip))


class OVSBridge(object):
    """Wrapper that issues ``ovs_vsctl.VSCtlCommand`` objects for one bridge.

    The bridge is identified by ``datapath_id``; its name (``br_name``) is
    unknown until :meth:`init` has been called.
    """

    def __init__(self, CONF, datapath_id, ovsdb_addr, timeout=None,
                 exception=None):
        """
        :param CONF: configuration object; ``CONF.ovsdb_timeout`` is used
            when 'timeout' is not given (or is falsy)
        :param datapath_id: datapath id of the bridge to operate on
        :param ovsdb_addr: address handed to ``ovs_vsctl.VSCtl``
        :param timeout: timeout passed along with every command
        :param exception: passed through unchanged to
            ``VSCtl.run_command``
        """
        super(OVSBridge, self).__init__()
        self.datapath_id = datapath_id
        self.vsctl = ovs_vsctl.VSCtl(ovsdb_addr)
        self.timeout = timeout or CONF.ovsdb_timeout
        self.exception = exception

        self.br_name = None

    def run_command(self, commands):
        """Runs 'commands' with this bridge's timeout/exception settings."""
        self.vsctl.run_command(commands, self.timeout, self.exception)

    def init(self):
        """Looks up and caches the bridge name if it is not known yet."""
        if self.br_name is None:
            self.br_name = self._get_bridge_name()

    def _get_bridge_name(self):
        """ get Bridge name of a given 'datapath_id'

        Raises OVSBridgeNotFound unless exactly one Bridge record matches.
        """
        command = ovs_vsctl.VSCtlCommand(
            'find',
            ('Bridge',
             'datapath_id=%s' % dpid_lib.dpid_to_str(self.datapath_id)))
        self.run_command([command])
        if not isinstance(command.result, list) or len(command.result) != 1:
            raise OVSBridgeNotFound(
                datapath_id=dpid_lib.dpid_to_str(self.datapath_id))
        return command.result[0].name

    def get_controller(self):
        """Returns the first result of 'get-controller' for this bridge."""
        command = ovs_vsctl.VSCtlCommand('get-controller', [self.br_name])
        self.run_command([command])
        return command.result[0]

    def set_controller(self, controllers):
        """Runs 'set-controller' with the given list of controllers."""
        command = ovs_vsctl.VSCtlCommand('set-controller', [self.br_name])
        command.args.extend(controllers)
        self.run_command([command])

    def del_controller(self):
        """Runs 'del-controller' for this bridge."""
        command = ovs_vsctl.VSCtlCommand('del-controller', [self.br_name])
        self.run_command([command])

    def list_db_attributes(self, table, record=None):
        """
        Lists 'record' (or all records) in 'table'.

        This method is corresponding to the following ovs-vsctl command::

            $ ovs-vsctl list TBL [REC]

        Returns the command result, or an empty list when it is falsy.
        """
        command = ovs_vsctl.VSCtlCommand('list', (table, record))
        self.run_command([command])
        if command.result:
            return command.result
        return []

    def find_db_attributes(self, table, *conditions):
        """
        Lists records satisfying 'conditions' in 'table'.

        This method is corresponding to the following ovs-vsctl command::

            $ ovs-vsctl find TBL CONDITION...

        .. Note::

            Currently, only '=' condition is supported.
            To support other condition is TODO.

        Returns the command result, or an empty list when it is falsy.
        """
        args = [table]
        args.extend(conditions)
        command = ovs_vsctl.VSCtlCommand('find', args)
        self.run_command([command])
        if command.result:
            return command.result
        return []

    def get_db_attribute(self, table, record, column, key=None):
        """
        Gets values of 'column' in 'record' in 'table'.

        This method is corresponding to the following ovs-vsctl command::

            $ ovs-vsctl get TBL REC COL[:KEY]

        Returns the first element of the result, or None when the result
        is falsy.
        """
        if key is not None:
            column = '%s:%s' % (column, key)
        command = ovs_vsctl.VSCtlCommand(
            'get', (table, record, column))
        self.run_command([command])
        if command.result:
            return command.result[0]
        return None

    def set_db_attribute(self, table, record, column, value, key=None):
        """
        Sets 'value' into 'column' in 'record' in 'table'.

        This method is corresponding to the following ovs-vsctl command::

            $ ovs-vsctl set TBL REC COL[:KEY]=VALUE
        """
        if key is not None:
            column = '%s:%s' % (column, key)
        command = ovs_vsctl.VSCtlCommand(
            'set', (table, record, '%s=%s' % (column, value)))
        self.run_command([command])

    def add_db_attribute(self, table, record, column, value, key=None):
        """
        Adds ('key'=)'value' into 'column' in 'record' in 'table'.

        This method is corresponding to the following ovs-vsctl command::

            $ ovs-vsctl add TBL REC COL [KEY=]VALUE
        """
        if key is not None:
            value = '%s=%s' % (key, value)
        command = ovs_vsctl.VSCtlCommand(
            'add', (table, record, column, value))
        self.run_command([command])

    def remove_db_attribute(self, table, record, column, value, key=None):
        """
        Removes ('key'=)'value' from 'column' in 'record' in 'table'.

        This method is corresponding to the following ovs-vsctl command::

            $ ovs-vsctl remove TBL REC COL [KEY=]VALUE
        """
        if key is not None:
            value = '%s=%s' % (key, value)
        command = ovs_vsctl.VSCtlCommand(
            'remove', (table, record, column, value))
        self.run_command([command])

    def clear_db_attribute(self, table, record, column):
        """
        Clears values from 'column' in 'record' in 'table'.

        This method is corresponding to the following ovs-vsctl command::

            $ ovs-vsctl clear TBL REC COL
        """
        command = ovs_vsctl.VSCtlCommand('clear', (table, record, column))
        self.run_command([command])

    def db_get_val(self, table, record, column):
        """Returns the single value of 'column' in 'record' in 'table'.

        Asserts that the command produced exactly one result.
        """
        command = ovs_vsctl.VSCtlCommand('get', (table, record, column))
        self.run_command([command])
        assert len(command.result) == 1
        return command.result[0]

    def db_get_map(self, table, record, column):
        """Like db_get_val(), but asserts that the value is a dict."""
        val = self.db_get_val(table, record, column)
        assert isinstance(val, dict)
        return val

    def get_datapath_id(self):
        """Returns the 'datapath_id' column of this bridge's record."""
        return self.db_get_val('Bridge', self.br_name, 'datapath_id')

    def delete_port(self, port_name):
        """Runs 'del-port' with the '--if-exists' option."""
        command = ovs_vsctl.VSCtlCommand(
            'del-port', (self.br_name, port_name), '--if-exists')
        self.run_command([command])

    def get_ofport(self, port_name):
        """Returns the 'ofport' of interface 'port_name' as an int."""
        ofport_list = self.db_get_val('Interface', port_name, 'ofport')
        assert len(ofport_list) == 1
        return int(ofport_list[0])

    def get_port_name_list(self):
        """Returns the result of 'list-ports' for this bridge."""
        command = ovs_vsctl.VSCtlCommand('list-ports', (self.br_name, ))
        self.run_command([command])
        return command.result

    def add_bond(self, name, ifaces, bond_mode=None, lacp=None):
        """
        Creates a bonded port.

        :param name: Port name to be created
        :param ifaces: List of interfaces containing at least 2 interfaces
        :param bond_mode: Bonding mode (active-backup, balance-tcp
                          or balance-slb)
        :param lacp: LACP mode (active, passive or off)
        """
        assert len(ifaces) >= 2

        # NOTE(review): the two settings are concatenated without any
        # separator when both are given; kept as in the listing because the
        # format expected by VSCtlCommand is not visible here -- confirm.
        options = ''
        if bond_mode:
            options += 'bond_mode=%(bond_mode)s' % locals()
        if lacp:
            options += 'lacp=%(lacp)s' % locals()

        command_add = ovs_vsctl.VSCtlCommand(
            'add-bond', (self.br_name, name, ifaces), options)
        self.run_command([command_add])

    def add_tunnel_port(self, name, tunnel_type, remote_ip,
                        local_ip=None, key=None, ofport=None):
        """Adds port 'name' and configures it as a tunnel interface.

        Issues 'add-port' followed by a 'set' on the Interface record
        (type, options and, when 'ofport' is truthy, ofport_request) in a
        single run_command() call.
        """
        options = 'remote_ip=%(remote_ip)s' % locals()
        if key:
            options += ',key=%(key)s' % locals()
        if local_ip:
            options += ',local_ip=%(local_ip)s' % locals()

        args = ['Interface', name, 'type=%s' % tunnel_type,
                'options:%s' % options]
        if ofport:
            args.append('ofport_request=%(ofport)s' % locals())

        command_add = ovs_vsctl.VSCtlCommand('add-port', (self.br_name, name))
        command_set = ovs_vsctl.VSCtlCommand('set', args)
        self.run_command([command_add, command_set])

    def add_gre_port(self, name, remote_ip,
                     local_ip=None, key=None, ofport=None):
        """Shortcut for add_tunnel_port() with tunnel type 'gre'."""
        self.add_tunnel_port(name, 'gre', remote_ip,
                             local_ip=local_ip, key=key, ofport=ofport)

    def add_vxlan_port(self, name, remote_ip,
                       local_ip=None, key=None, ofport=None):
        """Shortcut for add_tunnel_port() with tunnel type 'vxlan'."""
        self.add_tunnel_port(name, 'vxlan', remote_ip,
                             local_ip=local_ip, key=key, ofport=ofport)

    def del_port(self, port_name):
        """Runs 'del-port' (without '--if-exists'; see delete_port())."""
        command = ovs_vsctl.VSCtlCommand('del-port', (self.br_name, port_name))
        self.run_command([command])

    def _get_ports(self, get_port):
        """Collects the truthy results of get_port(name) over all ports.

        Ports whose ofport is negative are skipped.
        """
        ports = []
        port_names = self.get_port_name_list()
        for name in port_names:
            if self.get_ofport(name) < 0:
                continue
            port = get_port(name)
            if port:
                ports.append(port)

        return ports

    def _vifport(self, name, external_ids):
        """Builds a VifPort from 'name' and its external_ids map."""
        ofport = self.get_ofport(name)
        return VifPort(name, ofport, external_ids['iface-id'],
                       external_ids['attached-mac'], self)

    def _get_vif_port(self, name):
        """Returns a VifPort when external_ids carries both 'iface-id' and
        'attached-mac'; otherwise returns None implicitly."""
        external_ids = self.db_get_map('Interface', name, 'external_ids')
        if 'iface-id' in external_ids and 'attached-mac' in external_ids:
            return self._vifport(name, external_ids)

    def get_vif_ports(self):
        """ Returns a VIF object for each VIF port """
        return self._get_ports(self._get_vif_port)

    def _get_external_port(self, name):
        """Returns a VifPort for ports that are neither VIF nor tunnel
        ports, otherwise None."""
        # exclude vif ports
        external_ids = self.db_get_map('Interface', name, 'external_ids')
        if external_ids:
            return

        # exclude tunnel ports
        options = self.db_get_map('Interface', name, 'options')
        if 'remote_ip' in options:
            return

        ofport = self.get_ofport(name)
        return VifPort(name, ofport, None, None, self)

    def get_external_ports(self):
        """Returns the ports accepted by _get_external_port()."""
        return self._get_ports(self._get_external_port)

    def get_tunnel_port(self, name, tunnel_type='gre'):
        """Returns a TunnelPort for interface 'name', or None.

        None is returned when the interface type differs from
        'tunnel_type' or when its options lack 'local_ip' or 'remote_ip'.
        """
        type_ = self.db_get_val('Interface', name, 'type')
        if type_ != tunnel_type:
            return

        options = self.db_get_map('Interface', name, 'options')
        if 'local_ip' in options and 'remote_ip' in options:
            ofport = self.get_ofport(name)
            return TunnelPort(name, ofport, tunnel_type,
                              options['local_ip'], options['remote_ip'])

    def get_tunnel_ports(self, tunnel_type='gre'):
        """Returns a TunnelPort for every port of type 'tunnel_type'."""
        get_tunnel_port = functools.partial(self.get_tunnel_port,
                                            tunnel_type=tunnel_type)
        return self._get_ports(get_tunnel_port)

    def get_quantum_ports(self, port_name):
        """Runs 'list-ifaces-verbose' for this datapath and 'port_name'.

        Returns the first result, or None when the result is falsy.
        """
        LOG.debug('port_name %s', port_name)
        command = ovs_vsctl.VSCtlCommand(
            'list-ifaces-verbose',
            [dpid_lib.dpid_to_str(self.datapath_id), port_name])
        self.run_command([command])
        if command.result:
            return command.result[0]
        return None

    def set_qos(self, port_name, type='linux-htb', max_rate=None, queues=None):
        """Runs 'set-qos' and 'set-queue' for 'port_name'.

        Returns the concatenation of both results when both are truthy,
        otherwise None.  'queues' defaults to an empty list.
        """
        queues = queues if queues else []
        command_qos = ovs_vsctl.VSCtlCommand(
            'set-qos',
            [port_name, type, max_rate])
        command_queue = ovs_vsctl.VSCtlCommand(
            'set-queue',
            [port_name, queues])
        self.run_command([command_qos, command_queue])
        if command_qos.result and command_queue.result:
            return command_qos.result + command_queue.result
        return None

    def del_qos(self, port_name):
        command = ovs_vsctl.VSCtlCommand(
            'del-qos',
            [port_name])
        # NOTE: the source listing is truncated at this point ("...");
        # the remainder of this method is not shown and is not
        # reconstructed here.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful