Best Python code snippet using Testify_python
test_rosmaster_registrations.py
Source:test_rosmaster_registrations.py  
...41        from rosmaster.registrations import NodeRef, Registrations42        n = NodeRef('n1', 'http://localhost:1234')43        # test services44        n.add(Registrations.SERVICE, 'add_two_ints')45        self.failIf(n.is_empty())46        self.assert_('add_two_ints' in n.services)47        self.assertEquals(['add_two_ints'], n.services)48        49        n.add(Registrations.SERVICE, 'add_three_ints')50        self.failIf(n.is_empty())51        self.assert_('add_three_ints' in n.services)52        self.assert_('add_two_ints' in n.services)53        n.remove(Registrations.SERVICE, 'add_two_ints')54        self.assert_('add_three_ints' in n.services)55        self.assertEquals(['add_three_ints'], n.services)56        self.failIf('add_two_ints' in n.services)57        self.failIf(n.is_empty())58        59        n.remove(Registrations.SERVICE, 'add_three_ints')        60        self.failIf('add_three_ints' in n.services)61        self.failIf('add_two_ints' in n.services)62        self.assertEquals([], n.services)63        self.assert_(n.is_empty())64    def test_NodeRef_subs(self):65        from rosmaster.registrations import NodeRef, Registrations66        n = NodeRef('n1', 'http://localhost:1234')67        # test topic suscriptions68        n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic1')69        self.failIf(n.is_empty())70        self.assert_('topic1' in n.topic_subscriptions)71        self.assertEquals(['topic1'], n.topic_subscriptions)72        73        n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic2')74        self.failIf(n.is_empty())75        self.assert_('topic2' in n.topic_subscriptions)76        self.assert_('topic1' in n.topic_subscriptions)77        n.remove(Registrations.TOPIC_SUBSCRIPTIONS, 'topic1')78        self.assert_('topic2' in n.topic_subscriptions)79        self.assertEquals(['topic2'], n.topic_subscriptions)80        self.failIf('topic1' in n.topic_subscriptions)81        self.failIf(n.is_empty())82        83        n.remove(Registrations.TOPIC_SUBSCRIPTIONS, 'topic2')        84        self.failIf('topic2' in n.topic_subscriptions)85        self.failIf('topic1' in n.topic_subscriptions)86        self.assertEquals([], n.topic_subscriptions)87        self.assert_(n.is_empty())88    def test_NodeRef_pubs(self):89        from rosmaster.registrations import NodeRef, Registrations90        n = NodeRef('n1', 'http://localhost:1234')91        # test topic publications92        n.add(Registrations.TOPIC_PUBLICATIONS, 'topic1')93        self.failIf(n.is_empty())94        self.assert_('topic1' in n.topic_publications)95        self.assertEquals(['topic1'], n.topic_publications)96        97        n.add(Registrations.TOPIC_PUBLICATIONS, 'topic2')98        self.failIf(n.is_empty())99        self.assert_('topic2' in n.topic_publications)100        self.assert_('topic1' in n.topic_publications)101        n.remove(Registrations.TOPIC_PUBLICATIONS, 'topic1')102        self.assert_('topic2' in n.topic_publications)103        self.assertEquals(['topic2'], n.topic_publications)104        self.failIf('topic1' in n.topic_publications)105        self.failIf(n.is_empty())106        107        n.remove(Registrations.TOPIC_PUBLICATIONS, 'topic2')        108        self.failIf('topic2' in n.topic_publications)109        self.failIf('topic1' in n.topic_publications)110        self.assertEquals([], n.topic_publications)111        self.assert_(n.is_empty())112    def test_NodeRef_base(self):113        import rosmaster.exceptions114        from rosmaster.registrations import NodeRef, Registrations115        n = NodeRef('n1', 'http://localhost:1234')116        self.assertEquals('http://localhost:1234', n.api)117        self.assertEquals([], n.param_subscriptions)118        self.assertEquals([], n.topic_subscriptions)119        self.assertEquals([], n.topic_publications)120        self.assertEquals([], n.services)121        self.assert_(n.is_empty())122        try:123            n.add(12345, 'topic')124            self.fail("should have failed with invalid type")125        except rosmaster.exceptions.InternalException: pass126        try:127            n.remove(12345, 'topic')128            self.fail("should have failed with invalid type")129        except rosmaster.exceptions.InternalException: pass130        n.add(Registrations.TOPIC_PUBLICATIONS, 'topic1')131        n.add(Registrations.TOPIC_PUBLICATIONS, 'topic2')132        n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic2')        133        n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic3')        134        n.add(Registrations.PARAM_SUBSCRIPTIONS, 'topic4')        135        n.add(Registrations.SERVICE, 'serv')        136        self.failIf(n.is_empty())137        n.clear()138        self.assert_(n.is_empty())        139    def test_NodeRef_param_subs(self):140        from rosmaster.registrations import NodeRef, Registrations141        n = NodeRef('n1', 'http://localhost:1234')142        # test param suscriptions143        n.add(Registrations.PARAM_SUBSCRIPTIONS, 'param1')144        self.failIf(n.is_empty())145        self.assert_('param1' in n.param_subscriptions)146        self.assertEquals(['param1'], n.param_subscriptions)147        148        n.add(Registrations.PARAM_SUBSCRIPTIONS, 'param2')149        self.failIf(n.is_empty())150        self.assert_('param2' in n.param_subscriptions)151        self.assert_('param1' in n.param_subscriptions)152        n.remove(Registrations.PARAM_SUBSCRIPTIONS, 'param1')153        self.assert_('param2' in n.param_subscriptions)154        self.assertEquals(['param2'], n.param_subscriptions)155        self.failIf('param1' in n.param_subscriptions)156        self.failIf(n.is_empty())157        158        n.remove(Registrations.PARAM_SUBSCRIPTIONS, 'param2')        159        self.failIf('param2' in n.param_subscriptions)160        self.failIf('param1' in n.param_subscriptions)161        self.assertEquals([], n.param_subscriptions)162        self.assert_(n.is_empty())163    ## subroutine of registration tests that test topic/param type Reg objects164    ## @param r Registrations: initialized registrations object to test165    def _subtest_Registrations_basic(self, r):166        #NOTE: no real difference between topic and param names, so tests are reusable167        # - note that we've updated node1's API168        r.register('topic1', 'node1', 'http://node1:5678')169        self.assert_('topic1' in r) # test contains170        self.assert_(r.has_key('topic1')) # test contains171        self.assertEquals(['topic1'], [k for k in r.iterkeys()])172        self.assertEquals(['http://node1:5678'], r.get_apis('topic1'))173        self.assertEquals([('node1', 'http://node1:5678')], r['topic1'])174        self.failIf(not r) #test nonzero175        self.assertEquals(None, r.get_service_api('topic1')) #make sure no contamination176        self.assertEquals([['topic1', ['node1']]], r.get_state())177        r.register('topic1', 'node2', 'http://node2:5678')178        self.assertEquals(['topic1'], [k for k in r.iterkeys()])        179        self.assertEquals(['topic1'], [k for k in r.iterkeys()])180        self.assertEquals(2, len(r.get_apis('topic1')))181        self.assert_('http://node1:5678' in r.get_apis('topic1'))182        self.assert_('http://node2:5678' in r.get_apis('topic1'))183        self.assertEquals(2, len(r['topic1']))184        self.assert_(('node1', 'http://node1:5678') in r['topic1'], r['topic1'])185        self.assert_(('node2', 'http://node2:5678') in r['topic1'])                186        self.assertEquals([['topic1', ['node1', 'node2']]], r.get_state())187        # TODO: register second topic188        r.register('topic2', 'node3', 'http://node3:5678')189        self.assert_('topic2' in r) # test contains190        self.assert_(r.has_key('topic2')) # test contains191        self.assert_('topic1' in [k for k in r.iterkeys()])192        self.assert_('topic2' in [k for k in r.iterkeys()])193        self.assertEquals(['http://node3:5678'], r.get_apis('topic2'))194        self.assertEquals([('node3', 'http://node3:5678')], r['topic2'])195        self.failIf(not r) #test nonzero196        self.assert_(['topic1', ['node1', 'node2']] in r.get_state(), r.get_state())197        self.assert_(['topic2', ['node3']] in r.get_state(), r.get_state())198        199        # Unregister200        # - fail if node is not registered201        code, _, val = r.unregister('topic1', 'node3', 'http://node3:5678')202        self.assertEquals(0, val)203        # - fail if topic is not registered by that node204        code, _, val = r.unregister('topic2', 'node2', 'http://node2:5678')205        self.assertEquals(0, val)206        # - fail if URI does not match207        code, _, val = r.unregister('topic2', 'node2', 'http://fakenode2:5678')208        self.assertEquals(0, val)209        # - unregister node2210        code, _, val = r.unregister('topic1', 'node1', 'http://node1:5678')211        self.assertEquals(1, code)212        self.assertEquals(1, val)213        self.assert_('topic1' in r) # test contains214        self.assert_(r.has_key('topic1')) 215        self.assert_('topic1' in [k for k in r.iterkeys()])216        self.assert_('topic2' in [k for k in r.iterkeys()])217        self.assertEquals(['http://node2:5678'], r.get_apis('topic1'))218        self.assertEquals([('node2', 'http://node2:5678')], r['topic1'])219        self.failIf(not r) #test nonzero220        self.assert_(['topic1', ['node2']] in r.get_state())221        self.assert_(['topic2', ['node3']] in r.get_state())        222        code, _, val = r.unregister('topic1', 'node2', 'http://node2:5678')223        self.assertEquals(1, code)224        self.assertEquals(1, val)225        self.failIf('topic1' in r) # test contains226        self.failIf(r.has_key('topic1')) 227        self.assertEquals(['topic2'], [k for k in r.iterkeys()])228        self.assertEquals([], r.get_apis('topic1'))229        self.assertEquals([], r['topic1'])230        self.failIf(not r) #test nonzero231        self.assertEquals([['topic2', ['node3']]], r.get_state())232        # clear out last reg233        code, _, val = r.unregister('topic2', 'node3', 'http://node3:5678')234        self.assertEquals(1, code)235        self.assertEquals(1, val)236        self.failIf('topic2' in r) # test contains237        self.assert_(not r)238        self.assertEquals([], r.get_state())        239        240    def test_Registrations(self):241        import rosmaster.exceptions242        from rosmaster.registrations import Registrations243        types = [Registrations.TOPIC_SUBSCRIPTIONS,244                 Registrations.TOPIC_PUBLICATIONS,245                 Registrations.SERVICE,246                 Registrations.PARAM_SUBSCRIPTIONS]247        # test enums248        self.assertEquals(4, len(set(types)))249        try:250            r = Registrations(-1)251            self.fail("Registrations accepted invalid type")252        except rosmaster.exceptions.InternalException: pass253        254        for t in types:255            r = Registrations(t)256            self.assertEquals(t, r.type)257            self.assert_(not r) #test nonzero258            self.failIf('topic1' in r) #test contains            259            self.failIf(r.has_key('topic1')) #test has_key260            self.failIf([k for k in r.iterkeys()]) #no keys261            self.assertEquals(None, r.get_service_api('non-existent'))262        # Test topic subs263        r = Registrations(Registrations.TOPIC_SUBSCRIPTIONS)264        self._subtest_Registrations_basic(r)265        r = Registrations(Registrations.TOPIC_PUBLICATIONS)        266        self._subtest_Registrations_basic(r)267        r = Registrations(Registrations.PARAM_SUBSCRIPTIONS)        268        self._subtest_Registrations_basic(r)269        r = Registrations(Registrations.SERVICE)        270        self._subtest_Registrations_services(r)271    def test_RegistrationManager_services(self):272        from rosmaster.registrations import Registrations, RegistrationManager273        rm = RegistrationManager(ThreadPoolMock())274        275        self.assertEquals(None, rm.get_node('caller1'))276        # do an unregister first, before service_api is initialized277        code, msg, val = rm.unregister_service('s1', 'caller1', 'rosrpc://one:1234')278        self.assertEquals(1, code)279        self.assertEquals(0, val)        280        281        rm.register_service('s1', 'caller1', 'http://one:1234', 'rosrpc://one:1234')282        self.assert_(rm.services.has_key('s1'))283        self.assertEquals('rosrpc://one:1234', rm.services.get_service_api('s1')) 284        self.assertEquals('http://one:1234', rm.get_node('caller1').api)285        self.assertEquals([['s1', ['caller1']]], rm.services.get_state())286        287        # - verify that changed caller_api updates ref288        rm.register_service('s1', 'caller1', 'http://oneB:1234', 'rosrpc://one:1234')289        self.assert_(rm.services.has_key('s1'))290        self.assertEquals('rosrpc://one:1234', rm.services.get_service_api('s1'))        291        self.assertEquals('http://oneB:1234', rm.get_node('caller1').api)292        self.assertEquals([['s1', ['caller1']]], rm.services.get_state())293        294        # - verify that changed service_api updates ref295        rm.register_service('s1', 'caller1', 'http://oneB:1234', 'rosrpc://oneB:1234')296        self.assert_(rm.services.has_key('s1'))297        self.assertEquals('rosrpc://oneB:1234', rm.services.get_service_api('s1'))        298        self.assertEquals('http://oneB:1234', rm.get_node('caller1').api)299        self.assertEquals([['s1', ['caller1']]], rm.services.get_state())300        301        rm.register_service('s2', 'caller2', 'http://two:1234', 'rosrpc://two:1234')302        self.assertEquals('http://two:1234', rm.get_node('caller2').api)303        # - unregister should be noop if service api does not match304        code, msg, val = rm.unregister_service('s2', 'caller2', 'rosrpc://b:1234')305        self.assertEquals(1, code)306        self.assertEquals(0, val)        307        self.assert_(rm.services.has_key('s2'))308        self.assertEquals('http://two:1234', rm.get_node('caller2').api)        309        self.assertEquals('rosrpc://two:1234', rm.services.get_service_api('s2'))310        311        # - unregister should be noop if service is unknown312        code, msg, val = rm.unregister_service('unknown', 'caller2', 'rosrpc://two:1234')313        self.assertEquals(1, code)314        self.assertEquals(0, val)        315        self.assert_(rm.services.has_key('s2'))316        self.assertEquals('http://two:1234', rm.get_node('caller2').api)        317        self.assertEquals('rosrpc://two:1234', rm.services.get_service_api('s2'))318        # - unregister should clear all knowledge of caller2319        code,msg, val = rm.unregister_service('s2', 'caller2', 'rosrpc://two:1234')320        self.assertEquals(1, code)321        self.assertEquals(1, val)        322        self.assert_(rm.services.has_key('s1')) 323        self.failIf(rm.services.has_key('s2'))        324        self.assertEquals(None, rm.get_node('caller2'))325        code, msg, val = rm.unregister_service('s1', 'caller1', 'rosrpc://oneB:1234')326        self.assertEquals(1, code)        327        self.assertEquals(1, val)        328        self.assert_(not rm.services.__nonzero__())329        self.failIf(rm.services.has_key('s1'))        330        self.assertEquals(None, rm.get_node('caller1'))        331    def test_RegistrationManager_topic_pub(self):332        from rosmaster.registrations import Registrations, RegistrationManager333        rm = RegistrationManager(ThreadPoolMock())334        self.subtest_RegistrationManager(rm, rm.publishers, rm.register_publisher, rm.unregister_publisher)335        336    def test_RegistrationManager_topic_sub(self):337        from rosmaster.registrations import Registrations, RegistrationManager338        rm = RegistrationManager(ThreadPoolMock())339        self.subtest_RegistrationManager(rm, rm.subscribers, rm.register_subscriber, rm.unregister_subscriber)340    def test_RegistrationManager_param_sub(self):341        from rosmaster.registrations import Registrations, RegistrationManager342        rm = RegistrationManager(ThreadPoolMock())343        self.subtest_RegistrationManager(rm, rm.param_subscribers, rm.register_param_subscriber, rm.unregister_param_subscriber)344        345    def subtest_RegistrationManager(self, rm, r, register, unregister):346        self.assertEquals(None, rm.get_node('caller1'))347        register('key1', 'caller1', 'http://one:1234')348        self.assert_(r.has_key('key1'))349        self.assertEquals('http://one:1234', rm.get_node('caller1').api)350        self.assertEquals([['key1', ['caller1']]], r.get_state())351        352        # - verify that changed caller_api updates ref353        register('key1', 'caller1', 'http://oneB:1234')354        self.assert_(r.has_key('key1'))355        self.assertEquals('http://oneB:1234', rm.get_node('caller1').api)356        self.assertEquals([['key1', ['caller1']]], r.get_state())357        358        register('key2', 'caller2', 'http://two:1234')359        self.assertEquals('http://two:1234', rm.get_node('caller2').api)360        # - unregister should be noop if caller api does not match361        code, msg, val = unregister('key2', 'caller2', 'http://b:1234')362        self.assertEquals(1, code)363        self.assertEquals(0, val)        364        self.assertEquals('http://two:1234', rm.get_node('caller2').api)        365        366        # - unregister should be noop if key is unknown367        code, msg, val = unregister('unknown', 'caller2', 'http://two:1234')368        self.assertEquals(1, code)369        self.assertEquals(0, val)        370        self.assert_(r.has_key('key2'))371        self.assertEquals('http://two:1234', rm.get_node('caller2').api)        372        # - unregister should be noop if unknown node373        code, msg, val = rm.unregister_publisher('key2', 'unknown', 'http://unknown:1')374        self.assertEquals(1, code)375        self.assertEquals(0, val)        376        self.assert_(r.has_key('key2'))377        # - unregister should clear all knowledge of caller2378        code,msg, val = unregister('key2', 'caller2', 'http://two:1234')379        self.assertEquals(1, code)380        self.assertEquals(1, val)        381        self.assert_(r.has_key('key1')) 382        self.failIf(r.has_key('key2'))        383        self.assertEquals(None, rm.get_node('caller2'))384        code, msg, val = unregister('key1', 'caller1', 'http://oneB:1234')385        self.assertEquals(1, code)        386        self.assertEquals(1, val)        387        self.assert_(not r.__nonzero__())388        self.failIf(r.has_key('key1'))        389        self.assertEquals(None, rm.get_node('caller1'))        390    def test_RegistrationManager_base(self):391        import rosmaster.exceptions392        from rosmaster.registrations import Registrations, RegistrationManager393        threadpool = ThreadPoolMock()394        rm = RegistrationManager(threadpool)395        self.assert_(isinstance(rm.services, Registrations))396        self.assertEquals(Registrations.SERVICE, rm.services.type)397        self.assert_(isinstance(rm.param_subscribers, Registrations))398        self.assertEquals(Registrations.PARAM_SUBSCRIPTIONS, rm.param_subscribers.type)399        self.assert_(isinstance(rm.subscribers, Registrations))400        self.assertEquals(Registrations.TOPIC_SUBSCRIPTIONS, rm.subscribers.type)401        self.assert_(isinstance(rm.subscribers, Registrations))402        self.assertEquals(Registrations.TOPIC_PUBLICATIONS, rm.publishers.type)403        self.assert_(isinstance(rm.publishers, Registrations))404        #test auto-clearing of registrations if node API changes405        rm.register_publisher('pub1', 'caller1', 'http://one:1')406        rm.register_publisher('pub1', 'caller2', 'http://two:1')407        rm.register_publisher('pub1', 'caller3', 'http://three:1')408        rm.register_subscriber('sub1', 'caller1', 'http://one:1')409        rm.register_subscriber('sub1', 'caller2', 'http://two:1')410        rm.register_subscriber('sub1', 'caller3', 'http://three:1')411        rm.register_param_subscriber('p1', 'caller1', 'http://one:1')412        rm.register_param_subscriber('p1', 'caller2', 'http://two:1')413        rm.register_param_subscriber('p1', 'caller3', 'http://three:1')414        rm.register_service('s1', 'caller1', 'http://one:1', 'rosrpc://one:1')415        self.assertEquals('http://one:1', rm.get_node('caller1').api)416        self.assertEquals('http://two:1', rm.get_node('caller2').api)417        self.assertEquals('http://three:1', rm.get_node('caller3').api)        418        # - first, make sure that changing rosrpc URI does not erase state419        rm.register_service('s1', 'caller1', 'http://one:1', 'rosrpc://oneB:1')420        n = rm.get_node('caller1')421        self.assertEquals(['pub1'], n.topic_publications)422        self.assertEquals(['sub1'], n.topic_subscriptions)423        self.assertEquals(['p1'], n.param_subscriptions)                424        self.assertEquals(['s1'], n.services)425        self.assert_('http://one:1' in rm.publishers.get_apis('pub1'))426        self.assert_('http://one:1' in rm.subscribers.get_apis('sub1'))427        self.assert_('http://one:1' in rm.param_subscribers.get_apis('p1'))428        self.assert_('http://one:1' in rm.services.get_apis('s1'))429        # - also, make sure unregister does not erase state if API changed430        rm.unregister_publisher('pub1', 'caller1', 'http://not:1')431        self.assert_('http://one:1' in rm.publishers.get_apis('pub1'))432        rm.unregister_subscriber('sub1', 'caller1', 'http://not:1')433        self.assert_('http://one:1' in rm.subscribers.get_apis('sub1'))434        rm.unregister_param_subscriber('p1', 'caller1', 'http://not:1')435        self.assert_('http://one:1' in rm.param_subscribers.get_apis('p1'))436        rm.unregister_service('sub1', 'caller1', 'rosrpc://not:1')437        self.assert_('http://one:1' in rm.services.get_apis('s1'))438        439        440        # erase caller1 sub/srvs/params via register_publisher441        rm.register_publisher('pub1', 'caller1', 'http://newone:1')442        self.assertEquals('http://newone:1', rm.get_node('caller1').api)        443        # - check node ref444        n = rm.get_node('caller1')445        self.assertEquals(['pub1'], n.topic_publications)446        self.assertEquals([], n.services)447        self.assertEquals([], n.topic_subscriptions)448        self.assertEquals([], n.param_subscriptions)449        # - checks publishers450        self.assert_('http://newone:1' in rm.publishers.get_apis('pub1'))451        # - checks subscribers452        self.assert_(rm.subscribers.has_key('sub1'))453        self.failIf('http://one:1' in rm.subscribers.get_apis('sub1'))454        # - checks param subscribers455        self.assert_(rm.param_subscribers.has_key('p1'))456        self.failIf('http://one:1' in rm.param_subscribers.get_apis('p1'))457        # erase caller2 pub/sub/params via register_service458        # - initial state459        self.assert_('http://two:1' in rm.publishers.get_apis('pub1'))460        self.assert_('http://two:1' in rm.subscribers.get_apis('sub1'))461        self.assert_('http://two:1' in rm.param_subscribers.get_apis('p1'))462        # - change ownership of s1 to caller2463        rm.register_service('s1', 'caller2', 'http://two:1', 'rosrpc://two:1')464        self.assert_('http://two:1' in rm.services.get_apis('s1'))465        self.assert_('http://two:1' in rm.publishers.get_apis('pub1'))466        self.assert_('http://two:1' in rm.subscribers.get_apis('sub1'))467        self.assert_('http://two:1' in rm.param_subscribers.get_apis('p1'))468        469        rm.register_service('s1', 'caller2', 'http://newtwo:1', 'rosrpc://newtwo:1')470        self.assertEquals('http://newone:1', rm.get_node('caller1').api)        471        # - check node ref472        n = rm.get_node('caller2')473        self.assertEquals([], n.topic_publications)474        self.assertEquals(['s1'], n.services)475        self.assertEquals([], n.topic_subscriptions)476        self.assertEquals([], n.param_subscriptions)477        # - checks publishers478        self.assert_(rm.publishers.has_key('pub1'))479        self.failIf('http://two:1' in rm.publishers.get_apis('pub1'))480        # - checks subscribers481        self.assert_(rm.subscribers.has_key('sub1'))482        self.failIf('http://two:1' in rm.subscribers.get_apis('sub1'))483        self.assertEquals([['sub1', ['caller3']]], rm.subscribers.get_state())484        # - checks param subscribers485        self.assert_(rm.param_subscribers.has_key('p1'))486        self.failIf('http://two:1' in rm.param_subscribers.get_apis('p1'))487        self.assertEquals([['p1', ['caller3']]], rm.param_subscribers.get_state())488        489    def test_Registrations_unregister_all(self):490        import rosmaster.exceptions491        from rosmaster.registrations import Registrations492        r = Registrations(Registrations.TOPIC_SUBSCRIPTIONS)        493        for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']:        494            r.register(k, 'node1', 'http://node1:5678')495        r.register('topic2', 'node2', 'http://node2:5678')496        r.unregister_all('node1')497        self.failIf(not r)498        for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']:        499            self.failIf(r.has_key(k))500        self.assertEquals(['topic2'], [k for k in r.iterkeys()])501        502        r = Registrations(Registrations.TOPIC_PUBLICATIONS)        503        for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']:        504            r.register(k, 'node1', 'http://node1:5678')505        r.register('topic2', 'node2', 'http://node2:5678')506        r.unregister_all('node1')507        self.failIf(not r)508        for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']:        509            self.failIf(r.has_key(k))510        self.assertEquals(['topic2'], [k for k in r.iterkeys()])511        r = Registrations(Registrations.PARAM_SUBSCRIPTIONS)        512        r.register('param2', 'node2', 'http://node2:5678')513        for k in ['param1', 'param1b', 'param1c', 'param1d']:514            r.register(k, 'node1', 'http://node1:5678')515        r.unregister_all('node1')516        self.failIf(not r)517        for k in ['param1', 'param1b', 'param1c', 'param1d']:518            self.failIf(r.has_key(k))519        self.assertEquals(['param2'], [k for k in r.iterkeys()])520        521        r = Registrations(Registrations.SERVICE)        522        for k in ['service1', 'service1b', 'service1c', 'service1d']:523            r.register(k, 'node1', 'http://node1:5678', 'rosrpc://node1:1234')524        r.register('service2', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')525        r.unregister_all('node1')526        self.failIf(not r)527        for k in ['service1', 'service1b', 'service1c', 'service1d']:528            self.failIf(r.has_key(k))529            self.assertEquals(None, r.get_service_api(k))530        self.assertEquals(['service2'], [k for k in r.iterkeys()])531        self.assertEquals('rosrpc://node2:1234', r.get_service_api('service2'))532    def _subtest_Registrations_services(self, r):533        import rosmaster.exceptions534        # call methods that use service_api_map, make sure they are guarded against lazy-init535        self.assertEquals(None, r.get_service_api('s1'))536        r.unregister_all('node1')537        # do an unregister first, before service_api is initialized538        code, msg, val = r.unregister('s1', 'caller1', None, 'rosrpc://one:1234')539        self.assertEquals(1, code)540        self.assertEquals(0, val)        541        try:542            r.register('service1', 'node1', 'http://node1:5678')543            self.fail("should require service_api")544        except rosmaster.exceptions.InternalException: pass545        546        r.register('service1', 'node1', 'http://node1:5678', 'rosrpc://node1:1234')547        548        self.assert_('service1' in r) # test contains549        self.assert_(r.has_key('service1')) # test contains550        self.assertEquals(['service1'], [k for k in r.iterkeys()])551        self.assertEquals(['http://node1:5678'], r.get_apis('service1'))552        self.assertEquals('rosrpc://node1:1234', r.get_service_api('service1'))553        self.assertEquals([('node1', 'http://node1:5678')], r['service1'])554        self.failIf(not r) #test nonzero555        self.assertEquals([['service1', ['node1']]], r.get_state())556        r.register('service1', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')557        self.assertEquals(['service1'], [k for k in r.iterkeys()])558        self.assertEquals('rosrpc://node2:1234', r.get_service_api('service1'))559        self.assertEquals(['http://node2:5678'], r.get_apis('service1'))560        self.assertEquals([('node2', 'http://node2:5678')], r['service1'])561        self.assertEquals([['service1', ['node2']]], r.get_state())562        # register a second service563        r.register('service2', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')564        self.assertEquals('rosrpc://node3:1234', r.get_service_api('service2'))565        self.assertEquals(2, len(r.get_state()))566        self.assert_(['service2', ['node3']] in r.get_state(), r.get_state())567        self.assert_(['service1', ['node2']] in r.get_state())568        569        # register a third service, second service for node2570        r.register('service1b', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')571        self.assertEquals(3, len(r.get_state()))572        self.assert_(['service2', ['node3']] in r.get_state())573        self.assert_(['service1b', ['node2']] in r.get_state())574        self.assert_(['service1', ['node2']] in r.get_state())575        576        # Unregister577        try:578            r.unregister('service1', 'node2', 'http://node2:1234')579            self.fail("service_api param must be specified")580        except rosmaster.exceptions.InternalException: pass581        582        # - fail if service is not known583        code, _, val = r.unregister('unknown', 'node2', 'http://node2:5678', 'rosprc://node2:1234')584        self.assertEquals(0, val)585        # - fail if node is not registered586        code, _, val = r.unregister('service1', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')587        self.assertEquals(0, val)588        # - fail if service API is different589        code, _, val = r.unregister('service1', 'node2', 'http://node2b:5678', 'rosrpc://node3:1234')590        self.assertEquals(0, val)591        # - unregister service2592        code, _, val = r.unregister('service2', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')593        self.assertEquals(1, code)594        self.assertEquals(1, val)595        self.failIf('service2' in r) # test contains596        self.failIf(r.has_key('service2')) 597        self.assert_('service1' in [k for k in r.iterkeys()])598        self.assert_('service1b' in [k for k in r.iterkeys()])599        self.assertEquals([], r.get_apis('service2'))600        self.assertEquals([], r['service2'])601        self.failIf(not r) #test nonzero602        self.assertEquals(2, len(r.get_state()))603        self.failIf(['service2', ['node3']] in r.get_state())604        605        # - unregister node2606        code, _, val = r.unregister('service1', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')607        self.assertEquals(1, code)608        self.assertEquals(1, val)609        self.failIf('service1' in r) # test contains610        self.failIf(r.has_key('service1')) 611        self.assertEquals(['service1b'], [k for k in r.iterkeys()])612        self.assertEquals([], r.get_apis('service1'))613        self.assertEquals([], r['service1'])614        self.failIf(not r) #test nonzero615        self.assertEquals([['service1b', ['node2']]], r.get_state())616        code, _, val = r.unregister('service1b', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')617        self.assertEquals(1, code)618        self.assertEquals(1, val)619        self.failIf('service1' in r) # test contains620        self.failIf(r.has_key('service1')) 621        self.assertEquals([], [k for k in r.iterkeys()])622        self.assertEquals([], r.get_apis('service1'))623        self.assertEquals([], r['service1'])624        self.assert_(not r) #test nonzero625        self.assertEquals([], r.get_state())...test_util.py
Source:test_util.py  
...31        self.failUnless(self.is_tuple("((a, (b, c)),)"))32        self.failUnless(self.is_tuple("(a,)"))33        self.failUnless(self.is_tuple("()"))34    def test_invalid(self):35        self.failIf(self.is_tuple("(a)"))36        self.failIf(self.is_tuple("('foo') % (b, c)"))37class Test_is_list(support.TestCase):38    def is_list(self, string):39        return fixer_util.is_list(parse(string, strip_levels=2))40    def test_valid(self):41        self.failUnless(self.is_list("[]"))42        self.failUnless(self.is_list("[a]"))43        self.failUnless(self.is_list("[a, b]"))44        self.failUnless(self.is_list("[a, [b, c]]"))45        self.failUnless(self.is_list("[[a, [b, c]],]"))46    def test_invalid(self):47        self.failIf(self.is_list("[]+[]"))48class Test_Attr(MacroTestCase):49    def test(self):50        call = parse("foo()", strip_levels=2)51        self.assertStr(Attr(Name("a"), Name("b")), "a.b")52        self.assertStr(Attr(call, Name("b")), "foo().b")53    def test_returns(self):54        attr = Attr(Name("a"), Name("b"))55        self.assertEqual(type(attr), list)56class Test_Name(MacroTestCase):57    def test(self):58        self.assertStr(Name("a"), "a")59        self.assertStr(Name("foo.foo().bar"), "foo.foo().bar")60        self.assertStr(Name("a", prefix="b"), "ba")61class Test_does_tree_import(support.TestCase):62    def _find_bind_rec(self, name, node):63        # Search a tree for a binding -- used to find the starting64        # point for these tests.65        c = fixer_util.find_binding(name, node)66        if c: return c67        for child in node.children:68            c = self._find_bind_rec(name, child)69            if c: return c70    def does_tree_import(self, package, name, string):71        node = parse(string)72        # Find the binding of start -- that's what we'll go from73        node = self._find_bind_rec('start', node)74        return fixer_util.does_tree_import(package, name, node)75    def try_with(self, string):76        failing_tests = (("a", "a", "from a import b"),77                         ("a.d", "a", "from a.d import b"),78                         ("d.a", "a", "from d.a import b"),79                         (None, "a", "import b"),80                         (None, "a", "import b, c, d"))81        for package, name, import_ in failing_tests:82            n = self.does_tree_import(package, name, import_ + "\n" + string)83            self.failIf(n)84            n = self.does_tree_import(package, name, string + "\n" + import_)85            self.failIf(n)86        passing_tests = (("a", "a", "from a import a"),87                         ("x", "a", "from x import a"),88                         ("x", "a", "from x import b, c, a, d"),89                         ("x.b", "a", "from x.b import a"),90                         ("x.b", "a", "from x.b import b, c, a, d"),91                         (None, "a", "import a"),92                         (None, "a", "import b, c, a, d"))93        for package, name, import_ in passing_tests:94            n = self.does_tree_import(package, name, import_ + "\n" + string)95            self.failUnless(n)96            n = self.does_tree_import(package, name, string + "\n" + import_)97            self.failUnless(n)98    def test_in_function(self):99        self.try_with("def foo():\n\tbar.baz()\n\tstart=3")100class Test_find_binding(support.TestCase):101    def find_binding(self, name, string, package=None):102        return fixer_util.find_binding(name, parse(string), package)103    def test_simple_assignment(self):104        self.failUnless(self.find_binding("a", "a = b"))105        self.failUnless(self.find_binding("a", "a = [b, c, d]"))106        self.failUnless(self.find_binding("a", "a = foo()"))107        self.failUnless(self.find_binding("a", "a = foo().foo.foo[6][foo]"))108        self.failIf(self.find_binding("a", "foo = a"))109        self.failIf(self.find_binding("a", "foo = (a, b, c)"))110    def test_tuple_assignment(self):111        self.failUnless(self.find_binding("a", "(a,) = b"))112        self.failUnless(self.find_binding("a", "(a, b, c) = [b, c, d]"))113        self.failUnless(self.find_binding("a", "(c, (d, a), b) = foo()"))114        self.failUnless(self.find_binding("a", "(a, b) = foo().foo[6][foo]"))115        self.failIf(self.find_binding("a", "(foo, b) = (b, a)"))116        self.failIf(self.find_binding("a", "(foo, (b, c)) = (a, b, c)"))117    def test_list_assignment(self):118        self.failUnless(self.find_binding("a", "[a] = b"))119        self.failUnless(self.find_binding("a", "[a, b, c] = [b, c, d]"))120        self.failUnless(self.find_binding("a", "[c, [d, a], b] = foo()"))121        self.failUnless(self.find_binding("a", "[a, b] = foo().foo[a][foo]"))122        self.failIf(self.find_binding("a", "[foo, b] = (b, a)"))123        self.failIf(self.find_binding("a", "[foo, [b, c]] = (a, b, c)"))124    def test_invalid_assignments(self):125        self.failIf(self.find_binding("a", "foo.a = 5"))126        self.failIf(self.find_binding("a", "foo[a] = 5"))127        self.failIf(self.find_binding("a", "foo(a) = 5"))128        self.failIf(self.find_binding("a", "foo(a, b) = 5"))129    def test_simple_import(self):130        self.failUnless(self.find_binding("a", "import a"))131        self.failUnless(self.find_binding("a", "import b, c, a, d"))132        self.failIf(self.find_binding("a", "import b"))133        self.failIf(self.find_binding("a", "import b, c, d"))134    def test_from_import(self):135        self.failUnless(self.find_binding("a", "from x import a"))136        self.failUnless(self.find_binding("a", "from a import a"))137        self.failUnless(self.find_binding("a", "from x import b, c, a, d"))138        self.failUnless(self.find_binding("a", "from x.b import a"))139        self.failUnless(self.find_binding("a", "from x.b import b, c, a, d"))140        self.failIf(self.find_binding("a", "from a import b"))141        self.failIf(self.find_binding("a", "from a.d import b"))142        self.failIf(self.find_binding("a", "from d.a import b"))143    def test_import_as(self):144        self.failUnless(self.find_binding("a", "import b as a"))145        self.failUnless(self.find_binding("a", "import b as a, c, a as f, d"))146        self.failIf(self.find_binding("a", "import a as f"))147        self.failIf(self.find_binding("a", "import b, c as f, d as e"))148    def test_from_import_as(self):149        self.failUnless(self.find_binding("a", "from x import b as a"))150        self.failUnless(self.find_binding("a", "from x import g as a, d as b"))151        self.failUnless(self.find_binding("a", "from x.b import t as a"))152        self.failUnless(self.find_binding("a", "from x.b import g as a, d"))153        self.failIf(self.find_binding("a", "from a import b as t"))154        self.failIf(self.find_binding("a", "from a.d import b as t"))155        self.failIf(self.find_binding("a", "from d.a import b as t"))156    def test_simple_import_with_package(self):157        self.failUnless(self.find_binding("b", "import b"))158        self.failUnless(self.find_binding("b", "import b, c, d"))159        self.failIf(self.find_binding("b", "import b", "b"))160        self.failIf(self.find_binding("b", "import b, c, d", "c"))161    def test_from_import_with_package(self):162        self.failUnless(self.find_binding("a", "from x import a", "x"))163        self.failUnless(self.find_binding("a", "from a import a", "a"))164        self.failUnless(self.find_binding("a", "from x import *", "x"))165        self.failUnless(self.find_binding("a", "from x import b, c, a, d", "x"))166        self.failUnless(self.find_binding("a", "from x.b import a", "x.b"))167        self.failUnless(self.find_binding("a", "from x.b import *", "x.b"))168        self.failUnless(self.find_binding("a", "from x.b import b, c, a, d", "x.b"))169        self.failIf(self.find_binding("a", "from a import b", "a"))170        self.failIf(self.find_binding("a", "from a.d import b", "a.d"))171        self.failIf(self.find_binding("a", "from d.a import b", "a.d"))172        self.failIf(self.find_binding("a", "from x.y import *", "a.b"))173    def test_import_as_with_package(self):174        self.failIf(self.find_binding("a", "import b.c as a", "b.c"))175        self.failIf(self.find_binding("a", "import a as f", "f"))176        self.failIf(self.find_binding("a", "import a as f", "a"))177    def test_from_import_as_with_package(self):178        # Because it would take a lot of special-case code in the fixers179        # to deal with from foo import bar as baz, we'll simply always180        # fail if there is an "from ... import ... as ..."181        self.failIf(self.find_binding("a", "from x import b as a", "x"))182        self.failIf(self.find_binding("a", "from x import g as a, d as b", "x"))183        self.failIf(self.find_binding("a", "from x.b import t as a", "x.b"))184        self.failIf(self.find_binding("a", "from x.b import g as a, d", "x.b"))185        self.failIf(self.find_binding("a", "from a import b as t", "a"))186        self.failIf(self.find_binding("a", "from a import b as t", "b"))187        self.failIf(self.find_binding("a", "from a import b as t", "t"))188    def test_function_def(self):189        self.failUnless(self.find_binding("a", "def a(): pass"))190        self.failUnless(self.find_binding("a", "def a(b, c, d): pass"))191        self.failUnless(self.find_binding("a", "def a(): b = 7"))192        self.failIf(self.find_binding("a", "def d(b, (c, a), e): pass"))193        self.failIf(self.find_binding("a", "def d(a=7): pass"))194        self.failIf(self.find_binding("a", "def d(a): pass"))195        self.failIf(self.find_binding("a", "def d(): a = 7"))196        s = """197            def d():198                def a():199                    pass"""200        self.failIf(self.find_binding("a", s))201    def test_class_def(self):202        self.failUnless(self.find_binding("a", "class a: pass"))203        self.failUnless(self.find_binding("a", "class a(): pass"))204        self.failUnless(self.find_binding("a", "class a(b): pass"))205        self.failUnless(self.find_binding("a", "class a(b, c=8): pass"))206        self.failIf(self.find_binding("a", "class d: pass"))207        self.failIf(self.find_binding("a", "class d(a): pass"))208        self.failIf(self.find_binding("a", "class d(b, a=7): pass"))209        self.failIf(self.find_binding("a", "class d(b, *a): pass"))210        self.failIf(self.find_binding("a", "class d(b, **a): pass"))211        self.failIf(self.find_binding("a", "class d: a = 7"))212        s = """213            class d():214                class a():215                    pass"""216        self.failIf(self.find_binding("a", s))217    def test_for(self):218        self.failUnless(self.find_binding("a", "for a in r: pass"))219        self.failUnless(self.find_binding("a", "for a, b in r: pass"))220        self.failUnless(self.find_binding("a", "for (a, b) in r: pass"))221        self.failUnless(self.find_binding("a", "for c, (a,) in r: pass"))222        self.failUnless(self.find_binding("a", "for c, (a, b) in r: pass"))223        self.failUnless(self.find_binding("a", "for c in r: a = c"))224        self.failIf(self.find_binding("a", "for c in a: pass"))225    def test_for_nested(self):226        s = """227            for b in r:228                for a in b:229                    pass"""230        self.failUnless(self.find_binding("a", s))231        s = """232            for b in r:233                for a, c in b:234                    pass"""235        self.failUnless(self.find_binding("a", s))236        s = """237            for b in r:238                for (a, c) in b:239                    pass"""240        self.failUnless(self.find_binding("a", s))241        s = """242            for b in r:243                for (a,) in b:244                    pass"""245        self.failUnless(self.find_binding("a", s))246        s = """247            for b in r:248                for c, (a, d) in b:249                    pass"""250        self.failUnless(self.find_binding("a", s))251        s = """252            for b in r:253                for c in b:254                    a = 7"""255        self.failUnless(self.find_binding("a", s))256        s = """257            for b in r:258                for c in b:259                    d = a"""260        self.failIf(self.find_binding("a", s))261        s = """262            for b in r:263                for c in a:264                    d = 7"""265        self.failIf(self.find_binding("a", s))266    def test_if(self):267        self.failUnless(self.find_binding("a", "if b in r: a = c"))268        self.failIf(self.find_binding("a", "if a in r: d = e"))269    def test_if_nested(self):270        s = """271            if b in r:272                if c in d:273                    a = c"""274        self.failUnless(self.find_binding("a", s))275        s = """276            if b in r:277                if c in d:278                    c = a"""279        self.failIf(self.find_binding("a", s))280    def test_while(self):281        self.failUnless(self.find_binding("a", "while b in r: a = c"))282        self.failIf(self.find_binding("a", "while a in r: d = e"))283    def test_while_nested(self):284        s = """285            while b in r:286                while c in d:287                    a = c"""288        self.failUnless(self.find_binding("a", s))289        s = """290            while b in r:291                while c in d:292                    c = a"""293        self.failIf(self.find_binding("a", s))294    def test_try_except(self):295        s = """296            try:297                a = 6298            except:299                b = 8"""300        self.failUnless(self.find_binding("a", s))301        s = """302            try:303                b = 8304            except:305                a = 6"""306        self.failUnless(self.find_binding("a", s))307        s = """308            try:309                b = 8310            except KeyError:311                pass312            except:313                a = 6"""314        self.failUnless(self.find_binding("a", s))315        s = """316            try:317                b = 8318            except:319                b = 6"""320        self.failIf(self.find_binding("a", s))321    def test_try_except_nested(self):322        s = """323            try:324                try:325                    a = 6326                except:327                    pass328            except:329                b = 8"""330        self.failUnless(self.find_binding("a", s))331        s = """332            try:333                b = 8334            except:335                try:336                    a = 6337                except:338                    pass"""339        self.failUnless(self.find_binding("a", s))340        s = """341            try:342                b = 8343            except:344                try:345                    pass346                except:347                    a = 6"""348        self.failUnless(self.find_binding("a", s))349        s = """350            try:351                try:352                    b = 8353                except KeyError:354                    pass355                except:356                    a = 6357            except:358                pass"""359        self.failUnless(self.find_binding("a", s))360        s = """361            try:362                pass363            except:364                try:365                    b = 8366                except KeyError:367                    pass368                except:369                    a = 6"""370        self.failUnless(self.find_binding("a", s))371        s = """372            try:373                b = 8374            except:375                b = 6"""376        self.failIf(self.find_binding("a", s))377        s = """378            try:379                try:380                    b = 8381                except:382                    c = d383            except:384                try:385                    b = 6386                except:387                    t = 8388                except:389                    o = y"""390        self.failIf(self.find_binding("a", s))391    def test_try_except_finally(self):392        s = """393            try:394                c = 6395            except:396                b = 8397            finally:398                a = 9"""399        self.failUnless(self.find_binding("a", s))400        s = """401            try:402                b = 8403            finally:404                a = 6"""405        self.failUnless(self.find_binding("a", s))406        s = """407            try:408                b = 8409            finally:410                b = 6"""411        self.failIf(self.find_binding("a", s))412        s = """413            try:414                b = 8415            except:416                b = 9417            finally:418                b = 6"""419        self.failIf(self.find_binding("a", s))420    def test_try_except_finally_nested(self):421        s = """422            try:423                c = 6424            except:425                b = 8426            finally:427                try:428                    a = 9429                except:430                    b = 9431                finally:432                    c = 9"""433        self.failUnless(self.find_binding("a", s))434        s = """435            try:436                b = 8437            finally:438                try:439                    pass440                finally:441                    a = 6"""442        self.failUnless(self.find_binding("a", s))443        s = """444            try:445                b = 8446            finally:447                try:448                    b = 6449                finally:450                    b = 7"""451        self.failIf(self.find_binding("a", s))452class Test_touch_import(support.TestCase):453    def test_after_docstring(self):454        node = parse('"""foo"""\nbar()')455        fixer_util.touch_import(None, "foo", node)456        self.assertEqual(str(node), '"""foo"""\nimport foo\nbar()\n\n')457    def test_after_imports(self):458        node = parse('"""foo"""\nimport bar\nbar()')459        fixer_util.touch_import(None, "foo", node)460        self.assertEqual(str(node), '"""foo"""\nimport bar\nimport foo\nbar()\n\n')461    def test_beginning(self):462        node = parse('bar()')463        fixer_util.touch_import(None, "foo", node)464        self.assertEqual(str(node), 'import foo\nbar()\n\n')465    def test_from_import(self):...test_attached_object_collisions.py
Source:test_attached_object_collisions.py  
...78        #original attached object had no touch links79        80        self.state_req.robot_state.joint_state.header.stamp = rospy.Time.now()81        res = self.get_state_validity_server.call(self.state_req)82        self.failIf(res.error_code.val == res.error_code.SUCCESS)83        self.assertEqual(res.error_code.val, res.error_code.COLLISION_CONSTRAINTS_VIOLATED) 84        #should be some contacts85        self.failIf(len(res.contacts) == 0)86        87        for c in res.contacts:88            is_first = False89            90            if(c.contact_body_1 == 'r_gripper_r_finger_tip_link'):91                is_first = True92            elif(c.contact_body_2 != 'r_gripper_r_finger_tip_link'):93                self.fail("Contacts but not with correct link")94            if c.contact_body_1 not in self.touch_links:95                self.fail("Contact body 1 ", c.contact_body_1, " not in touch links");96            if c.contact_body_2 not in self.touch_links:97                self.fail("Contact body 2 ", c.contact_body_2, " not in touch links");98            self.failIf(not(c.contact_body_1 in self.touch_links))99            self.failIf(not(c.contact_body_2 in self.touch_links))100            if(is_first):101                self.failIf(c.body_type_1 != ContactInformation.ATTACHED_BODY)102                self.failIf(c.attached_body_1 != "pole")103            else:104                self.failIf(c.body_type_2 != ContactInformation.ATTACHED_BODY)105                self.failIf(c.attached_body_2 != "pole")106            107        #adding in touch links108        self.att_object.touch_links = self.touch_links109        self.att_pub.publish(self.att_object)110        rospy.sleep(2.)111        res = self.get_state_validity_server.call(self.state_req)112        #should be ok now113        self.failIf(res.error_code.val != res.error_code.SUCCESS)114    def test_attached_collisions_with_static_objects(self):115        116        self.att_object.touch_links = self.touch_links117        self.att_pub.publish(self.att_object)118        rospy.sleep(2.)119        self.obj_pub.publish(self.table)120        121        rospy.sleep(2.)122        self.state_req.robot_state.joint_state.header.stamp = rospy.Time.now()123        res = self.get_state_validity_server.call(self.state_req)124        self.failIf(res.error_code.val == res.error_code.SUCCESS)125        self.assertEqual(res.error_code.val, res.error_code.COLLISION_CONSTRAINTS_VIOLATED) 126        127        #should be some contacts128        self.failIf(len(res.contacts) == 0)129        130        for c in res.contacts:131            132            self.failIf(c.contact_body_1 != 'r_gripper_r_finger_tip_link')133            self.failIf(c.body_type_1 != ContactInformation.ATTACHED_BODY)134            self.failIf(c.attached_body_1 != "pole")135            self.failIf(c.contact_body_2 != 'tabletop')136            self.failIf(c.body_type_2 != ContactInformation.OBJECT)137if __name__ == '__main__':138    import rostest...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
