Best Python code snippet using autotest_python
test_br_tun.py
Source:test_br_tun.py  
...32        self.br.setup_default_table(patch_int_ofport=patch_int_ofport,33            arp_responder_enabled=arp_responder_enabled)34        (dp, ofp, ofpp) = self._get_dp()35        expected = [36            call._send_msg(ofpp.OFPFlowMod(dp,37                cookie=0,38                instructions=[ofpp.OFPInstructionGotoTable(table_id=2)],39                match=ofpp.OFPMatch(in_port=patch_int_ofport),40                priority=1, table_id=0)),41            call._send_msg(ofpp.OFPFlowMod(dp,42                cookie=0,43                instructions=[],44                match=ofpp.OFPMatch(),45                priority=0, table_id=0)),46            call._send_msg(ofpp.OFPFlowMod(dp,47                cookie=0,48                instructions=[ofpp.OFPInstructionGotoTable(table_id=20)],49                match=ofpp.OFPMatch(50                    eth_dst=('00:00:00:00:00:00', '01:00:00:00:00:00')),51                priority=0,52                table_id=2)),53            call._send_msg(ofpp.OFPFlowMod(dp,54                cookie=0,55                instructions=[ofpp.OFPInstructionGotoTable(table_id=22)],56                match=ofpp.OFPMatch(57                    eth_dst=('01:00:00:00:00:00', '01:00:00:00:00:00')),58                priority=0,59                table_id=2)),60            call._send_msg(ofpp.OFPFlowMod(dp,61                cookie=0,62                instructions=[],63                match=ofpp.OFPMatch(),64                priority=0, table_id=3)),65            call._send_msg(ofpp.OFPFlowMod(dp,66                cookie=0,67                instructions=[],68                match=ofpp.OFPMatch(),69                priority=0, table_id=4)),70            call._send_msg(ofpp.OFPFlowMod(dp,71                cookie=0,72                instructions=[],73                match=ofpp.OFPMatch(),74                priority=0, table_id=6)),75            call._send_msg(ofpp.OFPFlowMod(dp,76                cookie=0,77                instructions=[78                    ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [79                        ofpp.NXActionLearn(80                            cookie=0,81                            hard_timeout=300,82                            priority=1,83                            specs=[84                                ofpp.NXFlowSpecMatch(85                                    dst=('vlan_vid', 0),86                                    n_bits=12,87                                    src=('vlan_vid', 0)),88                                ofpp.NXFlowSpecMatch(89                                    dst=('eth_dst', 0),90                                    n_bits=48,91                                    src=('eth_src', 0)),92                                ofpp.NXFlowSpecLoad(93                                    dst=('vlan_vid', 0),94                                    n_bits=12,95                                    src=0),96                                ofpp.NXFlowSpecLoad(97                                    dst=('tunnel_id', 0),98                                    n_bits=64,99                                    src=('tunnel_id', 0)),100                                ofpp.NXFlowSpecOutput(101                                    dst='',102                                    n_bits=32,103                                    src=('in_port', 0)),104                            ],105                            table_id=20),106                        ofpp.OFPActionOutput(patch_int_ofport, 0),107                    ]),108                ],109                match=ofpp.OFPMatch(),110                priority=1,111                table_id=10)),112            call._send_msg(ofpp.OFPFlowMod(dp,113                cookie=0,114                instructions=[ofpp.OFPInstructionGotoTable(table_id=22)],115                match=ofpp.OFPMatch(),116                priority=0,117                table_id=20)),118            call._send_msg(ofpp.OFPFlowMod(dp,119                cookie=0,120                instructions=[],121                match=ofpp.OFPMatch(),122                priority=0,123                table_id=22))124        ]125        self.assertEqual(expected, self.mock.mock_calls)126    def test_setup_default_table_arp_responder_enabled(self):127        patch_int_ofport = 5555128        arp_responder_enabled = True129        self.br.setup_default_table(patch_int_ofport=patch_int_ofport,130            arp_responder_enabled=arp_responder_enabled)131        (dp, ofp, ofpp) = self._get_dp()132        expected = [133            call._send_msg(ofpp.OFPFlowMod(dp,134                cookie=0,135                instructions=[ofpp.OFPInstructionGotoTable(table_id=2)],136                match=ofpp.OFPMatch(in_port=patch_int_ofport),137                priority=1, table_id=0)),138            call._send_msg(ofpp.OFPFlowMod(dp,139                cookie=0,140                instructions=[],141                match=ofpp.OFPMatch(),142                priority=0, table_id=0)),143            call._send_msg(ofpp.OFPFlowMod(dp,144                cookie=0,145                instructions=[ofpp.OFPInstructionGotoTable(table_id=21)],146                match=ofpp.OFPMatch(147                    eth_dst='ff:ff:ff:ff:ff:ff',148                    eth_type=self.ether_types.ETH_TYPE_ARP),149                priority=1,150                table_id=2)),151            call._send_msg(ofpp.OFPFlowMod(dp,152                cookie=0,153                instructions=[ofpp.OFPInstructionGotoTable(table_id=20)],154                match=ofpp.OFPMatch(155                    eth_dst=('00:00:00:00:00:00', '01:00:00:00:00:00')),156                priority=0,157                table_id=2)),158            call._send_msg(ofpp.OFPFlowMod(dp,159                cookie=0,160                instructions=[ofpp.OFPInstructionGotoTable(table_id=22)],161                match=ofpp.OFPMatch(162                    eth_dst=('01:00:00:00:00:00', '01:00:00:00:00:00')),163                priority=0,164                table_id=2)),165            call._send_msg(ofpp.OFPFlowMod(dp,166                cookie=0,167                instructions=[],168                match=ofpp.OFPMatch(),169                priority=0, table_id=3)),170            call._send_msg(ofpp.OFPFlowMod(dp,171                cookie=0,172                instructions=[],173                match=ofpp.OFPMatch(),174                priority=0, table_id=4)),175            call._send_msg(ofpp.OFPFlowMod(dp,176                cookie=0,177                instructions=[],178                match=ofpp.OFPMatch(),179                priority=0, table_id=6)),180            call._send_msg(ofpp.OFPFlowMod(dp,181                cookie=0,182                instructions=[183                    ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [184                        ofpp.NXActionLearn(185                            cookie=0,186                            hard_timeout=300,187                            priority=1,188                            specs=[189                                ofpp.NXFlowSpecMatch(190                                    dst=('vlan_vid', 0),191                                    n_bits=12,192                                    src=('vlan_vid', 0)),193                                ofpp.NXFlowSpecMatch(194                                    dst=('eth_dst', 0),195                                    n_bits=48,196                                    src=('eth_src', 0)),197                                ofpp.NXFlowSpecLoad(198                                    dst=('vlan_vid', 0),199                                    n_bits=12,200                                    src=0),201                                ofpp.NXFlowSpecLoad(202                                    dst=('tunnel_id', 0),203                                    n_bits=64,204                                    src=('tunnel_id', 0)),205                                ofpp.NXFlowSpecOutput(206                                    dst='',207                                    n_bits=32,208                                    src=('in_port', 0)),209                            ],210                            table_id=20),211                        ofpp.OFPActionOutput(patch_int_ofport, 0),212                    ]),213                ],214                match=ofpp.OFPMatch(),215                priority=1,216                table_id=10)),217            call._send_msg(ofpp.OFPFlowMod(dp,218                cookie=0,219                instructions=[ofpp.OFPInstructionGotoTable(table_id=22)],220                match=ofpp.OFPMatch(),221                priority=0,222                table_id=20)),223            call._send_msg(ofpp.OFPFlowMod(dp,224                cookie=0,225                instructions=[ofpp.OFPInstructionGotoTable(table_id=22)],226                match=ofpp.OFPMatch(),227                priority=0,228                table_id=21)),229            call._send_msg(ofpp.OFPFlowMod(dp,230                cookie=0,231                instructions=[],232                match=ofpp.OFPMatch(),233                priority=0,234                table_id=22))235        ]236        self.assertEqual(expected, self.mock.mock_calls)237    def test_provision_local_vlan(self):238        network_type = 'vxlan'239        lvid = 888240        segmentation_id = 777241        distributed = False242        self.br.provision_local_vlan(network_type=network_type, lvid=lvid,243                                     segmentation_id=segmentation_id,244                                     distributed=distributed)245        (dp, ofp, ofpp) = self._get_dp()246        expected = [247            call._send_msg(ofpp.OFPFlowMod(dp,248                cookie=0,249                instructions=[250                    ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [251                        ofpp.OFPActionPushVlan(),252                        ofpp.OFPActionSetField(253                            vlan_vid=lvid | ofp.OFPVID_PRESENT)254                    ]),255                    ofpp.OFPInstructionGotoTable(table_id=10),256                ],257                match=ofpp.OFPMatch(tunnel_id=segmentation_id),258                priority=1,259                table_id=4)),260        ]261        self.assertEqual(expected, self.mock.mock_calls)262    def test_reclaim_local_vlan(self):263        network_type = 'vxlan'264        segmentation_id = 777265        self.br.reclaim_local_vlan(network_type=network_type,266                                   segmentation_id=segmentation_id)267        (dp, ofp, ofpp) = self._get_dp()268        expected = [269            call.delete_flows(270                table_id=4,271                match=ofpp.OFPMatch(tunnel_id=segmentation_id)),272        ]273        self.assertEqual(expected, self.mock.mock_calls)274    def test_install_flood_to_tun(self):275        vlan = 3333276        tun_id = 2222277        ports = [11, 44, 22, 33]278        self.br.install_flood_to_tun(vlan=vlan,279                                     tun_id=tun_id,280                                     ports=ports)281        (dp, ofp, ofpp) = self._get_dp()282        expected = [283            call._send_msg(ofpp.OFPFlowMod(dp,284                cookie=0,285                instructions=[286                    ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [287                        ofpp.OFPActionPopVlan(),288                        ofpp.OFPActionSetField(tunnel_id=tun_id),289                    ] + [ofpp.OFPActionOutput(p, 0) for p in ports]),290                ],291                match=ofpp.OFPMatch(vlan_vid=vlan | ofp.OFPVID_PRESENT),292                priority=1,293                table_id=22)),294        ]295        self.assertEqual(expected, self.mock.mock_calls)296    def test_delete_flood_to_tun(self):297        vlan = 3333298        self.br.delete_flood_to_tun(vlan=vlan)299        (dp, ofp, ofpp) = self._get_dp()300        expected = [301            call.delete_flows(table_id=22,302                match=ofpp.OFPMatch(vlan_vid=vlan | ofp.OFPVID_PRESENT)),303        ]304        self.assertEqual(expected, self.mock.mock_calls)305    def test_install_unicast_to_tun(self):306        vlan = 3333307        port = 55308        mac = '08:60:6e:7f:74:e7'309        tun_id = 2222310        self.br.install_unicast_to_tun(vlan=vlan,311                                       tun_id=tun_id,312                                       port=port,313                                       mac=mac)314        (dp, ofp, ofpp) = self._get_dp()315        expected = [316            call._send_msg(ofpp.OFPFlowMod(dp,317                cookie=0,318                instructions=[319                    ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [320                        ofpp.OFPActionPopVlan(),321                        ofpp.OFPActionSetField(tunnel_id=tun_id),322                        ofpp.OFPActionOutput(port, 0),323                    ]),324                ],325                match=ofpp.OFPMatch(326                    eth_dst=mac, vlan_vid=vlan | ofp.OFPVID_PRESENT),327                priority=2,328                table_id=20)),329        ]330        self.assertEqual(expected, self.mock.mock_calls)331    def test_delete_unicast_to_tun(self):332        vlan = 3333333        mac = '08:60:6e:7f:74:e7'334        self.br.delete_unicast_to_tun(vlan=vlan, mac=mac)335        (dp, ofp, ofpp) = self._get_dp()336        expected = [337            call.delete_flows(table_id=20,338                match=ofpp.OFPMatch(339                    eth_dst=mac, vlan_vid=vlan | ofp.OFPVID_PRESENT)),340        ]341        self.assertEqual(expected, self.mock.mock_calls)342    def test_delete_unicast_to_tun_without_mac(self):343        vlan = 3333344        mac = None345        self.br.delete_unicast_to_tun(vlan=vlan, mac=mac)346        (dp, ofp, ofpp) = self._get_dp()347        expected = [348            call.delete_flows(table_id=20,349                match=ofpp.OFPMatch(vlan_vid=vlan | ofp.OFPVID_PRESENT)),350        ]351        self.assertEqual(expected, self.mock.mock_calls)352    def test_install_arp_responder(self):353        vlan = 3333354        ip = '192.0.2.1'355        mac = '08:60:6e:7f:74:e7'356        self.br.install_arp_responder(vlan=vlan, ip=ip, mac=mac)357        (dp, ofp, ofpp) = self._get_dp()358        expected = [359            call._send_msg(ofpp.OFPFlowMod(dp,360                cookie=0,361                instructions=[362                    ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [363                        ofpp.OFPActionSetField(arp_op=self.arp.ARP_REPLY),364                        ofpp.NXActionRegMove(365                            dst_field='arp_tha',366                            n_bits=48,367                            src_field='arp_sha'),368                        ofpp.NXActionRegMove(369                            dst_field='arp_tpa',370                            n_bits=32,371                            src_field='arp_spa'),372                        ofpp.OFPActionSetField(arp_sha=mac),373                        ofpp.OFPActionSetField(arp_spa=ip),374                        ofpp.OFPActionOutput(ofp.OFPP_IN_PORT, 0),375                    ]),376                ],377                match=ofpp.OFPMatch(378                    eth_type=self.ether_types.ETH_TYPE_ARP,379                    arp_tpa=ip,380                    vlan_vid=vlan | ofp.OFPVID_PRESENT),381                priority=1,382                table_id=21)),383        ]384        self.assertEqual(expected, self.mock.mock_calls)385    def test_delete_arp_responder(self):386        vlan = 3333387        ip = '192.0.2.1'388        self.br.delete_arp_responder(vlan=vlan, ip=ip)389        (dp, ofp, ofpp) = self._get_dp()390        expected = [391            call.delete_flows(392                match=ofpp.OFPMatch(393                    eth_type=self.ether_types.ETH_TYPE_ARP,394                    arp_tpa=ip,395                    vlan_vid=vlan | ofp.OFPVID_PRESENT),396                table_id=21),397        ]398        self.assertEqual(expected, self.mock.mock_calls)399    def test_delete_arp_responder_without_ip(self):400        vlan = 3333401        ip = None402        self.br.delete_arp_responder(vlan=vlan, ip=ip)403        (dp, ofp, ofpp) = self._get_dp()404        expected = [405            call.delete_flows(406                match=ofpp.OFPMatch(407                    eth_type=self.ether_types.ETH_TYPE_ARP,408                    vlan_vid=vlan | ofp.OFPVID_PRESENT),409                table_id=21),410        ]411        self.assertEqual(expected, self.mock.mock_calls)412    def test_setup_tunnel_port(self):413        network_type = 'vxlan'414        port = 11111415        self.br.setup_tunnel_port(network_type=network_type, port=port)416        (dp, ofp, ofpp) = self._get_dp()417        expected = [418            call._send_msg(ofpp.OFPFlowMod(dp,419                cookie=0,420                instructions=[421                    ofpp.OFPInstructionGotoTable(table_id=4),422                ],423                match=ofpp.OFPMatch(in_port=port),424                priority=1,425                table_id=0)),426        ]427        self.assertEqual(expected, self.mock.mock_calls)428    def test_cleanup_tunnel_port(self):429        port = 11111430        self.br.cleanup_tunnel_port(port=port)431        (dp, ofp, ofpp) = self._get_dp()432        expected = [433            call.delete_flows(in_port=port),434        ]435        self.assertEqual(expected, self.mock.mock_calls)436    def test_add_dvr_mac_tun(self):437        mac = '00:02:b3:13:fe:3d'438        port = 8888439        self.br.add_dvr_mac_tun(mac=mac, port=port)440        (dp, ofp, ofpp) = self._get_dp()441        expected = [442            call._send_msg(ofpp.OFPFlowMod(dp,443                cookie=0,444                instructions=[445                    ofpp.OFPInstructionActions(ofp.OFPIT_APPLY_ACTIONS, [446                        ofpp.OFPActionOutput(port, 0),447                    ]),448                ],449                match=ofpp.OFPMatch(eth_src=mac),450                priority=1,451                table_id=9)),452        ]453        self.assertEqual(expected, self.mock.mock_calls)454    def test_remove_dvr_mac_tun(self):455        mac = '00:02:b3:13:fe:3d'456        self.br.remove_dvr_mac_tun(mac=mac)...test_remote.py
Source:test_remote.py  
1import pytest2from geonotebook.kernel import Remote3@pytest.fixture4def remote(mocker):5    protocols = [{'procedure': 'no_args',6                  'required': [],7                  'optional': []},8                 {'procedure': 'required_only',9                  'required': [{"key": "a"}, {"key": "b"}],10                  'optional': []},11                 {'procedure': 'optional_only',12                  'required': [],13                  'optional': [{"key": "x"}, {"key": "y"}, {"key": "z"}]},14                 {'procedure': 'required_and_optional',15                  'required': [{"key": "a"}, {"key": "b"}],16                  'optional': [{"key": "x"}, {"key": "y"}, {"key": "z"}]}]17    r = Remote(None, protocols)18    # Mock out the UUID.uuid4 function to return a consistent ID for testing19    mocker.patch('geonotebook.jsonrpc.uuid.uuid4', return_value='TEST-ID')20    mocker.patch.object(r, '_send_msg')21    return r22def test_remote_bad_protocol():23    with pytest.raises(AssertionError):24        Remote(None, ['foo', 'bar'])25def test_remote_bad_protocol_missing_procedure():26    with pytest.raises(AssertionError):27        Remote(None, [{'required': [],28                       'optional': []}])29def test_remote_bad_protocol_missing_required():30    with pytest.raises(AssertionError):31        Remote(None, [{'procedure': 'no_args',32                       'optional': []}])33def test_remote_bad_protocol_missing_optional():34    with pytest.raises(AssertionError):35        Remote(None, [{'procedure': 'no_args',36                       'required': []}])37def test_remote_init(remote):38    assert hasattr(remote.no_args, '__call__')39    assert hasattr(remote.required_only, '__call__')40    assert hasattr(remote.required_and_optional, '__call__')41def test_remote_call_no_args(remote):42    remote.no_args()43    assert remote._send_msg.call_count == 144    remote._send_msg.assert_called_with({'jsonrpc': '2.0', 'params': [],45                                         'method': 'no_args', 'id': 'TEST-ID'})46def test_remote_call_no_args_with_args(remote):47    with pytest.raises(AssertionError):48        remote.no_args('foo', 'bar')49def test_remote_call_required_only(remote):50    remote.required_only('foo', 'bar')51    assert remote._send_msg.call_count == 152    remote._send_msg.assert_called_with({53        'jsonrpc': '2.0', 'params': [{'key': 'a',54                                      'value': 'foo',55                                      'required': True},56                                     {'key': 'b',57                                      'value': 'bar',58                                      'required': True}],59        'method': 'required_only', 'id': 'TEST-ID'})60def test_remote_call_required_only_with_too_few_args(remote):61    with pytest.raises(AssertionError):62        remote.required_only('foo')63def test_remote_call_required_only_with_too_many_args(remote):64    with pytest.raises(AssertionError):65        remote.no_args('foo', 'bar', 'baz')66def test_remote_call_optional_only(remote):67    remote.optional_only(x='foo', y='bar', z='baz')68    assert remote._send_msg.call_count == 169    remote._send_msg.assert_called_with({70        'jsonrpc': '2.0',71        'params': [{'key': 'x', 'value': 'foo', 'required': False},72                   {'key': 'y', 'value': 'bar', 'required': False},73                   {'key': 'z', 'value': 'baz', 'required': False}],74        'method': 'optional_only', 'id': 'TEST-ID'})75    remote.optional_only()76    assert remote._send_msg.call_count == 277    remote._send_msg.assert_called_with({78        'jsonrpc': '2.0', 'params': [],79        'method': 'optional_only', 'id': 'TEST-ID'})80def test_remote_call_optional_only_missing_arguments(remote):81    remote.optional_only(x='foo', z='bar')82    assert remote._send_msg.call_count == 183    remote._send_msg.assert_called_with({84        'jsonrpc': '2.0',85        'params': [{'key': 'x', 'value': 'foo', 'required': False},86                   {'key': 'z', 'value': 'bar', 'required': False}],87        'method': 'optional_only', 'id': 'TEST-ID'})88def test_remote_promise_resolve_success(remote):89    class Nonlocal(object):90        pass91    def success(val):92        Nonlocal.result = val93    def error(val):94        Nonlocal.result = val95    remote.no_args().then(success, error)96    remote.resolve({'id': 'TEST-ID', 'result': 'SUCCESS', 'error': None})97    assert Nonlocal.result == 'SUCCESS'98def test_remote_promise_resolve_error(remote):99    class Nonlocal(object):100        pass101    def success(val):102        Nonlocal.result = val103    def error(val):104        Nonlocal.result = val105    remote.no_args().then(success, error)106    remote.resolve({'id': 'TEST-ID', 'result': None, 'error': 'ERROR'})107    assert isinstance(Nonlocal.result, Exception)108    assert str(Nonlocal.result) == "ERROR"109@pytest.mark.skip(reason="See: geonotebook/issues/46")110def test_remote_promise_resolve_with_bad_message(r, mocker):111    class Nonlocal(object):112        pass113    def success(val):114        Nonlocal.result = val115    def error(val):116        Nonlocal.result = val117    remote.no_args().then(success, error)118    with pytest.raises(Exception):119        remote.resolve('bad message')120    remote.no_args().then(success, error)121    with pytest.raises(Exception):122        remote.resolve({'id': 'TEST-ID', 'bad': 'message'})123    remote.no_args().then(success, error)124    # warn = mockeremote.patch.object(remote.log, 'warn')125    remote.resolve({'id': 'BAD-ID'})...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
