How to use failIf method in Testify

Best Python code snippet using Testify_python

test_rosmaster_registrations.py

Source:test_rosmaster_registrations.py Github

copy

Full Screen

...41 from rosmaster.registrations import NodeRef, Registrations42 n = NodeRef('n1', 'http://localhost:1234')43 # test services44 n.add(Registrations.SERVICE, 'add_two_ints')45 self.failIf(n.is_empty())46 self.assert_('add_two_ints' in n.services)47 self.assertEquals(['add_two_ints'], n.services)48 49 n.add(Registrations.SERVICE, 'add_three_ints')50 self.failIf(n.is_empty())51 self.assert_('add_three_ints' in n.services)52 self.assert_('add_two_ints' in n.services)53 n.remove(Registrations.SERVICE, 'add_two_ints')54 self.assert_('add_three_ints' in n.services)55 self.assertEquals(['add_three_ints'], n.services)56 self.failIf('add_two_ints' in n.services)57 self.failIf(n.is_empty())58 59 n.remove(Registrations.SERVICE, 'add_three_ints') 60 self.failIf('add_three_ints' in n.services)61 self.failIf('add_two_ints' in n.services)62 self.assertEquals([], n.services)63 self.assert_(n.is_empty())64 def test_NodeRef_subs(self):65 from rosmaster.registrations import NodeRef, Registrations66 n = NodeRef('n1', 'http://localhost:1234')67 # test topic suscriptions68 n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic1')69 self.failIf(n.is_empty())70 self.assert_('topic1' in n.topic_subscriptions)71 self.assertEquals(['topic1'], n.topic_subscriptions)72 73 n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic2')74 self.failIf(n.is_empty())75 self.assert_('topic2' in n.topic_subscriptions)76 self.assert_('topic1' in n.topic_subscriptions)77 n.remove(Registrations.TOPIC_SUBSCRIPTIONS, 'topic1')78 self.assert_('topic2' in n.topic_subscriptions)79 self.assertEquals(['topic2'], n.topic_subscriptions)80 self.failIf('topic1' in n.topic_subscriptions)81 self.failIf(n.is_empty())82 83 n.remove(Registrations.TOPIC_SUBSCRIPTIONS, 'topic2') 84 self.failIf('topic2' in n.topic_subscriptions)85 self.failIf('topic1' in n.topic_subscriptions)86 self.assertEquals([], n.topic_subscriptions)87 self.assert_(n.is_empty())88 def test_NodeRef_pubs(self):89 from rosmaster.registrations import NodeRef, 
Registrations90 n = NodeRef('n1', 'http://localhost:1234')91 # test topic publications92 n.add(Registrations.TOPIC_PUBLICATIONS, 'topic1')93 self.failIf(n.is_empty())94 self.assert_('topic1' in n.topic_publications)95 self.assertEquals(['topic1'], n.topic_publications)96 97 n.add(Registrations.TOPIC_PUBLICATIONS, 'topic2')98 self.failIf(n.is_empty())99 self.assert_('topic2' in n.topic_publications)100 self.assert_('topic1' in n.topic_publications)101 n.remove(Registrations.TOPIC_PUBLICATIONS, 'topic1')102 self.assert_('topic2' in n.topic_publications)103 self.assertEquals(['topic2'], n.topic_publications)104 self.failIf('topic1' in n.topic_publications)105 self.failIf(n.is_empty())106 107 n.remove(Registrations.TOPIC_PUBLICATIONS, 'topic2') 108 self.failIf('topic2' in n.topic_publications)109 self.failIf('topic1' in n.topic_publications)110 self.assertEquals([], n.topic_publications)111 self.assert_(n.is_empty())112 def test_NodeRef_base(self):113 import rosmaster.exceptions114 from rosmaster.registrations import NodeRef, Registrations115 n = NodeRef('n1', 'http://localhost:1234')116 self.assertEquals('http://localhost:1234', n.api)117 self.assertEquals([], n.param_subscriptions)118 self.assertEquals([], n.topic_subscriptions)119 self.assertEquals([], n.topic_publications)120 self.assertEquals([], n.services)121 self.assert_(n.is_empty())122 try:123 n.add(12345, 'topic')124 self.fail("should have failed with invalid type")125 except rosmaster.exceptions.InternalException: pass126 try:127 n.remove(12345, 'topic')128 self.fail("should have failed with invalid type")129 except rosmaster.exceptions.InternalException: pass130 n.add(Registrations.TOPIC_PUBLICATIONS, 'topic1')131 n.add(Registrations.TOPIC_PUBLICATIONS, 'topic2')132 n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic2') 133 n.add(Registrations.TOPIC_SUBSCRIPTIONS, 'topic3') 134 n.add(Registrations.PARAM_SUBSCRIPTIONS, 'topic4') 135 n.add(Registrations.SERVICE, 'serv') 136 self.failIf(n.is_empty())137 
n.clear()138 self.assert_(n.is_empty()) 139 def test_NodeRef_param_subs(self):140 from rosmaster.registrations import NodeRef, Registrations141 n = NodeRef('n1', 'http://localhost:1234')142 # test param suscriptions143 n.add(Registrations.PARAM_SUBSCRIPTIONS, 'param1')144 self.failIf(n.is_empty())145 self.assert_('param1' in n.param_subscriptions)146 self.assertEquals(['param1'], n.param_subscriptions)147 148 n.add(Registrations.PARAM_SUBSCRIPTIONS, 'param2')149 self.failIf(n.is_empty())150 self.assert_('param2' in n.param_subscriptions)151 self.assert_('param1' in n.param_subscriptions)152 n.remove(Registrations.PARAM_SUBSCRIPTIONS, 'param1')153 self.assert_('param2' in n.param_subscriptions)154 self.assertEquals(['param2'], n.param_subscriptions)155 self.failIf('param1' in n.param_subscriptions)156 self.failIf(n.is_empty())157 158 n.remove(Registrations.PARAM_SUBSCRIPTIONS, 'param2') 159 self.failIf('param2' in n.param_subscriptions)160 self.failIf('param1' in n.param_subscriptions)161 self.assertEquals([], n.param_subscriptions)162 self.assert_(n.is_empty())163 ## subroutine of registration tests that test topic/param type Reg objects164 ## @param r Registrations: initialized registrations object to test165 def _subtest_Registrations_basic(self, r):166 #NOTE: no real difference between topic and param names, so tests are reusable167 # - note that we've updated node1's API168 r.register('topic1', 'node1', 'http://node1:5678')169 self.assert_('topic1' in r) # test contains170 self.assert_(r.has_key('topic1')) # test contains171 self.assertEquals(['topic1'], [k for k in r.iterkeys()])172 self.assertEquals(['http://node1:5678'], r.get_apis('topic1'))173 self.assertEquals([('node1', 'http://node1:5678')], r['topic1'])174 self.failIf(not r) #test nonzero175 self.assertEquals(None, r.get_service_api('topic1')) #make sure no contamination176 self.assertEquals([['topic1', ['node1']]], r.get_state())177 r.register('topic1', 'node2', 'http://node2:5678')178 
self.assertEquals(['topic1'], [k for k in r.iterkeys()]) 179 self.assertEquals(['topic1'], [k for k in r.iterkeys()])180 self.assertEquals(2, len(r.get_apis('topic1')))181 self.assert_('http://node1:5678' in r.get_apis('topic1'))182 self.assert_('http://node2:5678' in r.get_apis('topic1'))183 self.assertEquals(2, len(r['topic1']))184 self.assert_(('node1', 'http://node1:5678') in r['topic1'], r['topic1'])185 self.assert_(('node2', 'http://node2:5678') in r['topic1']) 186 self.assertEquals([['topic1', ['node1', 'node2']]], r.get_state())187 # TODO: register second topic188 r.register('topic2', 'node3', 'http://node3:5678')189 self.assert_('topic2' in r) # test contains190 self.assert_(r.has_key('topic2')) # test contains191 self.assert_('topic1' in [k for k in r.iterkeys()])192 self.assert_('topic2' in [k for k in r.iterkeys()])193 self.assertEquals(['http://node3:5678'], r.get_apis('topic2'))194 self.assertEquals([('node3', 'http://node3:5678')], r['topic2'])195 self.failIf(not r) #test nonzero196 self.assert_(['topic1', ['node1', 'node2']] in r.get_state(), r.get_state())197 self.assert_(['topic2', ['node3']] in r.get_state(), r.get_state())198 199 # Unregister200 # - fail if node is not registered201 code, _, val = r.unregister('topic1', 'node3', 'http://node3:5678')202 self.assertEquals(0, val)203 # - fail if topic is not registered by that node204 code, _, val = r.unregister('topic2', 'node2', 'http://node2:5678')205 self.assertEquals(0, val)206 # - fail if URI does not match207 code, _, val = r.unregister('topic2', 'node2', 'http://fakenode2:5678')208 self.assertEquals(0, val)209 # - unregister node2210 code, _, val = r.unregister('topic1', 'node1', 'http://node1:5678')211 self.assertEquals(1, code)212 self.assertEquals(1, val)213 self.assert_('topic1' in r) # test contains214 self.assert_(r.has_key('topic1')) 215 self.assert_('topic1' in [k for k in r.iterkeys()])216 self.assert_('topic2' in [k for k in r.iterkeys()])217 
self.assertEquals(['http://node2:5678'], r.get_apis('topic1'))218 self.assertEquals([('node2', 'http://node2:5678')], r['topic1'])219 self.failIf(not r) #test nonzero220 self.assert_(['topic1', ['node2']] in r.get_state())221 self.assert_(['topic2', ['node3']] in r.get_state()) 222 code, _, val = r.unregister('topic1', 'node2', 'http://node2:5678')223 self.assertEquals(1, code)224 self.assertEquals(1, val)225 self.failIf('topic1' in r) # test contains226 self.failIf(r.has_key('topic1')) 227 self.assertEquals(['topic2'], [k for k in r.iterkeys()])228 self.assertEquals([], r.get_apis('topic1'))229 self.assertEquals([], r['topic1'])230 self.failIf(not r) #test nonzero231 self.assertEquals([['topic2', ['node3']]], r.get_state())232 # clear out last reg233 code, _, val = r.unregister('topic2', 'node3', 'http://node3:5678')234 self.assertEquals(1, code)235 self.assertEquals(1, val)236 self.failIf('topic2' in r) # test contains237 self.assert_(not r)238 self.assertEquals([], r.get_state()) 239 240 def test_Registrations(self):241 import rosmaster.exceptions242 from rosmaster.registrations import Registrations243 types = [Registrations.TOPIC_SUBSCRIPTIONS,244 Registrations.TOPIC_PUBLICATIONS,245 Registrations.SERVICE,246 Registrations.PARAM_SUBSCRIPTIONS]247 # test enums248 self.assertEquals(4, len(set(types)))249 try:250 r = Registrations(-1)251 self.fail("Registrations accepted invalid type")252 except rosmaster.exceptions.InternalException: pass253 254 for t in types:255 r = Registrations(t)256 self.assertEquals(t, r.type)257 self.assert_(not r) #test nonzero258 self.failIf('topic1' in r) #test contains 259 self.failIf(r.has_key('topic1')) #test has_key260 self.failIf([k for k in r.iterkeys()]) #no keys261 self.assertEquals(None, r.get_service_api('non-existent'))262 # Test topic subs263 r = Registrations(Registrations.TOPIC_SUBSCRIPTIONS)264 self._subtest_Registrations_basic(r)265 r = Registrations(Registrations.TOPIC_PUBLICATIONS) 266 
self._subtest_Registrations_basic(r)267 r = Registrations(Registrations.PARAM_SUBSCRIPTIONS) 268 self._subtest_Registrations_basic(r)269 r = Registrations(Registrations.SERVICE) 270 self._subtest_Registrations_services(r)271 def test_RegistrationManager_services(self):272 from rosmaster.registrations import Registrations, RegistrationManager273 rm = RegistrationManager(ThreadPoolMock())274 275 self.assertEquals(None, rm.get_node('caller1'))276 # do an unregister first, before service_api is initialized277 code, msg, val = rm.unregister_service('s1', 'caller1', 'rosrpc://one:1234')278 self.assertEquals(1, code)279 self.assertEquals(0, val) 280 281 rm.register_service('s1', 'caller1', 'http://one:1234', 'rosrpc://one:1234')282 self.assert_(rm.services.has_key('s1'))283 self.assertEquals('rosrpc://one:1234', rm.services.get_service_api('s1')) 284 self.assertEquals('http://one:1234', rm.get_node('caller1').api)285 self.assertEquals([['s1', ['caller1']]], rm.services.get_state())286 287 # - verify that changed caller_api updates ref288 rm.register_service('s1', 'caller1', 'http://oneB:1234', 'rosrpc://one:1234')289 self.assert_(rm.services.has_key('s1'))290 self.assertEquals('rosrpc://one:1234', rm.services.get_service_api('s1')) 291 self.assertEquals('http://oneB:1234', rm.get_node('caller1').api)292 self.assertEquals([['s1', ['caller1']]], rm.services.get_state())293 294 # - verify that changed service_api updates ref295 rm.register_service('s1', 'caller1', 'http://oneB:1234', 'rosrpc://oneB:1234')296 self.assert_(rm.services.has_key('s1'))297 self.assertEquals('rosrpc://oneB:1234', rm.services.get_service_api('s1')) 298 self.assertEquals('http://oneB:1234', rm.get_node('caller1').api)299 self.assertEquals([['s1', ['caller1']]], rm.services.get_state())300 301 rm.register_service('s2', 'caller2', 'http://two:1234', 'rosrpc://two:1234')302 self.assertEquals('http://two:1234', rm.get_node('caller2').api)303 # - unregister should be noop if service api does not match304 
code, msg, val = rm.unregister_service('s2', 'caller2', 'rosrpc://b:1234')305 self.assertEquals(1, code)306 self.assertEquals(0, val) 307 self.assert_(rm.services.has_key('s2'))308 self.assertEquals('http://two:1234', rm.get_node('caller2').api) 309 self.assertEquals('rosrpc://two:1234', rm.services.get_service_api('s2'))310 311 # - unregister should be noop if service is unknown312 code, msg, val = rm.unregister_service('unknown', 'caller2', 'rosrpc://two:1234')313 self.assertEquals(1, code)314 self.assertEquals(0, val) 315 self.assert_(rm.services.has_key('s2'))316 self.assertEquals('http://two:1234', rm.get_node('caller2').api) 317 self.assertEquals('rosrpc://two:1234', rm.services.get_service_api('s2'))318 # - unregister should clear all knowledge of caller2319 code,msg, val = rm.unregister_service('s2', 'caller2', 'rosrpc://two:1234')320 self.assertEquals(1, code)321 self.assertEquals(1, val) 322 self.assert_(rm.services.has_key('s1')) 323 self.failIf(rm.services.has_key('s2')) 324 self.assertEquals(None, rm.get_node('caller2'))325 code, msg, val = rm.unregister_service('s1', 'caller1', 'rosrpc://oneB:1234')326 self.assertEquals(1, code) 327 self.assertEquals(1, val) 328 self.assert_(not rm.services.__nonzero__())329 self.failIf(rm.services.has_key('s1')) 330 self.assertEquals(None, rm.get_node('caller1')) 331 def test_RegistrationManager_topic_pub(self):332 from rosmaster.registrations import Registrations, RegistrationManager333 rm = RegistrationManager(ThreadPoolMock())334 self.subtest_RegistrationManager(rm, rm.publishers, rm.register_publisher, rm.unregister_publisher)335 336 def test_RegistrationManager_topic_sub(self):337 from rosmaster.registrations import Registrations, RegistrationManager338 rm = RegistrationManager(ThreadPoolMock())339 self.subtest_RegistrationManager(rm, rm.subscribers, rm.register_subscriber, rm.unregister_subscriber)340 def test_RegistrationManager_param_sub(self):341 from rosmaster.registrations import Registrations, 
RegistrationManager342 rm = RegistrationManager(ThreadPoolMock())343 self.subtest_RegistrationManager(rm, rm.param_subscribers, rm.register_param_subscriber, rm.unregister_param_subscriber)344 345 def subtest_RegistrationManager(self, rm, r, register, unregister):346 self.assertEquals(None, rm.get_node('caller1'))347 register('key1', 'caller1', 'http://one:1234')348 self.assert_(r.has_key('key1'))349 self.assertEquals('http://one:1234', rm.get_node('caller1').api)350 self.assertEquals([['key1', ['caller1']]], r.get_state())351 352 # - verify that changed caller_api updates ref353 register('key1', 'caller1', 'http://oneB:1234')354 self.assert_(r.has_key('key1'))355 self.assertEquals('http://oneB:1234', rm.get_node('caller1').api)356 self.assertEquals([['key1', ['caller1']]], r.get_state())357 358 register('key2', 'caller2', 'http://two:1234')359 self.assertEquals('http://two:1234', rm.get_node('caller2').api)360 # - unregister should be noop if caller api does not match361 code, msg, val = unregister('key2', 'caller2', 'http://b:1234')362 self.assertEquals(1, code)363 self.assertEquals(0, val) 364 self.assertEquals('http://two:1234', rm.get_node('caller2').api) 365 366 # - unregister should be noop if key is unknown367 code, msg, val = unregister('unknown', 'caller2', 'http://two:1234')368 self.assertEquals(1, code)369 self.assertEquals(0, val) 370 self.assert_(r.has_key('key2'))371 self.assertEquals('http://two:1234', rm.get_node('caller2').api) 372 # - unregister should be noop if unknown node373 code, msg, val = rm.unregister_publisher('key2', 'unknown', 'http://unknown:1')374 self.assertEquals(1, code)375 self.assertEquals(0, val) 376 self.assert_(r.has_key('key2'))377 # - unregister should clear all knowledge of caller2378 code,msg, val = unregister('key2', 'caller2', 'http://two:1234')379 self.assertEquals(1, code)380 self.assertEquals(1, val) 381 self.assert_(r.has_key('key1')) 382 self.failIf(r.has_key('key2')) 383 self.assertEquals(None, 
rm.get_node('caller2'))384 code, msg, val = unregister('key1', 'caller1', 'http://oneB:1234')385 self.assertEquals(1, code) 386 self.assertEquals(1, val) 387 self.assert_(not r.__nonzero__())388 self.failIf(r.has_key('key1')) 389 self.assertEquals(None, rm.get_node('caller1')) 390 def test_RegistrationManager_base(self):391 import rosmaster.exceptions392 from rosmaster.registrations import Registrations, RegistrationManager393 threadpool = ThreadPoolMock()394 rm = RegistrationManager(threadpool)395 self.assert_(isinstance(rm.services, Registrations))396 self.assertEquals(Registrations.SERVICE, rm.services.type)397 self.assert_(isinstance(rm.param_subscribers, Registrations))398 self.assertEquals(Registrations.PARAM_SUBSCRIPTIONS, rm.param_subscribers.type)399 self.assert_(isinstance(rm.subscribers, Registrations))400 self.assertEquals(Registrations.TOPIC_SUBSCRIPTIONS, rm.subscribers.type)401 self.assert_(isinstance(rm.subscribers, Registrations))402 self.assertEquals(Registrations.TOPIC_PUBLICATIONS, rm.publishers.type)403 self.assert_(isinstance(rm.publishers, Registrations))404 #test auto-clearing of registrations if node API changes405 rm.register_publisher('pub1', 'caller1', 'http://one:1')406 rm.register_publisher('pub1', 'caller2', 'http://two:1')407 rm.register_publisher('pub1', 'caller3', 'http://three:1')408 rm.register_subscriber('sub1', 'caller1', 'http://one:1')409 rm.register_subscriber('sub1', 'caller2', 'http://two:1')410 rm.register_subscriber('sub1', 'caller3', 'http://three:1')411 rm.register_param_subscriber('p1', 'caller1', 'http://one:1')412 rm.register_param_subscriber('p1', 'caller2', 'http://two:1')413 rm.register_param_subscriber('p1', 'caller3', 'http://three:1')414 rm.register_service('s1', 'caller1', 'http://one:1', 'rosrpc://one:1')415 self.assertEquals('http://one:1', rm.get_node('caller1').api)416 self.assertEquals('http://two:1', rm.get_node('caller2').api)417 self.assertEquals('http://three:1', rm.get_node('caller3').api) 418 # - 
first, make sure that changing rosrpc URI does not erase state419 rm.register_service('s1', 'caller1', 'http://one:1', 'rosrpc://oneB:1')420 n = rm.get_node('caller1')421 self.assertEquals(['pub1'], n.topic_publications)422 self.assertEquals(['sub1'], n.topic_subscriptions)423 self.assertEquals(['p1'], n.param_subscriptions) 424 self.assertEquals(['s1'], n.services)425 self.assert_('http://one:1' in rm.publishers.get_apis('pub1'))426 self.assert_('http://one:1' in rm.subscribers.get_apis('sub1'))427 self.assert_('http://one:1' in rm.param_subscribers.get_apis('p1'))428 self.assert_('http://one:1' in rm.services.get_apis('s1'))429 # - also, make sure unregister does not erase state if API changed430 rm.unregister_publisher('pub1', 'caller1', 'http://not:1')431 self.assert_('http://one:1' in rm.publishers.get_apis('pub1'))432 rm.unregister_subscriber('sub1', 'caller1', 'http://not:1')433 self.assert_('http://one:1' in rm.subscribers.get_apis('sub1'))434 rm.unregister_param_subscriber('p1', 'caller1', 'http://not:1')435 self.assert_('http://one:1' in rm.param_subscribers.get_apis('p1'))436 rm.unregister_service('sub1', 'caller1', 'rosrpc://not:1')437 self.assert_('http://one:1' in rm.services.get_apis('s1'))438 439 440 # erase caller1 sub/srvs/params via register_publisher441 rm.register_publisher('pub1', 'caller1', 'http://newone:1')442 self.assertEquals('http://newone:1', rm.get_node('caller1').api) 443 # - check node ref444 n = rm.get_node('caller1')445 self.assertEquals(['pub1'], n.topic_publications)446 self.assertEquals([], n.services)447 self.assertEquals([], n.topic_subscriptions)448 self.assertEquals([], n.param_subscriptions)449 # - checks publishers450 self.assert_('http://newone:1' in rm.publishers.get_apis('pub1'))451 # - checks subscribers452 self.assert_(rm.subscribers.has_key('sub1'))453 self.failIf('http://one:1' in rm.subscribers.get_apis('sub1'))454 # - checks param subscribers455 self.assert_(rm.param_subscribers.has_key('p1'))456 
self.failIf('http://one:1' in rm.param_subscribers.get_apis('p1'))457 # erase caller2 pub/sub/params via register_service458 # - initial state459 self.assert_('http://two:1' in rm.publishers.get_apis('pub1'))460 self.assert_('http://two:1' in rm.subscribers.get_apis('sub1'))461 self.assert_('http://two:1' in rm.param_subscribers.get_apis('p1'))462 # - change ownership of s1 to caller2463 rm.register_service('s1', 'caller2', 'http://two:1', 'rosrpc://two:1')464 self.assert_('http://two:1' in rm.services.get_apis('s1'))465 self.assert_('http://two:1' in rm.publishers.get_apis('pub1'))466 self.assert_('http://two:1' in rm.subscribers.get_apis('sub1'))467 self.assert_('http://two:1' in rm.param_subscribers.get_apis('p1'))468 469 rm.register_service('s1', 'caller2', 'http://newtwo:1', 'rosrpc://newtwo:1')470 self.assertEquals('http://newone:1', rm.get_node('caller1').api) 471 # - check node ref472 n = rm.get_node('caller2')473 self.assertEquals([], n.topic_publications)474 self.assertEquals(['s1'], n.services)475 self.assertEquals([], n.topic_subscriptions)476 self.assertEquals([], n.param_subscriptions)477 # - checks publishers478 self.assert_(rm.publishers.has_key('pub1'))479 self.failIf('http://two:1' in rm.publishers.get_apis('pub1'))480 # - checks subscribers481 self.assert_(rm.subscribers.has_key('sub1'))482 self.failIf('http://two:1' in rm.subscribers.get_apis('sub1'))483 self.assertEquals([['sub1', ['caller3']]], rm.subscribers.get_state())484 # - checks param subscribers485 self.assert_(rm.param_subscribers.has_key('p1'))486 self.failIf('http://two:1' in rm.param_subscribers.get_apis('p1'))487 self.assertEquals([['p1', ['caller3']]], rm.param_subscribers.get_state())488 489 def test_Registrations_unregister_all(self):490 import rosmaster.exceptions491 from rosmaster.registrations import Registrations492 r = Registrations(Registrations.TOPIC_SUBSCRIPTIONS) 493 for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']: 494 r.register(k, 'node1', 
'http://node1:5678')495 r.register('topic2', 'node2', 'http://node2:5678')496 r.unregister_all('node1')497 self.failIf(not r)498 for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']: 499 self.failIf(r.has_key(k))500 self.assertEquals(['topic2'], [k for k in r.iterkeys()])501 502 r = Registrations(Registrations.TOPIC_PUBLICATIONS) 503 for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']: 504 r.register(k, 'node1', 'http://node1:5678')505 r.register('topic2', 'node2', 'http://node2:5678')506 r.unregister_all('node1')507 self.failIf(not r)508 for k in ['topic1', 'topic1b', 'topic1c', 'topic1d']: 509 self.failIf(r.has_key(k))510 self.assertEquals(['topic2'], [k for k in r.iterkeys()])511 r = Registrations(Registrations.PARAM_SUBSCRIPTIONS) 512 r.register('param2', 'node2', 'http://node2:5678')513 for k in ['param1', 'param1b', 'param1c', 'param1d']:514 r.register(k, 'node1', 'http://node1:5678')515 r.unregister_all('node1')516 self.failIf(not r)517 for k in ['param1', 'param1b', 'param1c', 'param1d']:518 self.failIf(r.has_key(k))519 self.assertEquals(['param2'], [k for k in r.iterkeys()])520 521 r = Registrations(Registrations.SERVICE) 522 for k in ['service1', 'service1b', 'service1c', 'service1d']:523 r.register(k, 'node1', 'http://node1:5678', 'rosrpc://node1:1234')524 r.register('service2', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')525 r.unregister_all('node1')526 self.failIf(not r)527 for k in ['service1', 'service1b', 'service1c', 'service1d']:528 self.failIf(r.has_key(k))529 self.assertEquals(None, r.get_service_api(k))530 self.assertEquals(['service2'], [k for k in r.iterkeys()])531 self.assertEquals('rosrpc://node2:1234', r.get_service_api('service2'))532 def _subtest_Registrations_services(self, r):533 import rosmaster.exceptions534 # call methods that use service_api_map, make sure they are guarded against lazy-init535 self.assertEquals(None, r.get_service_api('s1'))536 r.unregister_all('node1')537 # do an unregister first, before service_api is 
initialized538 code, msg, val = r.unregister('s1', 'caller1', None, 'rosrpc://one:1234')539 self.assertEquals(1, code)540 self.assertEquals(0, val) 541 try:542 r.register('service1', 'node1', 'http://node1:5678')543 self.fail("should require service_api")544 except rosmaster.exceptions.InternalException: pass545 546 r.register('service1', 'node1', 'http://node1:5678', 'rosrpc://node1:1234')547 548 self.assert_('service1' in r) # test contains549 self.assert_(r.has_key('service1')) # test contains550 self.assertEquals(['service1'], [k for k in r.iterkeys()])551 self.assertEquals(['http://node1:5678'], r.get_apis('service1'))552 self.assertEquals('rosrpc://node1:1234', r.get_service_api('service1'))553 self.assertEquals([('node1', 'http://node1:5678')], r['service1'])554 self.failIf(not r) #test nonzero555 self.assertEquals([['service1', ['node1']]], r.get_state())556 r.register('service1', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')557 self.assertEquals(['service1'], [k for k in r.iterkeys()])558 self.assertEquals('rosrpc://node2:1234', r.get_service_api('service1'))559 self.assertEquals(['http://node2:5678'], r.get_apis('service1'))560 self.assertEquals([('node2', 'http://node2:5678')], r['service1'])561 self.assertEquals([['service1', ['node2']]], r.get_state())562 # register a second service563 r.register('service2', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')564 self.assertEquals('rosrpc://node3:1234', r.get_service_api('service2'))565 self.assertEquals(2, len(r.get_state()))566 self.assert_(['service2', ['node3']] in r.get_state(), r.get_state())567 self.assert_(['service1', ['node2']] in r.get_state())568 569 # register a third service, second service for node2570 r.register('service1b', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')571 self.assertEquals(3, len(r.get_state()))572 self.assert_(['service2', ['node3']] in r.get_state())573 self.assert_(['service1b', ['node2']] in r.get_state())574 self.assert_(['service1', ['node2']] in 
r.get_state())575 576 # Unregister577 try:578 r.unregister('service1', 'node2', 'http://node2:1234')579 self.fail("service_api param must be specified")580 except rosmaster.exceptions.InternalException: pass581 582 # - fail if service is not known583 code, _, val = r.unregister('unknown', 'node2', 'http://node2:5678', 'rosprc://node2:1234')584 self.assertEquals(0, val)585 # - fail if node is not registered586 code, _, val = r.unregister('service1', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')587 self.assertEquals(0, val)588 # - fail if service API is different589 code, _, val = r.unregister('service1', 'node2', 'http://node2b:5678', 'rosrpc://node3:1234')590 self.assertEquals(0, val)591 # - unregister service2592 code, _, val = r.unregister('service2', 'node3', 'http://node3:5678', 'rosrpc://node3:1234')593 self.assertEquals(1, code)594 self.assertEquals(1, val)595 self.failIf('service2' in r) # test contains596 self.failIf(r.has_key('service2')) 597 self.assert_('service1' in [k for k in r.iterkeys()])598 self.assert_('service1b' in [k for k in r.iterkeys()])599 self.assertEquals([], r.get_apis('service2'))600 self.assertEquals([], r['service2'])601 self.failIf(not r) #test nonzero602 self.assertEquals(2, len(r.get_state()))603 self.failIf(['service2', ['node3']] in r.get_state())604 605 # - unregister node2606 code, _, val = r.unregister('service1', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')607 self.assertEquals(1, code)608 self.assertEquals(1, val)609 self.failIf('service1' in r) # test contains610 self.failIf(r.has_key('service1')) 611 self.assertEquals(['service1b'], [k for k in r.iterkeys()])612 self.assertEquals([], r.get_apis('service1'))613 self.assertEquals([], r['service1'])614 self.failIf(not r) #test nonzero615 self.assertEquals([['service1b', ['node2']]], r.get_state())616 code, _, val = r.unregister('service1b', 'node2', 'http://node2:5678', 'rosrpc://node2:1234')617 self.assertEquals(1, code)618 self.assertEquals(1, val)619 
self.failIf('service1' in r) # test contains620 self.failIf(r.has_key('service1')) 621 self.assertEquals([], [k for k in r.iterkeys()])622 self.assertEquals([], r.get_apis('service1'))623 self.assertEquals([], r['service1'])624 self.assert_(not r) #test nonzero625 self.assertEquals([], r.get_state())...

Full Screen

Full Screen

test_util.py

Source:test_util.py Github

copy

Full Screen

...31 self.failUnless(self.is_tuple("((a, (b, c)),)"))32 self.failUnless(self.is_tuple("(a,)"))33 self.failUnless(self.is_tuple("()"))34 def test_invalid(self):35 self.failIf(self.is_tuple("(a)"))36 self.failIf(self.is_tuple("('foo') % (b, c)"))37class Test_is_list(support.TestCase):38 def is_list(self, string):39 return fixer_util.is_list(parse(string, strip_levels=2))40 def test_valid(self):41 self.failUnless(self.is_list("[]"))42 self.failUnless(self.is_list("[a]"))43 self.failUnless(self.is_list("[a, b]"))44 self.failUnless(self.is_list("[a, [b, c]]"))45 self.failUnless(self.is_list("[[a, [b, c]],]"))46 def test_invalid(self):47 self.failIf(self.is_list("[]+[]"))48class Test_Attr(MacroTestCase):49 def test(self):50 call = parse("foo()", strip_levels=2)51 self.assertStr(Attr(Name("a"), Name("b")), "a.b")52 self.assertStr(Attr(call, Name("b")), "foo().b")53 def test_returns(self):54 attr = Attr(Name("a"), Name("b"))55 self.assertEqual(type(attr), list)56class Test_Name(MacroTestCase):57 def test(self):58 self.assertStr(Name("a"), "a")59 self.assertStr(Name("foo.foo().bar"), "foo.foo().bar")60 self.assertStr(Name("a", prefix="b"), "ba")61class Test_does_tree_import(support.TestCase):62 def _find_bind_rec(self, name, node):63 # Search a tree for a binding -- used to find the starting64 # point for these tests.65 c = fixer_util.find_binding(name, node)66 if c: return c67 for child in node.children:68 c = self._find_bind_rec(name, child)69 if c: return c70 def does_tree_import(self, package, name, string):71 node = parse(string)72 # Find the binding of start -- that's what we'll go from73 node = self._find_bind_rec('start', node)74 return fixer_util.does_tree_import(package, name, node)75 def try_with(self, string):76 failing_tests = (("a", "a", "from a import b"),77 ("a.d", "a", "from a.d import b"),78 ("d.a", "a", "from d.a import b"),79 (None, "a", "import b"),80 (None, "a", "import b, c, d"))81 for package, name, import_ in failing_tests:82 n = 
self.does_tree_import(package, name, import_ + "\n" + string)83 self.failIf(n)84 n = self.does_tree_import(package, name, string + "\n" + import_)85 self.failIf(n)86 passing_tests = (("a", "a", "from a import a"),87 ("x", "a", "from x import a"),88 ("x", "a", "from x import b, c, a, d"),89 ("x.b", "a", "from x.b import a"),90 ("x.b", "a", "from x.b import b, c, a, d"),91 (None, "a", "import a"),92 (None, "a", "import b, c, a, d"))93 for package, name, import_ in passing_tests:94 n = self.does_tree_import(package, name, import_ + "\n" + string)95 self.failUnless(n)96 n = self.does_tree_import(package, name, string + "\n" + import_)97 self.failUnless(n)98 def test_in_function(self):99 self.try_with("def foo():\n\tbar.baz()\n\tstart=3")100class Test_find_binding(support.TestCase):101 def find_binding(self, name, string, package=None):102 return fixer_util.find_binding(name, parse(string), package)103 def test_simple_assignment(self):104 self.failUnless(self.find_binding("a", "a = b"))105 self.failUnless(self.find_binding("a", "a = [b, c, d]"))106 self.failUnless(self.find_binding("a", "a = foo()"))107 self.failUnless(self.find_binding("a", "a = foo().foo.foo[6][foo]"))108 self.failIf(self.find_binding("a", "foo = a"))109 self.failIf(self.find_binding("a", "foo = (a, b, c)"))110 def test_tuple_assignment(self):111 self.failUnless(self.find_binding("a", "(a,) = b"))112 self.failUnless(self.find_binding("a", "(a, b, c) = [b, c, d]"))113 self.failUnless(self.find_binding("a", "(c, (d, a), b) = foo()"))114 self.failUnless(self.find_binding("a", "(a, b) = foo().foo[6][foo]"))115 self.failIf(self.find_binding("a", "(foo, b) = (b, a)"))116 self.failIf(self.find_binding("a", "(foo, (b, c)) = (a, b, c)"))117 def test_list_assignment(self):118 self.failUnless(self.find_binding("a", "[a] = b"))119 self.failUnless(self.find_binding("a", "[a, b, c] = [b, c, d]"))120 self.failUnless(self.find_binding("a", "[c, [d, a], b] = foo()"))121 self.failUnless(self.find_binding("a", "[a, b] = 
foo().foo[a][foo]"))122 self.failIf(self.find_binding("a", "[foo, b] = (b, a)"))123 self.failIf(self.find_binding("a", "[foo, [b, c]] = (a, b, c)"))124 def test_invalid_assignments(self):125 self.failIf(self.find_binding("a", "foo.a = 5"))126 self.failIf(self.find_binding("a", "foo[a] = 5"))127 self.failIf(self.find_binding("a", "foo(a) = 5"))128 self.failIf(self.find_binding("a", "foo(a, b) = 5"))129 def test_simple_import(self):130 self.failUnless(self.find_binding("a", "import a"))131 self.failUnless(self.find_binding("a", "import b, c, a, d"))132 self.failIf(self.find_binding("a", "import b"))133 self.failIf(self.find_binding("a", "import b, c, d"))134 def test_from_import(self):135 self.failUnless(self.find_binding("a", "from x import a"))136 self.failUnless(self.find_binding("a", "from a import a"))137 self.failUnless(self.find_binding("a", "from x import b, c, a, d"))138 self.failUnless(self.find_binding("a", "from x.b import a"))139 self.failUnless(self.find_binding("a", "from x.b import b, c, a, d"))140 self.failIf(self.find_binding("a", "from a import b"))141 self.failIf(self.find_binding("a", "from a.d import b"))142 self.failIf(self.find_binding("a", "from d.a import b"))143 def test_import_as(self):144 self.failUnless(self.find_binding("a", "import b as a"))145 self.failUnless(self.find_binding("a", "import b as a, c, a as f, d"))146 self.failIf(self.find_binding("a", "import a as f"))147 self.failIf(self.find_binding("a", "import b, c as f, d as e"))148 def test_from_import_as(self):149 self.failUnless(self.find_binding("a", "from x import b as a"))150 self.failUnless(self.find_binding("a", "from x import g as a, d as b"))151 self.failUnless(self.find_binding("a", "from x.b import t as a"))152 self.failUnless(self.find_binding("a", "from x.b import g as a, d"))153 self.failIf(self.find_binding("a", "from a import b as t"))154 self.failIf(self.find_binding("a", "from a.d import b as t"))155 self.failIf(self.find_binding("a", "from d.a import b as 
t"))156 def test_simple_import_with_package(self):157 self.failUnless(self.find_binding("b", "import b"))158 self.failUnless(self.find_binding("b", "import b, c, d"))159 self.failIf(self.find_binding("b", "import b", "b"))160 self.failIf(self.find_binding("b", "import b, c, d", "c"))161 def test_from_import_with_package(self):162 self.failUnless(self.find_binding("a", "from x import a", "x"))163 self.failUnless(self.find_binding("a", "from a import a", "a"))164 self.failUnless(self.find_binding("a", "from x import *", "x"))165 self.failUnless(self.find_binding("a", "from x import b, c, a, d", "x"))166 self.failUnless(self.find_binding("a", "from x.b import a", "x.b"))167 self.failUnless(self.find_binding("a", "from x.b import *", "x.b"))168 self.failUnless(self.find_binding("a", "from x.b import b, c, a, d", "x.b"))169 self.failIf(self.find_binding("a", "from a import b", "a"))170 self.failIf(self.find_binding("a", "from a.d import b", "a.d"))171 self.failIf(self.find_binding("a", "from d.a import b", "a.d"))172 self.failIf(self.find_binding("a", "from x.y import *", "a.b"))173 def test_import_as_with_package(self):174 self.failIf(self.find_binding("a", "import b.c as a", "b.c"))175 self.failIf(self.find_binding("a", "import a as f", "f"))176 self.failIf(self.find_binding("a", "import a as f", "a"))177 def test_from_import_as_with_package(self):178 # Because it would take a lot of special-case code in the fixers179 # to deal with from foo import bar as baz, we'll simply always180 # fail if there is an "from ... import ... 
as ..."181 self.failIf(self.find_binding("a", "from x import b as a", "x"))182 self.failIf(self.find_binding("a", "from x import g as a, d as b", "x"))183 self.failIf(self.find_binding("a", "from x.b import t as a", "x.b"))184 self.failIf(self.find_binding("a", "from x.b import g as a, d", "x.b"))185 self.failIf(self.find_binding("a", "from a import b as t", "a"))186 self.failIf(self.find_binding("a", "from a import b as t", "b"))187 self.failIf(self.find_binding("a", "from a import b as t", "t"))188 def test_function_def(self):189 self.failUnless(self.find_binding("a", "def a(): pass"))190 self.failUnless(self.find_binding("a", "def a(b, c, d): pass"))191 self.failUnless(self.find_binding("a", "def a(): b = 7"))192 self.failIf(self.find_binding("a", "def d(b, (c, a), e): pass"))193 self.failIf(self.find_binding("a", "def d(a=7): pass"))194 self.failIf(self.find_binding("a", "def d(a): pass"))195 self.failIf(self.find_binding("a", "def d(): a = 7"))196 s = """197 def d():198 def a():199 pass"""200 self.failIf(self.find_binding("a", s))201 def test_class_def(self):202 self.failUnless(self.find_binding("a", "class a: pass"))203 self.failUnless(self.find_binding("a", "class a(): pass"))204 self.failUnless(self.find_binding("a", "class a(b): pass"))205 self.failUnless(self.find_binding("a", "class a(b, c=8): pass"))206 self.failIf(self.find_binding("a", "class d: pass"))207 self.failIf(self.find_binding("a", "class d(a): pass"))208 self.failIf(self.find_binding("a", "class d(b, a=7): pass"))209 self.failIf(self.find_binding("a", "class d(b, *a): pass"))210 self.failIf(self.find_binding("a", "class d(b, **a): pass"))211 self.failIf(self.find_binding("a", "class d: a = 7"))212 s = """213 class d():214 class a():215 pass"""216 self.failIf(self.find_binding("a", s))217 def test_for(self):218 self.failUnless(self.find_binding("a", "for a in r: pass"))219 self.failUnless(self.find_binding("a", "for a, b in r: pass"))220 self.failUnless(self.find_binding("a", "for (a, b) in 
r: pass"))221 self.failUnless(self.find_binding("a", "for c, (a,) in r: pass"))222 self.failUnless(self.find_binding("a", "for c, (a, b) in r: pass"))223 self.failUnless(self.find_binding("a", "for c in r: a = c"))224 self.failIf(self.find_binding("a", "for c in a: pass"))225 def test_for_nested(self):226 s = """227 for b in r:228 for a in b:229 pass"""230 self.failUnless(self.find_binding("a", s))231 s = """232 for b in r:233 for a, c in b:234 pass"""235 self.failUnless(self.find_binding("a", s))236 s = """237 for b in r:238 for (a, c) in b:239 pass"""240 self.failUnless(self.find_binding("a", s))241 s = """242 for b in r:243 for (a,) in b:244 pass"""245 self.failUnless(self.find_binding("a", s))246 s = """247 for b in r:248 for c, (a, d) in b:249 pass"""250 self.failUnless(self.find_binding("a", s))251 s = """252 for b in r:253 for c in b:254 a = 7"""255 self.failUnless(self.find_binding("a", s))256 s = """257 for b in r:258 for c in b:259 d = a"""260 self.failIf(self.find_binding("a", s))261 s = """262 for b in r:263 for c in a:264 d = 7"""265 self.failIf(self.find_binding("a", s))266 def test_if(self):267 self.failUnless(self.find_binding("a", "if b in r: a = c"))268 self.failIf(self.find_binding("a", "if a in r: d = e"))269 def test_if_nested(self):270 s = """271 if b in r:272 if c in d:273 a = c"""274 self.failUnless(self.find_binding("a", s))275 s = """276 if b in r:277 if c in d:278 c = a"""279 self.failIf(self.find_binding("a", s))280 def test_while(self):281 self.failUnless(self.find_binding("a", "while b in r: a = c"))282 self.failIf(self.find_binding("a", "while a in r: d = e"))283 def test_while_nested(self):284 s = """285 while b in r:286 while c in d:287 a = c"""288 self.failUnless(self.find_binding("a", s))289 s = """290 while b in r:291 while c in d:292 c = a"""293 self.failIf(self.find_binding("a", s))294 def test_try_except(self):295 s = """296 try:297 a = 6298 except:299 b = 8"""300 self.failUnless(self.find_binding("a", s))301 s = """302 
try:303 b = 8304 except:305 a = 6"""306 self.failUnless(self.find_binding("a", s))307 s = """308 try:309 b = 8310 except KeyError:311 pass312 except:313 a = 6"""314 self.failUnless(self.find_binding("a", s))315 s = """316 try:317 b = 8318 except:319 b = 6"""320 self.failIf(self.find_binding("a", s))321 def test_try_except_nested(self):322 s = """323 try:324 try:325 a = 6326 except:327 pass328 except:329 b = 8"""330 self.failUnless(self.find_binding("a", s))331 s = """332 try:333 b = 8334 except:335 try:336 a = 6337 except:338 pass"""339 self.failUnless(self.find_binding("a", s))340 s = """341 try:342 b = 8343 except:344 try:345 pass346 except:347 a = 6"""348 self.failUnless(self.find_binding("a", s))349 s = """350 try:351 try:352 b = 8353 except KeyError:354 pass355 except:356 a = 6357 except:358 pass"""359 self.failUnless(self.find_binding("a", s))360 s = """361 try:362 pass363 except:364 try:365 b = 8366 except KeyError:367 pass368 except:369 a = 6"""370 self.failUnless(self.find_binding("a", s))371 s = """372 try:373 b = 8374 except:375 b = 6"""376 self.failIf(self.find_binding("a", s))377 s = """378 try:379 try:380 b = 8381 except:382 c = d383 except:384 try:385 b = 6386 except:387 t = 8388 except:389 o = y"""390 self.failIf(self.find_binding("a", s))391 def test_try_except_finally(self):392 s = """393 try:394 c = 6395 except:396 b = 8397 finally:398 a = 9"""399 self.failUnless(self.find_binding("a", s))400 s = """401 try:402 b = 8403 finally:404 a = 6"""405 self.failUnless(self.find_binding("a", s))406 s = """407 try:408 b = 8409 finally:410 b = 6"""411 self.failIf(self.find_binding("a", s))412 s = """413 try:414 b = 8415 except:416 b = 9417 finally:418 b = 6"""419 self.failIf(self.find_binding("a", s))420 def test_try_except_finally_nested(self):421 s = """422 try:423 c = 6424 except:425 b = 8426 finally:427 try:428 a = 9429 except:430 b = 9431 finally:432 c = 9"""433 self.failUnless(self.find_binding("a", s))434 s = """435 try:436 b = 8437 finally:438 
try:439 pass440 finally:441 a = 6"""442 self.failUnless(self.find_binding("a", s))443 s = """444 try:445 b = 8446 finally:447 try:448 b = 6449 finally:450 b = 7"""451 self.failIf(self.find_binding("a", s))452class Test_touch_import(support.TestCase):453 def test_after_docstring(self):454 node = parse('"""foo"""\nbar()')455 fixer_util.touch_import(None, "foo", node)456 self.assertEqual(str(node), '"""foo"""\nimport foo\nbar()\n\n')457 def test_after_imports(self):458 node = parse('"""foo"""\nimport bar\nbar()')459 fixer_util.touch_import(None, "foo", node)460 self.assertEqual(str(node), '"""foo"""\nimport bar\nimport foo\nbar()\n\n')461 def test_beginning(self):462 node = parse('bar()')463 fixer_util.touch_import(None, "foo", node)464 self.assertEqual(str(node), 'import foo\nbar()\n\n')465 def test_from_import(self):...

Full Screen

Full Screen

test_attached_object_collisions.py

Source:test_attached_object_collisions.py Github

copy

Full Screen

...78 #original attached object had no touch links79 80 self.state_req.robot_state.joint_state.header.stamp = rospy.Time.now()81 res = self.get_state_validity_server.call(self.state_req)82 self.failIf(res.error_code.val == res.error_code.SUCCESS)83 self.assertEqual(res.error_code.val, res.error_code.COLLISION_CONSTRAINTS_VIOLATED) 84 #should be some contacts85 self.failIf(len(res.contacts) == 0)86 87 for c in res.contacts:88 is_first = False89 90 if(c.contact_body_1 == 'r_gripper_r_finger_tip_link'):91 is_first = True92 elif(c.contact_body_2 != 'r_gripper_r_finger_tip_link'):93 self.fail("Contacts but not with correct link")94 if c.contact_body_1 not in self.touch_links:95 self.fail("Contact body 1 ", c.contact_body_1, " not in touch links");96 if c.contact_body_2 not in self.touch_links:97 self.fail("Contact body 2 ", c.contact_body_2, " not in touch links");98 self.failIf(not(c.contact_body_1 in self.touch_links))99 self.failIf(not(c.contact_body_2 in self.touch_links))100 if(is_first):101 self.failIf(c.body_type_1 != ContactInformation.ATTACHED_BODY)102 self.failIf(c.attached_body_1 != "pole")103 else:104 self.failIf(c.body_type_2 != ContactInformation.ATTACHED_BODY)105 self.failIf(c.attached_body_2 != "pole")106 107 #adding in touch links108 self.att_object.touch_links = self.touch_links109 self.att_pub.publish(self.att_object)110 rospy.sleep(2.)111 res = self.get_state_validity_server.call(self.state_req)112 #should be ok now113 self.failIf(res.error_code.val != res.error_code.SUCCESS)114 def test_attached_collisions_with_static_objects(self):115 116 self.att_object.touch_links = self.touch_links117 self.att_pub.publish(self.att_object)118 rospy.sleep(2.)119 self.obj_pub.publish(self.table)120 121 rospy.sleep(2.)122 self.state_req.robot_state.joint_state.header.stamp = rospy.Time.now()123 res = self.get_state_validity_server.call(self.state_req)124 self.failIf(res.error_code.val == res.error_code.SUCCESS)125 self.assertEqual(res.error_code.val, 
res.error_code.COLLISION_CONSTRAINTS_VIOLATED) 126 127 #should be some contacts128 self.failIf(len(res.contacts) == 0)129 130 for c in res.contacts:131 132 self.failIf(c.contact_body_1 != 'r_gripper_r_finger_tip_link')133 self.failIf(c.body_type_1 != ContactInformation.ATTACHED_BODY)134 self.failIf(c.attached_body_1 != "pole")135 self.failIf(c.contact_body_2 != 'tabletop')136 self.failIf(c.body_type_2 != ContactInformation.OBJECT)137if __name__ == '__main__':138 import rostest...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Testify automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful