How to use _send_msg method in autotest

Best Python code snippet using autotest_python

test_br_tun.py

Source:test_br_tun.py Github

copy

Full Screen

...32 self.br.setup_default_table(patch_int_ofport=patch_int_ofport,33 arp_responder_enabled=arp_responder_enabled)34 (dp, ofp, ofpp) = self._get_dp()35 expected = [36 call._send_msg(ofpp.OFPFlowMod(dp,37 cookie=0,38 instructions=[ofpp.OFPInstructionGotoTable(table_id=2)],39 match=ofpp.OFPMatch(in_port=patch_int_ofport),40 priority=1, table_id=0)),41 call._send_msg(ofpp.OFPFlowMod(dp,42 cookie=0,43 instructions=[],44 match=ofpp.OFPMatch(),45 priority=0, table_id=0)),46 call._send_msg(ofpp.OFPFlowMod(dp,47 cookie=0,48 instructions=[ofpp.OFPInstructionGotoTable(table_id=20)],49 match=ofpp.OFPMatch(50 eth_dst=('00:00:00:00:00:00', '01:00:00:00:00:00')),51 priority=0,52 table_id=2)),53 call._send_msg(ofpp.OFPFlowMod(dp,54 cookie=0,55 instructions=[ofpp.OFPInstructionGotoTable(table_id=22)],56 match=ofpp.OFPMatch(57 eth_dst=('01:00:00:00:00:00', '01:00:00:00:00:00')),58 priority=0,59 table_id=2)),60 call._send_msg(ofpp.OFPFlowMod(dp,61 cookie=0,62 instructions=[],63 match=ofpp.OFPMatch(),64 priority=0, table_id=3)),65 call._send_msg(ofpp.OFPFlowMod(dp,66 cookie=0,67 instructions=[],68 match=ofpp.OFPMatch(),69 priority=0, table_id=4)),70 call._send_msg(ofpp.OFPFlowMod(dp,71 cookie=0,72 instructions=[],73 match=ofpp.OFPMatch(),74 priority=0, table_id=6)),75 call._send_msg(ofpp.OFPFlowMod(dp,76 cookie=0,77 instructions=[78 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [79 ofpp.NXActionLearn(80 cookie=0,81 hard_timeout=300,82 priority=1,83 specs=[84 ofpp.NXFlowSpecMatch(85 dst=('vlan_vid', 0),86 n_bits=12,87 src=('vlan_vid', 0)),88 ofpp.NXFlowSpecMatch(89 dst=('eth_dst', 0),90 n_bits=48,91 src=('eth_src', 0)),92 ofpp.NXFlowSpecLoad(93 dst=('vlan_vid', 0),94 n_bits=12,95 src=0),96 ofpp.NXFlowSpecLoad(97 dst=('tunnel_id', 0),98 n_bits=64,99 src=('tunnel_id', 0)),100 ofpp.NXFlowSpecOutput(101 dst='',102 n_bits=32,103 src=('in_port', 0)),104 ],105 table_id=20),106 ofpp.OFPActionOutput(patch_int_ofport, 0),107 ]),108 ],109 match=ofpp.OFPMatch(),110 priority=1,111 table_id=10)),112 call._send_msg(ofpp.OFPFlowMod(dp,113 cookie=0,114 instructions=[ofpp.OFPInstructionGotoTable(table_id=22)],115 match=ofpp.OFPMatch(),116 priority=0,117 table_id=20)),118 call._send_msg(ofpp.OFPFlowMod(dp,119 cookie=0,120 instructions=[],121 match=ofpp.OFPMatch(),122 priority=0,123 table_id=22))124 ]125 self.assertEqual(expected, self.mock.mock_calls)126 def test_setup_default_table_arp_responder_enabled(self):127 patch_int_ofport = 5555128 arp_responder_enabled = True129 self.br.setup_default_table(patch_int_ofport=patch_int_ofport,130 arp_responder_enabled=arp_responder_enabled)131 (dp, ofp, ofpp) = self._get_dp()132 expected = [133 call._send_msg(ofpp.OFPFlowMod(dp,134 cookie=0,135 instructions=[ofpp.OFPInstructionGotoTable(table_id=2)],136 match=ofpp.OFPMatch(in_port=patch_int_ofport),137 priority=1, table_id=0)),138 call._send_msg(ofpp.OFPFlowMod(dp,139 cookie=0,140 instructions=[],141 match=ofpp.OFPMatch(),142 priority=0, table_id=0)),143 call._send_msg(ofpp.OFPFlowMod(dp,144 cookie=0,145 instructions=[ofpp.OFPInstructionGotoTable(table_id=21)],146 match=ofpp.OFPMatch(147 eth_dst='ff:ff:ff:ff:ff:ff',148 eth_type=self.ether_types.ETH_TYPE_ARP),149 priority=1,150 table_id=2)),151 call._send_msg(ofpp.OFPFlowMod(dp,152 cookie=0,153 instructions=[ofpp.OFPInstructionGotoTable(table_id=20)],154 match=ofpp.OFPMatch(155 eth_dst=('00:00:00:00:00:00', '01:00:00:00:00:00')),156 priority=0,157 table_id=2)),158 call._send_msg(ofpp.OFPFlowMod(dp,159 cookie=0,160 instructions=[ofpp.OFPInstructionGotoTable(table_id=22)],161 match=ofpp.OFPMatch(162 eth_dst=('01:00:00:00:00:00', '01:00:00:00:00:00')),163 priority=0,164 table_id=2)),165 call._send_msg(ofpp.OFPFlowMod(dp,166 cookie=0,167 instructions=[],168 match=ofpp.OFPMatch(),169 priority=0, table_id=3)),170 call._send_msg(ofpp.OFPFlowMod(dp,171 cookie=0,172 instructions=[],173 match=ofpp.OFPMatch(),174 priority=0, table_id=4)),175 call._send_msg(ofpp.OFPFlowMod(dp,176 cookie=0,177 instructions=[],178 match=ofpp.OFPMatch(),179 priority=0, table_id=6)),180 call._send_msg(ofpp.OFPFlowMod(dp,181 cookie=0,182 instructions=[183 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [184 ofpp.NXActionLearn(185 cookie=0,186 hard_timeout=300,187 priority=1,188 specs=[189 ofpp.NXFlowSpecMatch(190 dst=('vlan_vid', 0),191 n_bits=12,192 src=('vlan_vid', 0)),193 ofpp.NXFlowSpecMatch(194 dst=('eth_dst', 0),195 n_bits=48,196 src=('eth_src', 0)),197 ofpp.NXFlowSpecLoad(198 dst=('vlan_vid', 0),199 n_bits=12,200 src=0),201 ofpp.NXFlowSpecLoad(202 dst=('tunnel_id', 0),203 n_bits=64,204 src=('tunnel_id', 0)),205 ofpp.NXFlowSpecOutput(206 dst='',207 n_bits=32,208 src=('in_port', 0)),209 ],210 table_id=20),211 ofpp.OFPActionOutput(patch_int_ofport, 0),212 ]),213 ],214 match=ofpp.OFPMatch(),215 priority=1,216 table_id=10)),217 call._send_msg(ofpp.OFPFlowMod(dp,218 cookie=0,219 instructions=[ofpp.OFPInstructionGotoTable(table_id=22)],220 match=ofpp.OFPMatch(),221 priority=0,222 table_id=20)),223 call._send_msg(ofpp.OFPFlowMod(dp,224 cookie=0,225 instructions=[ofpp.OFPInstructionGotoTable(table_id=22)],226 match=ofpp.OFPMatch(),227 priority=0,228 table_id=21)),229 call._send_msg(ofpp.OFPFlowMod(dp,230 cookie=0,231 instructions=[],232 match=ofpp.OFPMatch(),233 priority=0,234 table_id=22))235 ]236 self.assertEqual(expected, self.mock.mock_calls)237 def test_provision_local_vlan(self):238 network_type = 'vxlan'239 lvid = 888240 segmentation_id = 777241 distributed = False242 self.br.provision_local_vlan(network_type=network_type, lvid=lvid,243 segmentation_id=segmentation_id,244 distributed=distributed)245 (dp, ofp, ofpp) = self._get_dp()246 expected = [247 call._send_msg(ofpp.OFPFlowMod(dp,248 cookie=0,249 instructions=[250 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [251 ofpp.OFPActionPushVlan(),252 ofpp.OFPActionSetField(253 vlan_vid=lvid | ofp.OFPVID_PRESENT)254 ]),255 ofpp.OFPInstructionGotoTable(table_id=10),256 ],257 match=ofpp.OFPMatch(tunnel_id=segmentation_id),258 priority=1,259 table_id=4)),260 ]261 self.assertEqual(expected, self.mock.mock_calls)262 def test_reclaim_local_vlan(self):263 network_type = 'vxlan'264 segmentation_id = 777265 self.br.reclaim_local_vlan(network_type=network_type,266 segmentation_id=segmentation_id)267 (dp, ofp, ofpp) = self._get_dp()268 expected = [269 call.delete_flows(270 table_id=4,271 match=ofpp.OFPMatch(tunnel_id=segmentation_id)),272 ]273 self.assertEqual(expected, self.mock.mock_calls)274 def test_install_flood_to_tun(self):275 vlan = 3333276 tun_id = 2222277 ports = [11, 44, 22, 33]278 self.br.install_flood_to_tun(vlan=vlan,279 tun_id=tun_id,280 ports=ports)281 (dp, ofp, ofpp) = self._get_dp()282 expected = [283 call._send_msg(ofpp.OFPFlowMod(dp,284 cookie=0,285 instructions=[286 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [287 ofpp.OFPActionPopVlan(),288 ofpp.OFPActionSetField(tunnel_id=tun_id),289 ] + [ofpp.OFPActionOutput(p, 0) for p in ports]),290 ],291 match=ofpp.OFPMatch(vlan_vid=vlan | ofp.OFPVID_PRESENT),292 priority=1,293 table_id=22)),294 ]295 self.assertEqual(expected, self.mock.mock_calls)296 def test_delete_flood_to_tun(self):297 vlan = 3333298 self.br.delete_flood_to_tun(vlan=vlan)299 (dp, ofp, ofpp) = self._get_dp()300 expected = [301 call.delete_flows(table_id=22,302 match=ofpp.OFPMatch(vlan_vid=vlan | ofp.OFPVID_PRESENT)),303 ]304 self.assertEqual(expected, self.mock.mock_calls)305 def test_install_unicast_to_tun(self):306 vlan = 3333307 port = 55308 mac = '08:60:6e:7f:74:e7'309 tun_id = 2222310 self.br.install_unicast_to_tun(vlan=vlan,311 tun_id=tun_id,312 port=port,313 mac=mac)314 (dp, ofp, ofpp) = self._get_dp()315 expected = [316 call._send_msg(ofpp.OFPFlowMod(dp,317 cookie=0,318 instructions=[319 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [320 ofpp.OFPActionPopVlan(),321 ofpp.OFPActionSetField(tunnel_id=tun_id),322 ofpp.OFPActionOutput(port, 0),323 ]),324 ],325 match=ofpp.OFPMatch(326 eth_dst=mac, vlan_vid=vlan | ofp.OFPVID_PRESENT),327 priority=2,328 table_id=20)),329 ]330 self.assertEqual(expected, self.mock.mock_calls)331 def test_delete_unicast_to_tun(self):332 vlan = 3333333 mac = '08:60:6e:7f:74:e7'334 self.br.delete_unicast_to_tun(vlan=vlan, mac=mac)335 (dp, ofp, ofpp) = self._get_dp()336 expected = [337 call.delete_flows(table_id=20,338 match=ofpp.OFPMatch(339 eth_dst=mac, vlan_vid=vlan | ofp.OFPVID_PRESENT)),340 ]341 self.assertEqual(expected, self.mock.mock_calls)342 def test_delete_unicast_to_tun_without_mac(self):343 vlan = 3333344 mac = None345 self.br.delete_unicast_to_tun(vlan=vlan, mac=mac)346 (dp, ofp, ofpp) = self._get_dp()347 expected = [348 call.delete_flows(table_id=20,349 match=ofpp.OFPMatch(vlan_vid=vlan | ofp.OFPVID_PRESENT)),350 ]351 self.assertEqual(expected, self.mock.mock_calls)352 def test_install_arp_responder(self):353 vlan = 3333354 ip = '192.0.2.1'355 mac = '08:60:6e:7f:74:e7'356 self.br.install_arp_responder(vlan=vlan, ip=ip, mac=mac)357 (dp, ofp, ofpp) = self._get_dp()358 expected = [359 call._send_msg(ofpp.OFPFlowMod(dp,360 cookie=0,361 instructions=[362 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [363 ofpp.OFPActionSetField(arp_op=self.arp.ARP_REPLY),364 ofpp.NXActionRegMove(365 dst_field='arp_tha',366 n_bits=48,367 src_field='arp_sha'),368 ofpp.NXActionRegMove(369 dst_field='arp_tpa',370 n_bits=32,371 src_field='arp_spa'),372 ofpp.OFPActionSetField(arp_sha=mac),373 ofpp.OFPActionSetField(arp_spa=ip),374 ofpp.OFPActionOutput(ofp.OFPP_IN_PORT, 0),375 ]),376 ],377 match=ofpp.OFPMatch(378 eth_type=self.ether_types.ETH_TYPE_ARP,379 arp_tpa=ip,380 vlan_vid=vlan | ofp.OFPVID_PRESENT),381 priority=1,382 table_id=21)),383 ]384 self.assertEqual(expected, self.mock.mock_calls)385 def test_delete_arp_responder(self):386 vlan = 3333387 ip = '192.0.2.1'388 self.br.delete_arp_responder(vlan=vlan, ip=ip)389 (dp, ofp, ofpp) = self._get_dp()390 expected = [391 call.delete_flows(392 match=ofpp.OFPMatch(393 eth_type=self.ether_types.ETH_TYPE_ARP,394 arp_tpa=ip,395 vlan_vid=vlan | ofp.OFPVID_PRESENT),396 table_id=21),397 ]398 self.assertEqual(expected, self.mock.mock_calls)399 def test_delete_arp_responder_without_ip(self):400 vlan = 3333401 ip = None402 self.br.delete_arp_responder(vlan=vlan, ip=ip)403 (dp, ofp, ofpp) = self._get_dp()404 expected = [405 call.delete_flows(406 match=ofpp.OFPMatch(407 eth_type=self.ether_types.ETH_TYPE_ARP,408 vlan_vid=vlan | ofp.OFPVID_PRESENT),409 table_id=21),410 ]411 self.assertEqual(expected, self.mock.mock_calls)412 def test_setup_tunnel_port(self):413 network_type = 'vxlan'414 port = 11111415 self.br.setup_tunnel_port(network_type=network_type, port=port)416 (dp, ofp, ofpp) = self._get_dp()417 expected = [418 call._send_msg(ofpp.OFPFlowMod(dp,419 cookie=0,420 instructions=[421 ofpp.OFPInstructionGotoTable(table_id=4),422 ],423 match=ofpp.OFPMatch(in_port=port),424 priority=1,425 table_id=0)),426 ]427 self.assertEqual(expected, self.mock.mock_calls)428 def test_cleanup_tunnel_port(self):429 port = 11111430 self.br.cleanup_tunnel_port(port=port)431 (dp, ofp, ofpp) = self._get_dp()432 expected = [433 call.delete_flows(in_port=port),434 ]435 self.assertEqual(expected, self.mock.mock_calls)436 def test_add_dvr_mac_tun(self):437 mac = '00:02:b3:13:fe:3d'438 port = 8888439 self.br.add_dvr_mac_tun(mac=mac, port=port)440 (dp, ofp, ofpp) = self._get_dp()441 expected = [442 call._send_msg(ofpp.OFPFlowMod(dp,443 cookie=0,444 instructions=[445 ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [446 ofpp.OFPActionOutput(port, 0),447 ]),448 ],449 match=ofpp.OFPMatch(eth_src=mac),450 priority=1,451 table_id=9)),452 ]453 self.assertEqual(expected, self.mock.mock_calls)454 def test_remove_dvr_mac_tun(self):455 mac = '00:02:b3:13:fe:3d'456 self.br.remove_dvr_mac_tun(mac=mac)...

Full Screen

Full Screen

test_remote.py

Source:test_remote.py Github

copy

Full Screen

1import pytest2from geonotebook.kernel import Remote3@pytest.fixture4def remote(mocker):5 protocols = [{'procedure': 'no_args',6 'required': [],7 'optional': []},8 {'procedure': 'required_only',9 'required': [{"key": "a"}, {"key": "b"}],10 'optional': []},11 {'procedure': 'optional_only',12 'required': [],13 'optional': [{"key": "x"}, {"key": "y"}, {"key": "z"}]},14 {'procedure': 'required_and_optional',15 'required': [{"key": "a"}, {"key": "b"}],16 'optional': [{"key": "x"}, {"key": "y"}, {"key": "z"}]}]17 r = Remote(None, protocols)18 # Mock out the UUID.uuid4 function to return a consistent ID for testing19 mocker.patch('geonotebook.jsonrpc.uuid.uuid4', return_value='TEST-ID')20 mocker.patch.object(r, '_send_msg')21 return r22def test_remote_bad_protocol():23 with pytest.raises(AssertionError):24 Remote(None, ['foo', 'bar'])25def test_remote_bad_protocol_missing_procedure():26 with pytest.raises(AssertionError):27 Remote(None, [{'required': [],28 'optional': []}])29def test_remote_bad_protocol_missing_required():30 with pytest.raises(AssertionError):31 Remote(None, [{'procedure': 'no_args',32 'optional': []}])33def test_remote_bad_protocol_missing_optional():34 with pytest.raises(AssertionError):35 Remote(None, [{'procedure': 'no_args',36 'required': []}])37def test_remote_init(remote):38 assert hasattr(remote.no_args, '__call__')39 assert hasattr(remote.required_only, '__call__')40 assert hasattr(remote.required_and_optional, '__call__')41def test_remote_call_no_args(remote):42 remote.no_args()43 assert remote._send_msg.call_count == 144 remote._send_msg.assert_called_with({'jsonrpc': '2.0', 'params': [],45 'method': 'no_args', 'id': 'TEST-ID'})46def test_remote_call_no_args_with_args(remote):47 with pytest.raises(AssertionError):48 remote.no_args('foo', 'bar')49def test_remote_call_required_only(remote):50 remote.required_only('foo', 'bar')51 assert remote._send_msg.call_count == 152 remote._send_msg.assert_called_with({53 'jsonrpc': '2.0', 'params': [{'key': 'a',54 'value': 'foo',55 'required': True},56 {'key': 'b',57 'value': 'bar',58 'required': True}],59 'method': 'required_only', 'id': 'TEST-ID'})60def test_remote_call_required_only_with_too_few_args(remote):61 with pytest.raises(AssertionError):62 remote.required_only('foo')63def test_remote_call_required_only_with_too_many_args(remote):64 with pytest.raises(AssertionError):65 remote.no_args('foo', 'bar', 'baz')66def test_remote_call_optional_only(remote):67 remote.optional_only(x='foo', y='bar', z='baz')68 assert remote._send_msg.call_count == 169 remote._send_msg.assert_called_with({70 'jsonrpc': '2.0',71 'params': [{'key': 'x', 'value': 'foo', 'required': False},72 {'key': 'y', 'value': 'bar', 'required': False},73 {'key': 'z', 'value': 'baz', 'required': False}],74 'method': 'optional_only', 'id': 'TEST-ID'})75 remote.optional_only()76 assert remote._send_msg.call_count == 277 remote._send_msg.assert_called_with({78 'jsonrpc': '2.0', 'params': [],79 'method': 'optional_only', 'id': 'TEST-ID'})80def test_remote_call_optional_only_missing_arguments(remote):81 remote.optional_only(x='foo', z='bar')82 assert remote._send_msg.call_count == 183 remote._send_msg.assert_called_with({84 'jsonrpc': '2.0',85 'params': [{'key': 'x', 'value': 'foo', 'required': False},86 {'key': 'z', 'value': 'bar', 'required': False}],87 'method': 'optional_only', 'id': 'TEST-ID'})88def test_remote_promise_resolve_success(remote):89 class Nonlocal(object):90 pass91 def success(val):92 Nonlocal.result = val93 def error(val):94 Nonlocal.result = val95 remote.no_args().then(success, error)96 remote.resolve({'id': 'TEST-ID', 'result': 'SUCCESS', 'error': None})97 assert Nonlocal.result == 'SUCCESS'98def test_remote_promise_resolve_error(remote):99 class Nonlocal(object):100 pass101 def success(val):102 Nonlocal.result = val103 def error(val):104 Nonlocal.result = val105 remote.no_args().then(success, error)106 remote.resolve({'id': 'TEST-ID', 'result': None, 'error': 'ERROR'})107 assert isinstance(Nonlocal.result, Exception)108 assert str(Nonlocal.result) == "ERROR"109@pytest.mark.skip(reason="See: geonotebook/issues/46")110def test_remote_promise_resolve_with_bad_message(r, mocker):111 class Nonlocal(object):112 pass113 def success(val):114 Nonlocal.result = val115 def error(val):116 Nonlocal.result = val117 remote.no_args().then(success, error)118 with pytest.raises(Exception):119 remote.resolve('bad message')120 remote.no_args().then(success, error)121 with pytest.raises(Exception):122 remote.resolve({'id': 'TEST-ID', 'bad': 'message'})123 remote.no_args().then(success, error)124 # warn = mockeremote.patch.object(remote.log, 'warn')125 remote.resolve({'id': 'BAD-ID'})...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful