Best Python code snippet using autotest_python
test_route_table_manager.py
Source:test_route_table_manager.py  
...74    def tearDown(self):75        super(TestRouteTableManager, self).tearDown()76        self.rtm.stop()77        self.rtm.join()78    def _new_worker(self, worker_name, worker_type):79        worker = mock.Mock(spec=worker_type, name=worker_name)80        worker.name = worker_name81        worker.enqueue = mock.Mock()82        worker._rtm_matches = set()83        worker._rtm_route_entries = set()84        return worker85    def _worker_subscriptions(self, worker, rts, wait=True,86                              afi=exa.AFI(exa.AFI.ipv4),87                              safi=exa.SAFI(exa.SAFI.mpls_vpn)):88        for rt in rts:89            subscribe = engine.Subscription(afi, safi, rt, worker)90            self.rtm.enqueue(subscribe)91        if wait:92            self._wait()93    def _worker_unsubscriptions(self, worker, rts, wait=True,94                                afi=exa.AFI(exa.AFI.ipv4),95                                safi=exa.SAFI(exa.SAFI.mpls_vpn)):96        for rt in rts:97            unsubscribe = engine.Unsubscription(afi, safi, rt, worker)98            self.rtm.enqueue(unsubscribe)99        if wait:100            self._wait()101    def _check_subscriptions(self, worker, matches):102        for match in matches:103            self.assertIn(match, worker._rtm_matches,104                          "Subscription not found")105    def _check_unsubscriptions(self, worker, matches):106        if '_rtm_matches' not in worker.__dict__:107            return108        for match in matches:109            self.assertNotIn(match, worker._rtm_matches,110                             "Subscription found while it should not: %s" %111                             worker._rtm_matches)112    def _check_events_calls(self, events, advertised_routes, withdrawn_nlris):113        # checks that each advertise event in 'events' is in advertised_routes,114        # that each withdraw event in 'events' is in withdrawn_nlris115        # and that all events in withdrawn_nlris 
and advertised_routes are in116        # 'events'117        for (call_args, _) in events:118            if (call_args[0].type == engine.RouteEvent.ADVERTISE):119                self.assertIn(call_args[0].route_entry, advertised_routes,120                              "Bad advertised route")121                advertised_routes.remove(call_args[0].route_entry)122            else:  # WITHDRAW123                self.assertIn(call_args[0].route_entry.nlri, withdrawn_nlris,124                              "Bad withdrawn route")125                withdrawn_nlris.remove(call_args[0].route_entry.nlri)126        self.assertEqual(0, len(advertised_routes), "some routes not advert'd")127        self.assertEqual(0, len(withdrawn_nlris), "some routes not withdrawn")128    def test_a1_subscriptions_with_no_route_to_synthesize(self):129        # Worker1 subscribes to RT1 and RT2130        worker1 = self._new_worker("worker.Worker-1", worker.Worker)131        self._worker_subscriptions(worker1, [t.RT1, t.RT2])132        # check subscriptions133        self._check_subscriptions(worker1, [MATCH1, MATCH2])134    def test_a1_check_first_last_local_worker_callback(self):135        bgp_worker1 = self._new_worker("worker.Worker-1", bpw.BGPPeerWorker)136        self._worker_subscriptions(bgp_worker1, [t.RT1])137        self._wait()138        self.assertEqual(139            0,140            self.rtm.first_local_subscriber_callback.call_count,141            "first_local_subscriber_callback should not have been called "142            " (non local worker)")143        worker1 = self._new_worker("worker.Worker-1", worker.Worker)144        self._worker_subscriptions(worker1, [t.RT1])145        self.assertEqual(146            1,147            self.rtm.first_local_subscriber_callback.call_count,148            "first_local_subscriber_callback should have been called")149        worker2 = self._new_worker("worker.Worker-2", worker.Worker)150        self._worker_subscriptions(worker2, [t.RT1])151      
  self.assertEqual(152            1,153            self.rtm.first_local_subscriber_callback.call_count,154            "first_local_subscriber_callback should not have been called a "155            "second time")156        self._worker_unsubscriptions(worker2, [t.RT1])157        self.assertEqual(158            0,159            self.rtm.last_local_subscriber_callback.call_count,160            "last_local_subscriber_callback should not have been called")161        self._worker_unsubscriptions(worker1, [t.RT1])162        self.assertEqual(163            1,164            self.rtm.last_local_subscriber_callback.call_count,165            "last_local_subscriber_callback should have been called")166        self._worker_unsubscriptions(bgp_worker1, [t.RT1])167        self.assertEqual(168            1,169            self.rtm.last_local_subscriber_callback.call_count,170            "last_local_subscriber_callback should not have been called "171            " (non local worker)")172    def test_a2_subscriptions_with_route_to_synthesize(self):173        # BGPPeerWorker1 advertises a route for RT1 and RT2174        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)175        evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,176                                     [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)177        # BGPPeerWorker1 advertises an other route for RT2178        evt2 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI2,179                                     [t.RT2], bgp_peer_worker1, t.NH1)180        # BGPPeerWorker1 subscribes to RT1181        self._worker_subscriptions(bgp_peer_worker1, [t.RT1])182        # Worker1 subscribes to RT1 and RT2183        worker1 = self._new_worker("worker.Worker-1", worker.Worker)184        self._worker_subscriptions(worker1, [t.RT1, t.RT2])185        # Worker2 subscribes to RT1186        worker2 = self._new_worker("worker.Worker-2", worker.Worker)187        self._worker_subscriptions(worker2, 
[t.RT1])188        # Worker3 subscribes to RT3189        worker3 = self._new_worker("worker.Worker-3", worker.Worker)190        self._worker_subscriptions(worker3, [t.RT3])191        # BGPPeerWorker2 subscribes to RT1192        bgp_peer_worker2 = self._new_worker("BGPWorker2", bpw.BGPPeerWorker)193        self._worker_subscriptions(bgp_peer_worker2, [t.RT1])194        # Waiting for RouteTableManager thread finishes to process the195        # subscription196        self._wait()197        # check route entry synthesized198        self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,199                         "Route should not be synthesized to its source")200        self.assertEqual(0, worker3.enqueue.call_count,201                         "no route should be synthesized to Worker3")202        self.assertEqual(0, bgp_peer_worker2.enqueue.call_count,203                         "Route should not be synthesized between BGP workers")204        self.assertEqual(2, worker1.enqueue.call_count,205                         "2 advertise events should be synthesized to Worker1")206        self._check_events_calls(worker1.enqueue.call_args_list,207                                 [evt1.route_entry, evt2.route_entry], [])208        self.assertEqual(1, worker2.enqueue.call_count,209                         "1 advertise event should be synthesized to Worker2")210        self._check_events_calls(211            worker2.enqueue.call_args_list, [evt1.route_entry], [])212    def test_a3_resubscription(self):213        # BGPPeerWorker1 advertises a route for RT1 and RT2214        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)215        route_event = self._new_route_event(engine.RouteEvent.ADVERTISE,216                                            t.NLRI1, [t.RT1, t.RT2],217                                            bgp_peer_worker1, t.NH1)218        # Worker1 subscribes to RT1 and RT2219        worker1 = self._new_worker("worker.Worker-1", worker.Worker)220        
self._worker_subscriptions(worker1, [t.RT1, t.RT2])221        # Worker1 subscribes again to RT1222        self._worker_subscriptions(worker1, [t.RT1])223        # Worker2 subscribes to RT1224        worker2 = self._new_worker("worker.Worker-2", worker.Worker)225        self._worker_subscriptions(worker2, [t.RT1])226        # Worker2 subscribes to RT2227        self._worker_subscriptions(worker2, [t.RT2])228        # check route entry synthesized229        self.assertEqual(1, worker1.enqueue.call_count,230                         "1 route advertised should be synthesized to Worker1")231        self._check_events_calls(worker1.enqueue.call_args_list,232                                 [route_event.route_entry], [])233        self.assertEqual(1, worker2.enqueue.call_count,234                         "1 route advertised should be synthesized to Worker2")235        self._check_events_calls(worker2.enqueue.call_args_list,236                                 [route_event.route_entry], [])237    def test_a4_two_subscriptions(self):238        # Worker1 subscribes to RT1239        worker1 = self._new_worker("worker.Worker-1", worker.Worker)240        self._worker_subscriptions(worker1, [t.RT1])241        # Worker2 subscribes to RT1242        worker2 = self._new_worker("worker.Worker-2", worker.Worker)243        self._worker_subscriptions(worker2, [t.RT1])244        # Worker2 advertises a route to RT1245        self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,246                              [t.RT1], worker2, t.NH1)247        self.assertEqual(1, worker1.enqueue.call_count,248                         "1 route advertised should be synthesized to Worker1")249    def test_b1_unsubscription_with_no_route_to_synthesize(self):250        # Worker1 subscribes to RT1 and RT2251        worker1 = self._new_worker("worker.Worker-1", worker.Worker)252        self._worker_subscriptions(worker1, [t.RT1, t.RT2])253        # BGPPeerWorker1 subscribes to RT1 and RT2254        
bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)255        self._worker_subscriptions(bgp_peer_worker1, [t.RT1, t.RT2])256        # Worker1 unsubscribes to RT1257        self._worker_unsubscriptions(worker1, [t.RT1])258        # BGPPeerWorker1 unsubscribes to RT1 and RT2259        self._worker_unsubscriptions(bgp_peer_worker1, [t.RT1, t.RT2])260        # check subscription/unsubscriptions261        self._check_unsubscriptions(worker1, [MATCH1])262        self._check_subscriptions(worker1, [MATCH2])263        self._check_unsubscriptions(bgp_peer_worker1, [MATCH1, MATCH2])264    def test_b2_unsubscription_with_route_to_synthesize(self):265        # BGPPeerWorker1 advertises a route for RT1266        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)267        evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE,268                                     t.NLRI1, [t.RT1, t.RT2],269                                     bgp_peer_worker1, t.NH1)270        # BGPPeerWorker1 advertises an other route for RT2271        evt2 = self._new_route_event(engine.RouteEvent.ADVERTISE,272                                     t.NLRI2, [t.RT2], bgp_peer_worker1, t.NH1)273        # BGPPeerWorker1 subscribes to RT1274        self._worker_subscriptions(bgp_peer_worker1, [t.RT1])275        # Worker1 subscribes to RT1 and RT2276        worker1 = self._new_worker("worker.Worker-1", worker.Worker)277        self._worker_subscriptions(worker1, [t.RT1, t.RT2])278        # Worker2 subscribes to RT2279        worker2 = self._new_worker("worker.Worker-2", worker.Worker)280        self._worker_subscriptions(worker2, [t.RT2])281        # Worker3 subscribes to RT3282        worker3 = self._new_worker("worker.Worker-3", worker.Worker)283        self._worker_subscriptions(worker3, [t.RT3])284        # BGPPeerWorker2 subscribes to RT1285        bgp_peer_worker2 = self._new_worker("BGPWorker2", bpw.BGPPeerWorker)286        self._worker_subscriptions(bgp_peer_worker2, 
[t.RT1])287        # Workers and BGPPeerWorker unsubscriptions288        self._worker_unsubscriptions(bgp_peer_worker1, [t.RT1], False)289        self._worker_unsubscriptions(worker1, [t.RT1], False)290        self._worker_unsubscriptions(worker2, [t.RT2], False)291        self._worker_unsubscriptions(worker3, [t.RT3], False)292        self._worker_unsubscriptions(bgp_peer_worker2, [t.RT1], False)293        # Waiting for RouteTableManager thread finishes to process the294        # subscription295        self._wait()296        # check route entry synthesized297        self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,298                         "Route should not be synthesized to its source")299        self.assertEqual(0, worker3.enqueue.call_count,300                         "no route should be synthesized to Worker3")301        self.assertEqual(0, bgp_peer_worker2.enqueue.call_count,302                         "Route should not be synthesized between "303                         "BGPPeerWorkers")304        self.assertEqual(2, worker1.enqueue.call_count,305                         "2 advertise event should be synthesized to Worker1")306        self._check_events_calls(worker1.enqueue.call_args_list,307                                 [evt1.route_entry, evt2.route_entry], [])308        self.assertEqual(4, worker2.enqueue.call_count,309                         "4 events should be synthesized to Worker2: "310                         "2 advertise and 2 withdraw")311        self._check_events_calls(worker2.enqueue.call_args_list,312                                 [evt1.route_entry, evt2.route_entry],313                                 [evt1.route_entry.nlri,314                                  evt2.route_entry.nlri])315    def test_b3_unsubscription_not_registered(self):316        # Worker1 subscribes to RT1317        worker1 = self._new_worker("worker.Worker-1", worker.Worker)318        self._worker_subscriptions(worker1, [t.RT1])319        # Worker1 
unsubscribes to RT2320        self._worker_unsubscriptions(worker1, [t.RT2])321        # BGPPeerWorker1 unsubscribes to RT1322        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)323        self._worker_unsubscriptions(bgp_peer_worker1, [t.RT1, t.RT2])324        # check subscription/unsubscriptions325        self._check_subscriptions(worker1, [MATCH1])326        self._check_unsubscriptions(bgp_peer_worker1, [MATCH1, MATCH2])327    def test_c1_route_advertise_by_worker_without_propagation(self):328        # Worker1 advertises a route for RT1 and RT2329        worker1 = self._new_worker("worker.Worker-1", worker.Worker)330        route_event = self._new_route_event(engine.RouteEvent.ADVERTISE,331                                            t.NLRI1, [t.RT1, t.RT2],332                                            worker1, t.NH1)333        # check route entry has been inserted334        self.assertIn(route_event.route_entry, worker1._rtm_route_entries,335                      "Route entry not found")336    def test_c2_route_withdraw_by_worker_without_propagation(self):337        # Worker1 advertises then withdraws a route338        worker1 = self._new_worker("worker.Worker-1", worker.Worker)339        self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,340                              [t.RT1, t.RT2], worker1, t.NH1)341        route_event = self._new_route_event(engine.RouteEvent.WITHDRAW,342                                            t.NLRI1, [t.RT1], worker1, t.NH1)343        # check route entry has been removed344        self.assertNotIn(route_event.route_entry, worker1._rtm_route_entries,345                         "Route entry found")346    def test_c3_route_advertise_by_bgp_peer_with_propagation(self):347        # Worker1 subscribes to RT1348        worker1 = self._new_worker("worker.Worker-1", worker.Worker)349        self._worker_subscriptions(worker1, [t.RT1])350        # Worker2 subscribes to RT2351        worker2 = 
self._new_worker("worker.Worker-2", worker.Worker)352        self._worker_subscriptions(worker2, [t.RT2])353        # BGPPeerWorker1 subscribes to RT1 and RT2354        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)355        self._worker_subscriptions(bgp_peer_worker1, [t.RT1, t.RT2])356        # BGPPeerWorker2 subscribes to RT1 and RT2357        bgp_peer_worker2 = self._new_worker("BGPWorker2", bpw.BGPPeerWorker)358        self._worker_subscriptions(bgp_peer_worker2, [t.RT1, t.RT2])359        # BGPPeerWorker1 advertises a route for RT1360        route_event = self._new_route_event(engine.RouteEvent.ADVERTISE,361                                            t.NLRI1, [t.RT1], bgp_peer_worker1,362                                            t.NH1)363        # check route_event propagation364        self.assertEqual(1, worker1.enqueue.call_count,365                         "1 route should be propagated to Worker1")366        self._check_events_calls(worker1.enqueue.call_args_list,367                                 [route_event.route_entry], [])368        self.assertEqual(0, worker2.enqueue.call_count,369                         "no route should be propagated to Worker2")370        self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,371                         "Route should not be propagated to its source")372        self.assertEqual(0, bgp_peer_worker2.enqueue.call_count,373                         "Route should not be propagated between BGP workers")374    def test_c4_route_withdraw_by_peer_worker_with_propagation(self):375        # Worker1 subscribes to RT1376        worker1 = self._new_worker("worker.Worker-1", worker.Worker)377        self._worker_subscriptions(worker1, [t.RT1])378        # Worker2 subscribes to RT2379        worker2 = self._new_worker("worker.Worker-2", worker.Worker)380        self._worker_subscriptions(worker2, [t.RT2])381        # Worker3 subscribes to RT3382        worker3 = self._new_worker("worker.Worker-3", 
worker.Worker)383        self._worker_subscriptions(worker3, [t.RT3])384        # BGPPeerWorker1 subscribes to RT1385        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)386        self._worker_subscriptions(bgp_peer_worker1, [t.RT1])387        # BGPPeerWorker2 subscribes to RT2388        bgp_peer_worker2 = self._new_worker("BGPWorker2", bpw.BGPPeerWorker)389        self._worker_subscriptions(bgp_peer_worker2, [t.RT2])390        # BGPPeerWorker1 advertises a route for RT1 and RT2391        route_eventA = self._new_route_event(engine.RouteEvent.ADVERTISE,392                                             t.NLRI1, [t.RT1, t.RT2],393                                             bgp_peer_worker1, t.NH1)394        # BGPPeerWorker1 withdraw previous route (without RT395        route_eventW = self._new_route_event(engine.RouteEvent.WITHDRAW,396                                             t.NLRI1, [],397                                             bgp_peer_worker1, t.NH1)398        # check route_event propagation399        self.assertEqual(2, worker1.enqueue.call_count,400                         "2 routes should be propagated to Worker1")401        self._check_events_calls(worker1.enqueue.call_args_list,402                                 [route_eventA.route_entry],403                                 [route_eventW.route_entry.nlri])404        self.assertEqual(2, worker2.enqueue.call_count,405                         "2 routes should be propagated to Worker2")406        self._check_events_calls(worker2.enqueue.call_args_list,407                                 [route_eventA.route_entry],408                                 [route_eventW.route_entry.nlri])409        self.assertEqual(0, worker3.enqueue.call_count,410                         "No route should be propagated to Worker3")411        self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,412                         "Route should not be propagated to its source")413        self.assertEqual(0, 
bgp_peer_worker2.enqueue.call_count,414                         "Route should not be propagated between BGP workers")415    def test_c5_route_update_by_bgp_peer_with_withdraw_propagation(self):416        # Worker1 subscribes to RT1417        worker1 = self._new_worker("worker.Worker-1", worker.Worker)418        self._worker_subscriptions(worker1, [t.RT1])419        # Worker2 subscribes to RT2420        worker2 = self._new_worker("worker.Worker-2", worker.Worker)421        self._worker_subscriptions(worker2, [t.RT2])422        # Worker3 subscribes to RT3423        worker3 = self._new_worker("worker.Worker-3", worker.Worker)424        self._worker_subscriptions(worker3, [t.RT3])425        # BGPPeerWorker1 advertises a route for RT1, RT2 and RT3426        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)427        evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,428                                     [t.RT1, t.RT2, t.RT3],429                                     bgp_peer_worker1, t.NH1)430        # BGPPeerWorker1 advertises the same nlri with attributes NH and RTs431        # modification432        evt2 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,433                                     [t.RT1, t.RT2],434                                     bgp_peer_worker1, t.NH2)435        # check route event propagation436        # TO DO : check route_event.replaced_route437        self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,438                         "Route should not be propagated to its source")439        self.assertEqual(2, worker1.enqueue.call_count,440                         "2 routes should be advertised to Worker1")441        self._check_events_calls(worker1.enqueue.call_args_list,442                                 [evt1.route_entry, evt2.route_entry], [])443        self.assertEqual(2, worker2.enqueue.call_count,444                         "2 routes should be advertised to Worker2")445        
self._check_events_calls(worker2.enqueue.call_args_list,446                                 [evt1.route_entry, evt2.route_entry], [])447        self.assertEqual(2, worker3.enqueue.call_count,448                         "2 routes should be advertised/withdrawn to Worker3")449        self._check_events_calls(worker3.enqueue.call_args_list,450                                 [evt1.route_entry], [evt1.route_entry.nlri])451    def test_c6_route_readvertised(self):452        # Worker1 subscribes to RT1453        worker1 = self._new_worker("worker.Worker-1", worker.Worker)454        self._worker_subscriptions(worker1, [t.RT1, t.RT2, t.RT3])455        # BGPPeerWorker1 advertises a route for RT1, RT2 and RT3456        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)457        evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,458                                     [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)459        # BGPPeerWorker1 advertises the same nlri with same attributes and RTs460        self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,461                              [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)462        # check route event propagation463        self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,464                         "Route should not be propagated to its source")465        self.assertEqual(1, worker1.enqueue.call_count,466                         "only 1 route should be advertised to Worker1")467        self._check_events_calls(worker1.enqueue.call_args_list,468                                 [evt1.route_entry], [])469    def test_c7_route_withdraw_not_registered(self):470        # Worker1 subscribes to RT1471        worker1 = self._new_worker("worker.Worker-1", worker.Worker)472        self._worker_subscriptions(worker1, [t.RT1, t.RT2])473        # BGPPeerWorker1 advertises a route for RT1 and RT2474        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)475        evt1 = 
self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,476                                     [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)477        # BGPPeerWorker1 withdraws an unregistered route (without RT)478        self._new_route_event(engine.RouteEvent.WITHDRAW, t.NLRI2, [],479                              bgp_peer_worker1, t.NH1)480        # Wait for the RouteTableManager thread to finish processing route_event481        self._wait()482        # check route_event propagation483        self.assertEqual(1, worker1.enqueue.call_count,484                         "1 route1 should be propagated to Worker1")485        self._check_events_calls(worker1.enqueue.call_args_list,486                                 [evt1.route_entry], [])487        self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,488                         "Route should not be propagated back to its source")489    def test_d1_worker_cleanup(self):490        # Worker1 subscribes to RT1491        worker1 = self._new_worker("worker.Worker-1", worker.Worker)492        self._worker_subscriptions(worker1, [t.RT1])493        # Worker2 subscribes to RT2494        worker2 = self._new_worker("worker.Worker-2", worker.Worker)495        self._worker_subscriptions(worker2, [t.RT2])496        # BGPPeerWorker1 subscribes to RT1 and RT2497        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)498        self._worker_subscriptions(bgp_peer_worker1, [t.RT1, t.RT2])499        # BGPPeerWorker1 advertises a route for RT1 and RT2500        evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,501                                     [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)502        # BGPPeerWorker1 advertises an other route for RT2503        evt2 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI2,504                                     [t.RT2], bgp_peer_worker1, t.NH1)505        # Cleanup BGPPeerWorker1506        self.rtm.enqueue(engine.WorkerCleanupEvent(bgp_peer_worker1))507        
# Waiting for RouteTableManager thread finishes to process the508        # subscriptions509        self._wait()510        self.assertEqual(511            0,512            self.rtm.last_local_subscriber_callback.call_count,513            "last_local_subscriber_callback should not have been called "514            " (non local worker)")515        # check unsubscriptions516        self._check_unsubscriptions(bgp_peer_worker1, [MATCH1, MATCH2])517        # Check route synthesize to Worker1 and Worker2518        self.assertEqual(2, worker1.enqueue.call_count,519                         "2 routes should be advert/withdraw to Worker1")520        self._check_events_calls(worker1.enqueue.call_args_list,521                                 [evt1.route_entry], [evt1.route_entry.nlri])522        self.assertEqual(4, worker2.enqueue.call_count,523                         "4 routes should be advert/withdraw to Worker2")524        self._check_events_calls(worker2.enqueue.call_args_list,525                                 [evt1.route_entry, evt2.route_entry],526                                 [evt1.route_entry.nlri,527                                  evt2.route_entry.nlri])528        # Check route entries have been removed for BGPPeerWorker1529        self.assertNotIn(evt1.route_entry, bgp_peer_worker1._rtm_route_entries,530                         "Route entry found")531        self.assertNotIn(evt2.route_entry, bgp_peer_worker1._rtm_route_entries,532                         "Route entry found")533    def test_e1_dump_state(self):534        # BGPPeerWorker1 advertises a route for RT1 and RT2535        bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)536        self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,537                              [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)538        # BGPPeerWorker1 advertises an other route for RT2539        self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI2,540                              [t.RT2], 
bgp_peer_worker1, t.NH1)541        # BGPPeerWorker1 subscribes to RT1542        self._worker_subscriptions(bgp_peer_worker1, [t.RT1])543        # Worker1 subscribes to RT1 and RT2544        worker1 = self._new_worker("worker.Worker-1", worker.Worker)545        self._worker_subscriptions(worker1, [t.RT1, t.RT2])546        # Worker2 subscribes to RT1547        worker2 = self._new_worker("worker.Worker-2", worker.Worker)548        self._worker_subscriptions(worker2, [t.RT1])549        # Worker3 subscribes to RT3550        worker3 = self._new_worker("worker.Worker-3", worker.Worker)551        self._worker_subscriptions(worker3, [t.RT3])552        self.rtm._dump_state()553    def test_7_matches(self):554        m1a = rtm.Match(exa.AFI(exa.AFI.ipv4),555                        exa.SAFI(exa.SAFI.mpls_vpn),556                        exa.RouteTarget(64512, 1))557        m1b = rtm.Match(exa.AFI(exa.AFI.ipv4),558                        exa.SAFI(exa.SAFI.mpls_vpn),559                        exa.RouteTarget(64512, 1))560        m1c = rtm.Match(exa.AFI(exa.AFI.ipv4),561                        exa.SAFI(exa.SAFI.mpls_vpn),562                        exa.RouteTarget(64512, 1, False))563        m2 = rtm.Match(exa.AFI(exa.AFI.ipv4),564                       exa.SAFI(exa.SAFI.mpls_vpn),565                       exa.RouteTarget(64512, 2))566        m3 = rtm.Match(exa.AFI(exa.AFI.ipv4),567                       exa.SAFI(exa.SAFI.mpls_vpn),568                       exa.RouteTarget(64513, 1))569        self.assertEqual(hash(m1a), hash(m1b))570        self.assertEqual(hash(m1a), hash(m1c))571        self.assertNotEqual(hash(m1a), hash(m2))572        self.assertNotEqual(hash(m1a), hash(m3))573        self.assertEqual(m1a, m1b)574        self.assertEqual(m1a, m1c)575        self.assertNotEqual(m1a, m2)576        self.assertNotEqual(m1a, m3)577    def test_f1_test_empty_rt(self):578        # worker advertises a route with no RT579        w1 = self._new_worker("Worker1", worker.Worker)580        
subscribe = engine.Subscription(exa.AFI(exa.AFI.ipv4),581                                        exa.SAFI(exa.SAFI.mpls_vpn),582                                        None, w1)583        self.rtm.enqueue(subscribe)584        w2 = self._new_worker("Worker2", worker.Worker)585        route_event = engine.RouteEvent(586            engine.RouteEvent.ADVERTISE,587            engine.RouteEntry(t.NLRI1, None, exa.Attributes()),588            w2)589        self.rtm.enqueue(route_event)590        self._wait()591        self.assertEqual(1, w1.enqueue.call_count,...threadpool.py
Source:threadpool.py  
...75    while True:76      self._lock.acquire()77      if self._available == 0:78         self._lock.release()79         self._new_worker()80      else:81        break82    self._tasks.put((func, args, kwargs))83    if self.available > self._total / 2 and self.total > 8:84      for i in xrange(self._total / 2 - 1):85        self._tasks.put((None,None,None))86    self._lock.release()87  def join (self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
