Best JavaScript code snippet using devicefarmer-stf
testhelpers.py
Source:testhelpers.py
...44 mock(d, foo=d, bar=d)45 mock.method(d, zinga=d, alpha=d)46 mock().method(a1=d, z99=d)47 expected = [48 call(ANY, foo=ANY, bar=ANY),49 call.method(ANY, zinga=ANY, alpha=ANY),50 call(), call().method(a1=ANY, z99=ANY)51 ]52 self.assertEqual(expected, mock.mock_calls)53 self.assertEqual(mock.mock_calls, expected)54class CallTest(unittest.TestCase):55 def test_call_with_call(self):56 kall = _Call()57 self.assertEqual(kall, _Call())58 self.assertEqual(kall, _Call(('',)))59 self.assertEqual(kall, _Call(((),)))60 self.assertEqual(kall, _Call(({},)))61 self.assertEqual(kall, _Call(('', ())))62 self.assertEqual(kall, _Call(('', {})))63 self.assertEqual(kall, _Call(('', (), {})))64 self.assertEqual(kall, _Call(('foo',)))65 self.assertEqual(kall, _Call(('bar', ())))66 self.assertEqual(kall, _Call(('baz', {})))67 self.assertEqual(kall, _Call(('spam', (), {})))68 kall = _Call(((1, 2, 3),))69 self.assertEqual(kall, _Call(((1, 2, 3),)))70 self.assertEqual(kall, _Call(('', (1, 2, 3))))71 self.assertEqual(kall, _Call(((1, 2, 3), {})))72 self.assertEqual(kall, _Call(('', (1, 2, 3), {})))73 kall = _Call(((1, 2, 4),))74 self.assertNotEqual(kall, _Call(('', (1, 2, 3))))75 self.assertNotEqual(kall, _Call(('', (1, 2, 3), {})))76 kall = _Call(('foo', (1, 2, 4),))77 self.assertNotEqual(kall, _Call(('', (1, 2, 4))))78 self.assertNotEqual(kall, _Call(('', (1, 2, 4), {})))79 self.assertNotEqual(kall, _Call(('bar', (1, 2, 4))))80 self.assertNotEqual(kall, _Call(('bar', (1, 2, 4), {})))81 kall = _Call(({'a': 3},))82 self.assertEqual(kall, _Call(('', (), {'a': 3})))83 self.assertEqual(kall, _Call(('', {'a': 3})))84 self.assertEqual(kall, _Call(((), {'a': 3})))85 self.assertEqual(kall, _Call(({'a': 3},)))86 def test_empty__Call(self):87 args = _Call()88 self.assertEqual(args, ())89 self.assertEqual(args, ('foo',))90 self.assertEqual(args, ((),))91 self.assertEqual(args, ('foo', ()))92 self.assertEqual(args, ('foo',(), {}))93 self.assertEqual(args, ('foo', {}))94 self.assertEqual(args, ({},))95 def test_named_empty_call(self):96 args = _Call(('foo', (), {}))97 self.assertEqual(args, ('foo',))98 self.assertEqual(args, ('foo', ()))99 self.assertEqual(args, ('foo',(), {}))100 self.assertEqual(args, ('foo', {}))101 self.assertNotEqual(args, ((),))102 self.assertNotEqual(args, ())103 self.assertNotEqual(args, ({},))104 self.assertNotEqual(args, ('bar',))105 self.assertNotEqual(args, ('bar', ()))106 self.assertNotEqual(args, ('bar', {}))107 def test_call_with_args(self):108 args = _Call(((1, 2, 3), {}))109 self.assertEqual(args, ((1, 2, 3),))110 self.assertEqual(args, ('foo', (1, 2, 3)))111 self.assertEqual(args, ('foo', (1, 2, 3), {}))112 self.assertEqual(args, ((1, 2, 3), {}))113 def test_named_call_with_args(self):114 args = _Call(('foo', (1, 2, 3), {}))115 self.assertEqual(args, ('foo', (1, 2, 3)))116 self.assertEqual(args, ('foo', (1, 2, 3), {}))117 self.assertNotEqual(args, ((1, 2, 3),))118 self.assertNotEqual(args, ((1, 2, 3), {}))119 def test_call_with_kwargs(self):120 args = _Call(((), dict(a=3, b=4)))121 self.assertEqual(args, (dict(a=3, b=4),))122 self.assertEqual(args, ('foo', dict(a=3, b=4)))123 self.assertEqual(args, ('foo', (), dict(a=3, b=4)))124 self.assertEqual(args, ((), dict(a=3, b=4)))125 def test_named_call_with_kwargs(self):126 args = _Call(('foo', (), dict(a=3, b=4)))127 self.assertEqual(args, ('foo', dict(a=3, b=4)))128 self.assertEqual(args, ('foo', (), dict(a=3, b=4)))129 self.assertNotEqual(args, (dict(a=3, b=4),))130 self.assertNotEqual(args, ((), dict(a=3, b=4)))131 def test_call_with_args_call_empty_name(self):132 args = _Call(((1, 2, 3), {}))133 self.assertEqual(args, call(1, 2, 3))134 self.assertEqual(call(1, 2, 3), args)135 self.assertIn(call(1, 2, 3), [args])136 def test_call_ne(self):137 self.assertNotEqual(_Call(((1, 2, 3),)), call(1, 2))138 self.assertFalse(_Call(((1, 2, 3),)) != call(1, 2, 3))139 self.assertTrue(_Call(((1, 2), {})) != call(1, 2, 3))140 def test_call_non_tuples(self):141 kall = _Call(((1, 2, 3),))142 for value in 1, None, self, int:143 self.assertNotEqual(kall, value)144 self.assertFalse(kall == value)145 def test_repr(self):146 self.assertEqual(repr(_Call()), 'call()')147 self.assertEqual(repr(_Call(('foo',))), 'call.foo()')148 self.assertEqual(repr(_Call(((1, 2, 3), {'a': 'b'}))),149 "call(1, 2, 3, a='b')")150 self.assertEqual(repr(_Call(('bar', (1, 2, 3), {'a': 'b'}))),151 "call.bar(1, 2, 3, a='b')")152 self.assertEqual(repr(call), 'call')153 self.assertEqual(str(call), 'call')154 self.assertEqual(repr(call()), 'call()')155 self.assertEqual(repr(call(1)), 'call(1)')156 self.assertEqual(repr(call(zz='thing')), "call(zz='thing')")157 self.assertEqual(repr(call().foo), 'call().foo')158 self.assertEqual(repr(call(1).foo.bar(a=3).bing),159 'call().foo.bar().bing')160 self.assertEqual(161 repr(call().foo(1, 2, a=3)),162 "call().foo(1, 2, a=3)"163 )164 self.assertEqual(repr(call()()), "call()()")165 self.assertEqual(repr(call(1)(2)), "call()(2)")166 self.assertEqual(167 repr(call()().bar().baz.beep(1)),168 "call()().bar().baz.beep(1)"169 )170 def test_call(self):171 self.assertEqual(call(), ('', (), {}))172 self.assertEqual(call('foo', 'bar', one=3, two=4),173 ('', ('foo', 'bar'), {'one': 3, 'two': 4}))174 mock = Mock()175 mock(1, 2, 3)176 mock(a=3, b=6)177 self.assertEqual(mock.call_args_list,178 [call(1, 2, 3), call(a=3, b=6)])179 def test_attribute_call(self):180 self.assertEqual(call.foo(1), ('foo', (1,), {}))181 self.assertEqual(call.bar.baz(fish='eggs'),182 ('bar.baz', (), {'fish': 'eggs'}))183 mock = Mock()184 mock.foo(1, 2 ,3)185 mock.bar.baz(a=3, b=6)186 self.assertEqual(mock.method_calls,187 [call.foo(1, 2, 3), call.bar.baz(a=3, b=6)])188 def test_extended_call(self):189 result = call(1).foo(2).bar(3, a=4)190 self.assertEqual(result, ('().foo().bar', (3,), dict(a=4)))191 mock = MagicMock()192 mock(1, 2, a=3, b=4)193 self.assertEqual(mock.call_args, call(1, 2, a=3, b=4))194 self.assertNotEqual(mock.call_args, call(1, 2, 3))195 self.assertEqual(mock.call_args_list, [call(1, 2, a=3, b=4)])196 self.assertEqual(mock.mock_calls, [call(1, 2, a=3, b=4)])197 mock = MagicMock()198 mock.foo(1).bar()().baz.beep(a=6)199 last_call = call.foo(1).bar()().baz.beep(a=6)200 self.assertEqual(mock.mock_calls[-1], last_call)201 self.assertEqual(mock.mock_calls, last_call.call_list())202 def test_extended_not_equal(self):203 a = call(x=1).foo204 b = call(x=2).foo205 self.assertEqual(a, a)206 self.assertEqual(b, b)207 self.assertNotEqual(a, b)208 def test_nested_calls_not_equal(self):209 a = call(x=1).foo().bar210 b = call(x=2).foo().bar211 self.assertEqual(a, a)212 self.assertEqual(b, b)213 self.assertNotEqual(a, b)214 def test_call_list(self):215 mock = MagicMock()216 mock(1)217 self.assertEqual(call(1).call_list(), mock.mock_calls)218 mock = MagicMock()219 mock(1).method(2)220 self.assertEqual(call(1).method(2).call_list(),221 mock.mock_calls)222 mock = MagicMock()223 mock(1).method(2)(3)224 self.assertEqual(call(1).method(2)(3).call_list(),225 mock.mock_calls)226 mock = MagicMock()227 int(mock(1).method(2)(3).foo.bar.baz(4)(5))228 kall = call(1).method(2)(3).foo.bar.baz(4)(5).__int__()229 self.assertEqual(kall.call_list(), mock.mock_calls)230 def test_call_any(self):231 self.assertEqual(call, ANY)232 m = MagicMock()233 int(m)234 self.assertEqual(m.mock_calls, [ANY])235 self.assertEqual([ANY], m.mock_calls)236 def test_two_args_call(self):237 args = _Call(((1, 2), {'a': 3}), two=True)238 self.assertEqual(len(args), 2)239 self.assertEqual(args[0], (1, 2))240 self.assertEqual(args[1], {'a': 3})241 other_args = _Call(((1, 2), {'a': 3}))242 self.assertEqual(args, other_args)243 def test_call_with_name(self):244 self.assertEqual(_Call((), 'foo')[0], 'foo')245 self.assertEqual(_Call((('bar', 'barz'),),)[0], '')246 self.assertEqual(_Call((('bar', 'barz'), {'hello': 'world'}),)[0], '')247class SpecSignatureTest(unittest.TestCase):248 def _check_someclass_mock(self, mock):249 self.assertRaises(AttributeError, getattr, mock, 'foo')250 mock.one(1, 2)251 mock.one.assert_called_with(1, 2)252 self.assertRaises(AssertionError,253 mock.one.assert_called_with, 3, 4)254 self.assertRaises(TypeError, mock.one, 1)255 mock.two()256 mock.two.assert_called_with()257 self.assertRaises(AssertionError,258 mock.two.assert_called_with, 3)259 self.assertRaises(TypeError, mock.two, 1)260 mock.three()261 mock.three.assert_called_with()262 self.assertRaises(AssertionError,263 mock.three.assert_called_with, 3)264 self.assertRaises(TypeError, mock.three, 3, 2)265 mock.three(1)266 mock.three.assert_called_with(1)267 mock.three(a=1)268 mock.three.assert_called_with(a=1)269 def test_basic(self):270 mock = create_autospec(SomeClass)271 self._check_someclass_mock(mock)272 mock = create_autospec(SomeClass())273 self._check_someclass_mock(mock)274 def test_create_autospec_return_value(self):275 def f():276 pass277 mock = create_autospec(f, return_value='foo')278 self.assertEqual(mock(), 'foo')279 class Foo(object):280 pass281 mock = create_autospec(Foo, return_value='foo')282 self.assertEqual(mock(), 'foo')283 def test_autospec_reset_mock(self):284 m = create_autospec(int)285 int(m)286 m.reset_mock()287 self.assertEqual(m.__int__.call_count, 0)288 def test_mocking_unbound_methods(self):289 class Foo(object):290 def foo(self, foo):291 pass292 p = patch.object(Foo, 'foo')293 mock_foo = p.start()294 Foo().foo(1)295 mock_foo.assert_called_with(1)296 def test_create_autospec_unbound_methods(self):297 # see mock issue 128298 # this is expected to fail until the issue is fixed299 return300 class Foo(object):301 def foo(self):302 pass303 klass = create_autospec(Foo)304 instance = klass()305 self.assertRaises(TypeError, instance.foo, 1)306 # Note: no type checking on the "self" parameter307 klass.foo(1)308 klass.foo.assert_called_with(1)309 self.assertRaises(TypeError, klass.foo)310 def test_create_autospec_keyword_arguments(self):311 class Foo(object):312 a = 3313 m = create_autospec(Foo, a='3')314 self.assertEqual(m.a, '3')315 def test_create_autospec_keyword_only_arguments(self):316 def foo(a, *, b=None):317 pass318 m = create_autospec(foo)319 m(1)320 m.assert_called_with(1)321 self.assertRaises(TypeError, m, 1, 2)322 m(2, b=3)323 m.assert_called_with(2, b=3)324 def test_function_as_instance_attribute(self):325 obj = SomeClass()326 def f(a):327 pass328 obj.f = f329 mock = create_autospec(obj)330 mock.f('bing')331 mock.f.assert_called_with('bing')332 def test_spec_as_list(self):333 # because spec as a list of strings in the mock constructor means334 # something very different we treat a list instance as the type.335 mock = create_autospec([])336 mock.append('foo')337 mock.append.assert_called_with('foo')338 self.assertRaises(AttributeError, getattr, mock, 'foo')339 class Foo(object):340 foo = []341 mock = create_autospec(Foo)342 mock.foo.append(3)343 mock.foo.append.assert_called_with(3)344 self.assertRaises(AttributeError, getattr, mock.foo, 'foo')345 def test_attributes(self):346 class Sub(SomeClass):347 attr = SomeClass()348 sub_mock = create_autospec(Sub)349 for mock in (sub_mock, sub_mock.attr):350 self._check_someclass_mock(mock)351 def test_builtin_functions_types(self):352 # we could replace builtin functions / methods with a function353 # with *args / **kwargs signature. Using the builtin method type354 # as a spec seems to work fairly well though.355 class BuiltinSubclass(list):356 def bar(self, arg):357 pass358 sorted = sorted359 attr = {}360 mock = create_autospec(BuiltinSubclass)361 mock.append(3)362 mock.append.assert_called_with(3)363 self.assertRaises(AttributeError, getattr, mock.append, 'foo')364 mock.bar('foo')365 mock.bar.assert_called_with('foo')366 self.assertRaises(TypeError, mock.bar, 'foo', 'bar')367 self.assertRaises(AttributeError, getattr, mock.bar, 'foo')368 mock.sorted([1, 2])369 mock.sorted.assert_called_with([1, 2])370 self.assertRaises(AttributeError, getattr, mock.sorted, 'foo')371 mock.attr.pop(3)372 mock.attr.pop.assert_called_with(3)373 self.assertRaises(AttributeError, getattr, mock.attr, 'foo')374 def test_method_calls(self):375 class Sub(SomeClass):376 attr = SomeClass()377 mock = create_autospec(Sub)378 mock.one(1, 2)379 mock.two()380 mock.three(3)381 expected = [call.one(1, 2), call.two(), call.three(3)]382 self.assertEqual(mock.method_calls, expected)383 mock.attr.one(1, 2)384 mock.attr.two()385 mock.attr.three(3)386 expected.extend(387 [call.attr.one(1, 2), call.attr.two(), call.attr.three(3)]388 )389 self.assertEqual(mock.method_calls, expected)390 def test_magic_methods(self):391 class BuiltinSubclass(list):392 attr = {}393 mock = create_autospec(BuiltinSubclass)394 self.assertEqual(list(mock), [])395 self.assertRaises(TypeError, int, mock)396 self.assertRaises(TypeError, int, mock.attr)397 self.assertEqual(list(mock), [])398 self.assertIsInstance(mock['foo'], MagicMock)399 self.assertIsInstance(mock.attr['foo'], MagicMock)400 def test_spec_set(self):401 class Sub(SomeClass):402 attr = SomeClass()403 for spec in (Sub, Sub()):404 mock = create_autospec(spec, spec_set=True)405 self._check_someclass_mock(mock)406 self.assertRaises(AttributeError, setattr, mock, 'foo', 'bar')407 self.assertRaises(AttributeError, setattr, mock.attr, 'foo', 'bar')408 def test_descriptors(self):409 class Foo(object):410 @classmethod411 def f(cls, a, b):412 pass413 @staticmethod414 def g(a, b):415 pass416 class Bar(Foo):417 pass418 class Baz(SomeClass, Bar):419 pass420 for spec in (Foo, Foo(), Bar, Bar(), Baz, Baz()):421 mock = create_autospec(spec)422 mock.f(1, 2)423 mock.f.assert_called_once_with(1, 2)424 mock.g(3, 4)425 mock.g.assert_called_once_with(3, 4)426 def test_recursive(self):427 class A(object):428 def a(self):429 pass430 foo = 'foo bar baz'431 bar = foo432 A.B = A433 mock = create_autospec(A)434 mock()435 self.assertFalse(mock.B.called)436 mock.a()437 mock.B.a()438 self.assertEqual(mock.method_calls, [call.a(), call.B.a()])439 self.assertIs(A.foo, A.bar)440 self.assertIsNot(mock.foo, mock.bar)441 mock.foo.lower()442 self.assertRaises(AssertionError, mock.bar.lower.assert_called_with)443 def test_spec_inheritance_for_classes(self):444 class Foo(object):445 def a(self, x):446 pass447 class Bar(object):448 def f(self, y):449 pass450 class_mock = create_autospec(Foo)451 self.assertIsNot(class_mock, class_mock())452 for this_mock in class_mock, class_mock():453 this_mock.a(x=5)454 this_mock.a.assert_called_with(x=5)455 this_mock.a.assert_called_with(5)456 self.assertRaises(TypeError, this_mock.a, 'foo', 'bar')457 self.assertRaises(AttributeError, getattr, this_mock, 'b')458 instance_mock = create_autospec(Foo())459 instance_mock.a(5)460 instance_mock.a.assert_called_with(5)461 instance_mock.a.assert_called_with(x=5)462 self.assertRaises(TypeError, instance_mock.a, 'foo', 'bar')463 self.assertRaises(AttributeError, getattr, instance_mock, 'b')464 # The return value isn't isn't callable465 self.assertRaises(TypeError, instance_mock)466 instance_mock.Bar.f(6)467 instance_mock.Bar.f.assert_called_with(6)468 instance_mock.Bar.f.assert_called_with(y=6)469 self.assertRaises(AttributeError, getattr, instance_mock.Bar, 'g')470 instance_mock.Bar().f(6)471 instance_mock.Bar().f.assert_called_with(6)472 instance_mock.Bar().f.assert_called_with(y=6)473 self.assertRaises(AttributeError, getattr, instance_mock.Bar(), 'g')474 def test_inherit(self):475 class Foo(object):476 a = 3477 Foo.Foo = Foo478 # class479 mock = create_autospec(Foo)480 instance = mock()481 self.assertRaises(AttributeError, getattr, instance, 'b')482 attr_instance = mock.Foo()483 self.assertRaises(AttributeError, getattr, attr_instance, 'b')484 # instance485 mock = create_autospec(Foo())486 self.assertRaises(AttributeError, getattr, mock, 'b')487 self.assertRaises(TypeError, mock)488 # attribute instance489 call_result = mock.Foo()490 self.assertRaises(AttributeError, getattr, call_result, 'b')491 def test_builtins(self):492 # used to fail with infinite recursion493 create_autospec(1)494 create_autospec(int)495 create_autospec('foo')496 create_autospec(str)497 create_autospec({})498 create_autospec(dict)499 create_autospec([])500 create_autospec(list)501 create_autospec(set())502 create_autospec(set)503 create_autospec(1.0)504 create_autospec(float)505 create_autospec(1j)506 create_autospec(complex)507 create_autospec(False)508 create_autospec(True)509 def test_function(self):510 def f(a, b):511 pass512 mock = create_autospec(f)513 self.assertRaises(TypeError, mock)514 mock(1, 2)515 mock.assert_called_with(1, 2)516 mock.assert_called_with(1, b=2)517 mock.assert_called_with(a=1, b=2)518 f.f = f519 mock = create_autospec(f)520 self.assertRaises(TypeError, mock.f)521 mock.f(3, 4)522 mock.f.assert_called_with(3, 4)523 mock.f.assert_called_with(a=3, b=4)524 def test_skip_attributeerrors(self):525 class Raiser(object):526 def __get__(self, obj, type=None):527 if obj is None:528 raise AttributeError('Can only be accessed via an instance')529 class RaiserClass(object):530 raiser = Raiser()531 @staticmethod532 def existing(a, b):533 return a + b534 s = create_autospec(RaiserClass)535 self.assertRaises(TypeError, lambda x: s.existing(1, 2, 3))536 s.existing(1, 2)537 self.assertRaises(AttributeError, lambda: s.nonexisting)538 # check we can fetch the raiser attribute and it has no spec539 obj = s.raiser540 obj.foo, obj.bar541 def test_signature_class(self):542 class Foo(object):543 def __init__(self, a, b=3):544 pass545 mock = create_autospec(Foo)546 self.assertRaises(TypeError, mock)547 mock(1)548 mock.assert_called_once_with(1)549 mock.assert_called_once_with(a=1)550 self.assertRaises(AssertionError, mock.assert_called_once_with, 2)551 mock(4, 5)552 mock.assert_called_with(4, 5)553 mock.assert_called_with(a=4, b=5)554 self.assertRaises(AssertionError, mock.assert_called_with, a=5, b=4)555 def test_class_with_no_init(self):556 # this used to raise an exception557 # due to trying to get a signature from object.__init__558 class Foo(object):559 pass560 create_autospec(Foo)561 def test_signature_callable(self):562 class Callable(object):563 def __init__(self, x, y):564 pass565 def __call__(self, a):566 pass567 mock = create_autospec(Callable)568 mock(1, 2)569 mock.assert_called_once_with(1, 2)570 mock.assert_called_once_with(x=1, y=2)571 self.assertRaises(TypeError, mock, 'a')572 instance = mock(1, 2)573 self.assertRaises(TypeError, instance)574 instance(a='a')575 instance.assert_called_once_with('a')576 instance.assert_called_once_with(a='a')577 instance('a')578 instance.assert_called_with('a')579 instance.assert_called_with(a='a')580 mock = create_autospec(Callable(1, 2))581 mock(a='a')582 mock.assert_called_once_with(a='a')583 self.assertRaises(TypeError, mock)584 mock('a')585 mock.assert_called_with('a')586 def test_signature_noncallable(self):587 class NonCallable(object):588 def __init__(self):589 pass590 mock = create_autospec(NonCallable)591 instance = mock()592 mock.assert_called_once_with()593 self.assertRaises(TypeError, mock, 'a')594 self.assertRaises(TypeError, instance)595 self.assertRaises(TypeError, instance, 'a')596 mock = create_autospec(NonCallable())597 self.assertRaises(TypeError, mock)598 self.assertRaises(TypeError, mock, 'a')599 def test_create_autospec_none(self):600 class Foo(object):601 bar = None602 mock = create_autospec(Foo)603 none = mock.bar604 self.assertNotIsInstance(none, type(None))605 none.foo()606 none.foo.assert_called_once_with()607 def test_autospec_functions_with_self_in_odd_place(self):608 class Foo(object):609 def f(a, self):610 pass611 a = create_autospec(Foo)612 a.f(10)613 a.f.assert_called_with(10)614 a.f.assert_called_with(self=10)615 a.f(self=10)616 a.f.assert_called_with(10)617 a.f.assert_called_with(self=10)618 def test_autospec_data_descriptor(self):619 class Descriptor(object):620 def __init__(self, value):621 self.value = value622 def __get__(self, obj, cls=None):623 if obj is None:624 return self625 return self.value626 def __set__(self, obj, value):627 pass628 class MyProperty(property):629 pass630 class Foo(object):631 __slots__ = ['slot']632 @property633 def prop(self):634 return 3635 @MyProperty636 def subprop(self):637 return 4638 desc = Descriptor(42)639 foo = create_autospec(Foo)640 def check_data_descriptor(mock_attr):641 # Data descriptors don't have a spec.642 self.assertIsInstance(mock_attr, MagicMock)643 mock_attr(1, 2, 3)644 mock_attr.abc(4, 5, 6)645 mock_attr.assert_called_once_with(1, 2, 3)646 mock_attr.abc.assert_called_once_with(4, 5, 6)647 # property648 check_data_descriptor(foo.prop)649 # property subclass650 check_data_descriptor(foo.subprop)651 # class __slot__652 check_data_descriptor(foo.slot)653 # plain data descriptor654 check_data_descriptor(foo.desc)655 def test_autospec_on_bound_builtin_function(self):656 meth = types.MethodType(time.ctime, time.time())657 self.assertIsInstance(meth(), str)658 mocked = create_autospec(meth)659 # no signature, so no spec to check against660 mocked()661 mocked.assert_called_once_with()662 mocked.reset_mock()663 mocked(4, 5, 6)664 mocked.assert_called_once_with(4, 5, 6)665 def test_autospec_getattr_partial_function(self):666 # bpo-32153 : getattr returning partial functions without667 # __name__ should not create AttributeError in create_autospec668 class Foo:669 def __getattr__(self, attribute):670 return partial(lambda name: name, attribute)671 proxy = Foo()672 autospec = create_autospec(proxy)673 self.assertFalse(hasattr(autospec, '__name__'))674 def test_spec_inspect_signature(self):675 def myfunc(x, y):676 pass677 mock = create_autospec(myfunc)678 mock(1, 2)679 mock(x=1, y=2)680 self.assertEqual(inspect.getfullargspec(mock), inspect.getfullargspec(myfunc))681 self.assertEqual(mock.mock_calls, [call(1, 2), call(x=1, y=2)])682 self.assertRaises(TypeError, mock, 1)683 def test_spec_inspect_signature_annotations(self):684 def foo(a: int, b: int=10, *, c:int) -> int:685 return a + b + c686 mock = create_autospec(foo)687 mock(1, 2, c=3)688 mock(1, c=3)689 self.assertEqual(inspect.getfullargspec(mock), inspect.getfullargspec(foo))690 self.assertEqual(mock.mock_calls, [call(1, 2, c=3), call(1, c=3)])691 self.assertRaises(TypeError, mock, 1)692 self.assertRaises(TypeError, mock, 1, 2, 3, c=4)693class TestCallList(unittest.TestCase):694 def test_args_list_contains_call_list(self):695 mock = Mock()696 self.assertIsInstance(mock.call_args_list, _CallList)697 mock(1, 2)698 mock(a=3)699 mock(3, 4)700 mock(b=6)701 for kall in call(1, 2), call(a=3), call(3, 4), call(b=6):702 self.assertIn(kall, mock.call_args_list)703 calls = [call(a=3), call(3, 4)]704 self.assertIn(calls, mock.call_args_list)705 calls = [call(1, 2), call(a=3)]706 self.assertIn(calls, mock.call_args_list)707 calls = [call(3, 4), call(b=6)]708 self.assertIn(calls, mock.call_args_list)709 calls = [call(3, 4)]710 self.assertIn(calls, mock.call_args_list)711 self.assertNotIn(call('fish'), mock.call_args_list)712 self.assertNotIn([call('fish')], mock.call_args_list)713 def test_call_list_str(self):714 mock = Mock()715 mock(1, 2)716 mock.foo(a=3)717 mock.foo.bar().baz('fish', cat='dog')718 expected = (719 "[call(1, 2),\n"720 " call.foo(a=3),\n"721 " call.foo.bar(),\n"722 " call.foo.bar().baz('fish', cat='dog')]"723 )724 self.assertEqual(str(mock.mock_calls), expected)725 def test_propertymock(self):726 p = patch('%s.SomeClass.one' % __name__, new_callable=PropertyMock)727 mock = p.start()728 try:729 SomeClass.one730 mock.assert_called_once_with()731 s = SomeClass()732 s.one733 mock.assert_called_with()734 self.assertEqual(mock.mock_calls, [call(), call()])735 s.one = 3736 self.assertEqual(mock.mock_calls, [call(), call(), call(3)])737 finally:738 p.stop()739 def test_propertymock_returnvalue(self):740 m = MagicMock()741 p = PropertyMock()742 type(m).foo = p743 returned = m.foo744 p.assert_called_once_with()745 self.assertIsInstance(returned, MagicMock)746 self.assertNotIsInstance(returned, PropertyMock)747class TestCallablePredicate(unittest.TestCase):748 def test_type(self):749 for obj in [str, bytes, int, list, tuple, SomeClass]:750 self.assertTrue(_callable(obj))...
pjsua_app.py
Source:pjsua_app.py
...60 if ci.state == py_pjsua.PJSIP_INV_STATE_DISCONNECTED:61 g_current_call = py_pjsua.PJSUA_INVALID_ID62# Callback for incoming call63#64def on_incoming_call(acc_id, call_id, rdata):65 global g_current_call66 67 if g_current_call != py_pjsua.PJSUA_INVALID_ID:68 # There's call in progress - answer Busy69 py_pjsua.call_answer(call_id, 486, None, None)70 return71 72 g_current_call = call_id73 ci = py_pjsua.call_get_info(call_id)74 write_log(3, "*** Incoming call: " + call_name(call_id) + "***")75 write_log(3, "*** Press a to answer or h to hangup ***")76 77 78 79# Callback when media state has changed (e.g. established or terminated)80#81def on_call_media_state(call_id):82 ci = py_pjsua.call_get_info(call_id)83 if ci.media_status == py_pjsua.PJSUA_CALL_MEDIA_ACTIVE:84 py_pjsua.conf_connect(ci.conf_slot, 0)85 py_pjsua.conf_connect(0, ci.conf_slot)86 write_log(3, call_name(call_id) + ": media is active")87 else:88 write_log(3, call_name(call_id) + ": media is inactive")89# Callback when account registration state has changed90#91def on_reg_state(acc_id):92 acc_info = py_pjsua.acc_get_info(acc_id)93 if acc_info.has_registration != 0:94 cmd = "registration"95 else:96 cmd = "unregistration"97 if acc_info.status != 0 and acc_info.status != 200:98 write_log(3, "Account " + cmd + " failed: rc=" + `acc_info.status` + " " + acc_info.status_text)99 else:100 write_log(3, "Account " + cmd + " success")101# Callback when buddy's presence state has changed102#103def on_buddy_state(buddy_id):104 write_log(3, "On Buddy state called")105 buddy_info = py_pjsua.buddy_get_info(buddy_id)106 if buddy_info.status != 0 and buddy_info.status != 200:107 write_log(3, "Status of " + `buddy_info.uri` + " is " + `buddy_info.status_text`)108 else:109 write_log(3, "Status : " + `buddy_info.status`)110# Callback on incoming pager (MESSAGE)111# 112def on_pager(call_id, strfrom, strto, contact, mime_type, text):113 write_log(3, "MESSAGE from " + `strfrom` + " : " + `text`)114# Callback on the delivery status of outgoing pager (MESSAGE)115# 116def on_pager_status(call_id, strto, body, user_data, status, reason):117 write_log(3, "MESSAGE to " + `strto` + " status " + `status` + " reason " + `reason`)118# Received typing indication119#120def on_typing(call_id, strfrom, to, contact, is_typing):121 str_t = ""122 if is_typing:123 str_t = "is typing.."124 else:125 str_t = "has stopped typing"126 write_log(3, "IM indication: " + strfrom + " " + str_t)127# Received the status of previous call transfer request128#129def on_call_transfer_status(call_id,status_code,status_text,final,p_cont):130 strfinal = ""131 if final == 1:132 strfinal = "[final]"133 134 write_log(3, "Call " + `call_id` + ": transfer status= " + `status_code` + " " + status_text+ " " + strfinal)135 136 if status_code/100 == 2:137 write_log(3, "Call " + `call_id` + " : call transfered successfully, disconnecting call")138 status = py_pjsua.call_hangup(call_id, 410, None, None)139 p_cont = 0140# Callback on incoming call transfer request141# 142def on_call_transfer_request(call_id, dst, code):143 write_log(3, "Call transfer request from " + `call_id` + " to " + dst + " with code " + `code`)144#145# Initialize pjsua.146#147def app_init():148 global g_acc_id, g_ua_cfg149 # Create pjsua before anything else150 status = py_pjsua.create()151 if status != 0:152 err_exit("pjsua create() error", status)153 # Create and initialize logging config154 log_cfg = py_pjsua.logging_config_default()155 log_cfg.level = C_LOG_LEVEL156 log_cfg.cb = log_cb157 # Create and initialize pjsua config158 # Note: for this Python module, thread_cnt must be 0 since Python159 # doesn't like to be called from alien thread (pjsua's thread160 # in this case) 161 ua_cfg = py_pjsua.config_default()162 ua_cfg.thread_cnt = 0163 ua_cfg.user_agent = "PJSUA/Python 0.1"164 ua_cfg.cb.on_incoming_call = on_incoming_call165 ua_cfg.cb.on_call_media_state = on_call_media_state166 ua_cfg.cb.on_reg_state = on_reg_state167 ua_cfg.cb.on_call_state = on_call_state168 ua_cfg.cb.on_buddy_state = on_buddy_state169 ua_cfg.cb.on_pager = on_pager170 ua_cfg.cb.on_pager_status = on_pager_status171 ua_cfg.cb.on_typing = on_typing172 ua_cfg.cb.on_call_transfer_status = on_call_transfer_status173 ua_cfg.cb.on_call_transfer_request = on_call_transfer_request174 # Configure STUN setting175 if C_STUN_HOST != "":176 ua_cfg.stun_host = C_STUN_HOST;177 # Create and initialize media config178 med_cfg = py_pjsua.media_config_default()179 med_cfg.ec_tail_len = 0180 #181 # Initialize pjsua!!182 #183 status = py_pjsua.init(ua_cfg, log_cfg, med_cfg)184 if status != 0:185 err_exit("pjsua init() error", status)186 # Configure UDP transport config187 transport_cfg = py_pjsua.transport_config_default()188 transport_cfg.port = C_SIP_PORT189 # Create UDP transport190 status, transport_id = \191 py_pjsua.transport_create(py_pjsua.PJSIP_TRANSPORT_UDP, transport_cfg)192 if status != 0:193 err_exit("Error creating UDP transport", status)194 195 # Create initial default account196 status, acc_id = py_pjsua.acc_add_local(transport_id, 1)197 if status != 0:198 err_exit("Error creating account", status)199 g_acc_id = acc_id200 g_ua_cfg = ua_cfg201# Add SIP account interractively202#203def add_account():204 global g_acc_id205 acc_domain = ""206 acc_username = ""207 acc_passwd =""208 confirm = ""209 210 # Input account configs211 print "Your SIP domain (e.g. myprovider.com): ",212 acc_domain = sys.stdin.readline()213 if acc_domain == "\n": 214 return215 acc_domain = acc_domain.replace("\n", "")216 print "Your username (e.g. alice): ",217 acc_username = sys.stdin.readline()218 if acc_username == "\n":219 return220 acc_username = acc_username.replace("\n", "")221 print "Your password (e.g. secret): ",222 acc_passwd = sys.stdin.readline()223 if acc_passwd == "\n":224 return225 acc_passwd = acc_passwd.replace("\n", "")226 # Configure account configuration227 acc_cfg = py_pjsua.acc_config_default()228 acc_cfg.id = "sip:" + acc_username + "@" + acc_domain229 acc_cfg.reg_uri = "sip:" + acc_domain230 cred_info = py_pjsua.Pjsip_Cred_Info()231 cred_info.realm = "*"232 cred_info.scheme = "digest"233 cred_info.username = acc_username234 cred_info.data_type = 0235 cred_info.data = acc_passwd236 acc_cfg.cred_info.append(1)237 acc_cfg.cred_info[0] = cred_info238 # Add new SIP account239 status, acc_id = py_pjsua.acc_add(acc_cfg, 1)240 if status != 0:241 py_pjsua.perror(THIS_FILE, "Error adding SIP account", status)242 else:243 g_acc_id = acc_id244 write_log(3, "Account " + acc_cfg.id + " added")245def add_player():246 global g_wav_files247 global g_wav_id248 global g_wav_port249 250 file_name = ""251 status = -1252 wav_id = 0253 254 print "Enter the path of the file player(e.g. /tmp/audio.wav): ",255 file_name = sys.stdin.readline()256 if file_name == "\n": 257 return258 file_name = file_name.replace("\n", "")259 status, wav_id = py_pjsua.player_create(file_name, 0)260 if status != 0:261 py_pjsua.perror(THIS_FILE, "Error adding file player ", status)262 else:263 g_wav_files.append(file_name)264 if g_wav_id == 0:265 g_wav_id = wav_id266 g_wav_port = py_pjsua.player_get_conf_port(wav_id)267 write_log(3, "File player " + file_name + " added")268 269def add_recorder():270 global g_rec_file271 global g_rec_id272 global g_rec_port273 274 file_name = ""275 status = -1276 rec_id = 0277 278 print "Enter the path of the file recorder(e.g. /tmp/audio.wav): ",279 file_name = sys.stdin.readline()280 if file_name == "\n": 281 return282 file_name = file_name.replace("\n", "")283 status, rec_id = py_pjsua.recorder_create(file_name, 0, None, 0, 0)284 if status != 0:285 py_pjsua.perror(THIS_FILE, "Error adding file recorder ", status)286 else:287 g_rec_file = file_name288 g_rec_id = rec_id289 g_rec_port = py_pjsua.recorder_get_conf_port(rec_id)290 write_log(3, "File recorder " + file_name + " added")291def conf_list():292 ports = None293 print "Conference ports : "294 ports = py_pjsua.enum_conf_ports()295 for port in ports:296 info = None297 info = py_pjsua.conf_get_port_info(port)298 txlist = ""299 for listener in info.listeners:300 txlist = txlist + "#" + `listener` + " "301 302 print "Port #" + `info.slot_id` + "[" + `(info.clock_rate/1000)` + "KHz/" + `(info.samples_per_frame * 1000 / info.clock_rate)` + "ms] " + info.name + " transmitting to: " + txlist303 304def connect_port():305 src_port = 0306 dst_port = 0307 308 print "Connect src port # (empty to cancel): "309 src_port = sys.stdin.readline()310 if src_port == "\n": 311 return312 src_port = src_port.replace("\n", "")313 src_port = int(src_port)314 print "To dst port # (empty to cancel): "315 dst_port = sys.stdin.readline()316 if dst_port == "\n": 317 return318 dst_port = dst_port.replace("\n", "")319 dst_port = int(dst_port)320 status = py_pjsua.conf_connect(src_port, dst_port)321 if status != 0:322 py_pjsua.perror(THIS_FILE, "Error connecting port ", status)323 else: 324 write_log(3, "Port connected from " + `src_port` + " to " + `dst_port`)325 326def disconnect_port():327 src_port = 0328 dst_port = 0329 330 print "Disconnect src port # (empty to cancel): "331 src_port = sys.stdin.readline()332 if src_port == "\n": 333 return334 src_port = src_port.replace("\n", "")335 src_port = int(src_port)336 print "From dst port # (empty to cancel): "337 dst_port = sys.stdin.readline()338 if dst_port == "\n": 339 return340 dst_port = dst_port.replace("\n", "")341 dst_port = int(dst_port)342 status = py_pjsua.conf_disconnect(src_port, dst_port)343 if status != 0:344 py_pjsua.perror(THIS_FILE, "Error disconnecting port ", status)345 else: 346 write_log(3, "Port disconnected " + `src_port` + " from " + `dst_port`)347def dump_call_quality():348 global g_current_call349 350 buf = ""351 if g_current_call != -1:352 buf = py_pjsua.call_dump(g_current_call, 1, 1024, " ")353 write_log(3, "\n" + buf)354 else:355 write_log(3, "No current call")356def xfer_call():357 global g_current_call358 359 if g_current_call == -1:360 361 write_log(3, "No current call")362 else:363 call = g_current_call 364 ci = py_pjsua.call_get_info(g_current_call)365 print "Transfering current call ["+ `g_current_call` + "] " + ci.remote_info366 print "Enter sip url : "367 url = sys.stdin.readline()368 if url == "\n": 369 return370 url = url.replace("\n", "")371 if call != g_current_call:372 print "Call has been disconnected"373 return374 msg_data = py_pjsua.msg_data_init()375 status = py_pjsua.call_xfer(g_current_call, url, msg_data);376 if status != 0:377 py_pjsua.perror(THIS_FILE, "Error transfering call ", status)378 else: 379 write_log(3, "Call transfered to " + url)380 381def xfer_call_replaces():382 if g_current_call == -1:383 write_log(3, "No current call")384 else:385 call = g_current_call386 387 ids = py_pjsua.enum_calls()388 if len(ids) <= 1:389 print "There are no other calls"390 return 391 ci = py_pjsua.call_get_info(g_current_call)392 print "Transfer call [" + `g_current_call` + "] " + ci.remote_info + " to one of the following:"393 for i in range(0, len(ids)): 394 if ids[i] == call:395 continue396 call_info = py_pjsua.call_get_info(ids[i])397 print `ids[i]` + " " + call_info.remote_info + " [" + call_info.state_text + "]" 398 print "Enter call number to be replaced : "399 buf = sys.stdin.readline()400 buf = buf.replace("\n","")401 if buf == "":402 return403 dst_call = int(buf)404 405 if call != g_current_call:406 print "Call has been disconnected"407 return 408 409 if dst_call == call:410 print "Destination call number must not be the same as the call being transfered"411 return412 413 if dst_call >= py_pjsua.PJSUA_MAX_CALLS:414 print "Invalid destination call number"415 return416 417 if py_pjsua.call_is_active(dst_call) == 0:418 print "Invalid destination call number"419 return 420 py_pjsua.call_xfer_replaces(call, dst_call, 0, None)421 422#423# Worker thread function.424# Python doesn't like it when it's called from an alien thread425# (pjsua's worker thread, in this case), so for Python we must426# disable worker thread in pjsua and poll pjsua from Python instead.427#428def worker_thread_main(arg):429 global C_QUIT430 thread_desc = 0;431 status = py_pjsua.thread_register("python worker", thread_desc)432 if status != 0:433 py_pjsua.perror(THIS_FILE, "Error registering thread", status)434 else:435 while C_QUIT == 0:436 py_pjsua.handle_events(50)437 print "Worker thread quitting.."438 C_QUIT = 2439# Start pjsua440#441def app_start():442 # Done with initialization, start pjsua!!443 #444 status = py_pjsua.start()445 if status != 0:446 err_exit("Error starting pjsua!", status)447 # Start worker thread448 thr = thread.start_new(worker_thread_main, (0,))449 450 print "PJSUA Started!!"451# Print account and buddy list452def print_acc_buddy_list():453 global g_acc_id454 455 acc_ids = py_pjsua.enum_accs()456 print "Account list:"457 for acc_id in acc_ids:458 acc_info = py_pjsua.acc_get_info(acc_id)459 if acc_info.has_registration == 0:460 acc_status = acc_info.status_text461 else:462 acc_status = `acc_info.status` + "/" + acc_info.status_text + " (expires=" + `acc_info.expires` + ")"463 if acc_id == g_acc_id:464 print " *",465 else:466 print " ",467 print "[" + `acc_id` + "] " + acc_info.acc_uri + ": " + acc_status468 print " Presence status: ",469 if acc_info.online_status != 0:470 print "Online"471 else:472 print "Invisible"473 if py_pjsua.get_buddy_count() > 0:474 print ""475 print "Buddy list:"476 buddy_ids = py_pjsua.enum_buddies()477 for buddy_id in buddy_ids:478 bi = py_pjsua.buddy_get_info(buddy_id)479 print " [" + `buddy_id` + "] " + bi.status_text + " " + bi.uri480 481 482# Print application menu483#484def print_menu():485 print ""486 print ">>>"487 print_acc_buddy_list()488 print """489+============================================================================+490| Call Commands : | Buddy, IM & Presence: | Account: |491| | | |492| m Make call | +b Add buddy | +a Add account |493| a Answer current call | -b Delete buddy | -a Delete accnt |494| h Hangup current call | | |495| H Hold call | i Send instant message | rr register |496| v re-inVite (release Hold) | s Subscribe presence | ru Unregister |497| # Send DTMF string | u Unsubscribe presence | |498| dq Dump curr. call quality | t ToGgle Online status | |499| +--------------------------+------------------+500| x Xfer call | Media Commands: | Status: |501| X Xfer with Replaces | | |502| | cl List ports | d Dump status |503| | cc Connect port | dd Dump detail |504| | cd Disconnect port | |505| | +p Add file player | |506|------------------------------+ +r Add file recorder | |507| q Quit application | | |508+============================================================================+"""509 print "You have " + `py_pjsua.call_get_count()` + " active call(s)"510 print ">>>", 511# Menu512#513def app_menu():514 global g_acc_id515 global g_current_call516 quit = 0517 while quit == 0:518 print_menu()519 choice = sys.stdin.readline()520 if choice[0] == "q":521 quit = 1522 elif choice[0] == "i":523 # Sending IM 524 print "Send IM to SIP URL: ",525 url = sys.stdin.readline()526 if url == "\n":527 continue528 # Send typing indication529 py_pjsua.im_typing(g_acc_id, url, 1, None) 530 print "The content: ",531 message = sys.stdin.readline()532 if message == "\n":533 py_pjsua.im_typing(g_acc_id, url, 0, None) 534 continue535 # Send the IM!536 py_pjsua.im_send(g_acc_id, url, None, message, None, 0)537 elif choice[0] == "m":538 # Make call 539 print "Using account ", g_acc_id540 print "Make call to SIP URL: ",541 url = sys.stdin.readline()542 url = url.replace("\n", "")543 if url == "":544 continue545 # Initiate the call!546 status, call_id = py_pjsua.call_make_call(g_acc_id, url, 0, 0, None)547 548 if status != 0:549 py_pjsua.perror(THIS_FILE, "Error making call", status)550 else:551 g_current_call = call_id552 elif choice[0] == "+" and choice[1] == "b":553 # Add new buddy554 bc = py_pjsua.Buddy_Config()555 print "Buddy URL: ",556 bc.uri = sys.stdin.readline()557 if bc.uri == "\n":558 continue559 560 bc.uri = bc.uri.replace("\n", "")561 bc.subscribe = 1562 status, buddy_id = py_pjsua.buddy_add(bc)563 if status != 0:564 py_pjsua.perror(THIS_FILE, "Error adding buddy", status)565 elif choice[0] == "-" and choice[1] == "b":566 print "Enter buddy ID to delete : "567 buf = sys.stdin.readline()568 buf = buf.replace("\n","")569 if buf == "":570 continue571 i = int(buf)572 if py_pjsua.buddy_is_valid(i) == 0:573 print "Invalid buddy id " + `i`574 else:575 py_pjsua.buddy_del(i)576 print "Buddy " + `i` + " deleted" 577 elif choice[0] == "+" and choice[1] == "a":578 # Add account579 add_account()580 elif choice[0] == "-" and choice[1] == "a":581 print "Enter account ID to delete : "582 buf = sys.stdin.readline()583 buf = buf.replace("\n","")584 if buf == "":585 continue586 i = int(buf)587 if py_pjsua.acc_is_valid(i) == 0:588 print "Invalid account id " + `i`589 else:590 py_pjsua.acc_del(i)591 print "Account " + `i` + " deleted"592 593 elif choice[0] == "+" and choice[1] == "p":594 add_player()595 elif choice[0] == "+" and choice[1] == "r":596 add_recorder()597 elif choice[0] == "c" and choice[1] == "l":598 conf_list()599 elif choice[0] == "c" and choice[1] == "c":600 connect_port()601 elif choice[0] == "c" and choice[1] == "d":602 disconnect_port()603 elif choice[0] == "d" and choice[1] == "q":604 dump_call_quality()605 elif choice[0] == "x":606 xfer_call()607 elif choice[0] == "X":608 xfer_call_replaces()609 elif choice[0] == "h":610 if g_current_call != py_pjsua.PJSUA_INVALID_ID:611 py_pjsua.call_hangup(g_current_call, 603, None, None)612 else:613 print "No current call"614 elif choice[0] == "H":615 if g_current_call != py_pjsua.PJSUA_INVALID_ID:616 py_pjsua.call_set_hold(g_current_call, None)617 618 else:619 print "No current call"620 elif choice[0] == "v":...
test_class.py
Source:test_class.py
1"Test the functionality of Python classes implementing operators."2import unittest3from test import test_support4testmeths = [5# Binary operations6 "add",7 "radd",8 "sub",9 "rsub",10 "mul",11 "rmul",12 "div",13 "rdiv",14 "mod",15 "rmod",16 "divmod",17 "rdivmod",18 "pow",19 "rpow",20 "rshift",21 "rrshift",22 "lshift",23 "rlshift",24 "and",25 "rand",26 "or",27 "ror",28 "xor",29 "rxor",30# List/dict operations31 "contains",32 "getitem",33 "getslice",34 "setitem",35 "setslice",36 "delitem",37 "delslice",38# Unary operations39 "neg",40 "pos",41 "abs",42# generic operations43 "init",44 ]45# These need to return something other than None46# "coerce",47# "hash",48# "str",49# "repr",50# "int",51# "long",52# "float",53# "oct",54# "hex",55# These are separate because they can influence the test of other methods.56# "getattr",57# "setattr",58# "delattr",59callLst = []60def trackCall(f):61 def track(*args, **kwargs):62 callLst.append((f.__name__, args))63 return f(*args, **kwargs)64 return track65class AllTests:66 trackCall = trackCall67 @trackCall68 def __coerce__(self, *args):69 return (self,) + args70 @trackCall71 def __hash__(self, *args):72 return hash(id(self))73 @trackCall74 def __str__(self, *args):75 return "AllTests"76 @trackCall77 def __repr__(self, *args):78 return "AllTests"79 @trackCall80 def __int__(self, *args):81 return 182 @trackCall83 def __float__(self, *args):84 return 1.085 @trackCall86 def __long__(self, *args):87 return 1L88 @trackCall89 def __oct__(self, *args):90 return '01'91 @trackCall92 def __hex__(self, *args):93 return '0x1'94 @trackCall95 def __cmp__(self, *args):96 return 097# Synthesize all the other AllTests methods from the names in testmeths.98method_template = """\99@trackCall100def __%(method)s__(self, *args):101 pass102"""103for method in testmeths:104 exec method_template % locals() in AllTests.__dict__105del method, method_template106class ClassTests(unittest.TestCase):107 def setUp(self):108 callLst[:] = []109 def assertCallStack(self, expected_calls):110 actualCallList = callLst[:] # need to copy because the comparison below will add111 # additional calls to callLst112 if expected_calls != actualCallList:113 self.fail("Expected call list:\n %s\ndoes not match actual call list\n %s" %114 (expected_calls, actualCallList))115 def testInit(self):116 foo = AllTests()117 self.assertCallStack([("__init__", (foo,))])118 def testBinaryOps(self):119 testme = AllTests()120 # Binary operations121 callLst[:] = []122 testme + 1123 self.assertCallStack([("__coerce__", (testme, 1)), ("__add__", (testme, 1))])124 callLst[:] = []125 1 + testme126 self.assertCallStack([("__coerce__", (testme, 1)), ("__radd__", (testme, 1))])127 callLst[:] = []128 testme - 1129 self.assertCallStack([("__coerce__", (testme, 1)), ("__sub__", (testme, 1))])130 callLst[:] = []131 1 - testme132 self.assertCallStack([("__coerce__", (testme, 1)), ("__rsub__", (testme, 1))])133 callLst[:] = []134 testme * 1135 self.assertCallStack([("__coerce__", (testme, 1)), ("__mul__", (testme, 1))])136 callLst[:] = []137 1 * testme138 self.assertCallStack([("__coerce__", (testme, 1)), ("__rmul__", (testme, 1))])139 if 1/2 == 0:140 callLst[:] = []141 testme / 1142 self.assertCallStack([("__coerce__", (testme, 1)), ("__div__", (testme, 1))])143 callLst[:] = []144 1 / testme145 self.assertCallStack([("__coerce__", (testme, 1)), ("__rdiv__", (testme, 1))])146 callLst[:] = []147 testme % 1148 self.assertCallStack([("__coerce__", (testme, 1)), ("__mod__", (testme, 1))])149 callLst[:] = []150 1 % testme151 self.assertCallStack([("__coerce__", (testme, 1)), ("__rmod__", (testme, 1))])152 callLst[:] = []153 divmod(testme,1)154 self.assertCallStack([("__coerce__", (testme, 1)), ("__divmod__", (testme, 1))])155 callLst[:] = []156 divmod(1, testme)157 self.assertCallStack([("__coerce__", (testme, 1)), ("__rdivmod__", (testme, 1))])158 callLst[:] = []159 testme ** 1160 self.assertCallStack([("__coerce__", (testme, 1)), ("__pow__", (testme, 1))])161 callLst[:] = []162 1 ** testme163 self.assertCallStack([("__coerce__", (testme, 1)), ("__rpow__", (testme, 1))])164 callLst[:] = []165 testme >> 1166 self.assertCallStack([("__coerce__", (testme, 1)), ("__rshift__", (testme, 1))])167 callLst[:] = []168 1 >> testme169 self.assertCallStack([("__coerce__", (testme, 1)), ("__rrshift__", (testme, 1))])170 callLst[:] = []171 testme << 1172 self.assertCallStack([("__coerce__", (testme, 1)), ("__lshift__", (testme, 1))])173 callLst[:] = []174 1 << testme175 self.assertCallStack([("__coerce__", (testme, 1)), ("__rlshift__", (testme, 1))])176 callLst[:] = []177 testme & 1178 self.assertCallStack([("__coerce__", (testme, 1)), ("__and__", (testme, 1))])179 callLst[:] = []180 1 & testme181 self.assertCallStack([("__coerce__", (testme, 1)), ("__rand__", (testme, 1))])182 callLst[:] = []183 testme | 1184 self.assertCallStack([("__coerce__", (testme, 1)), ("__or__", (testme, 1))])185 callLst[:] = []186 1 | testme187 self.assertCallStack([("__coerce__", (testme, 1)), ("__ror__", (testme, 1))])188 callLst[:] = []189 testme ^ 1190 self.assertCallStack([("__coerce__", (testme, 1)), ("__xor__", (testme, 1))])191 callLst[:] = []192 1 ^ testme193 self.assertCallStack([("__coerce__", (testme, 1)), ("__rxor__", (testme, 1))])194 def testListAndDictOps(self):195 testme = AllTests()196 # List/dict operations197 class Empty: pass198 try:199 1 in Empty()200 self.fail('failed, should have raised TypeError')201 except TypeError:202 pass203 callLst[:] = []204 1 in testme205 self.assertCallStack([('__contains__', (testme, 1))])206 callLst[:] = []207 testme[1]208 self.assertCallStack([('__getitem__', (testme, 1))])209 callLst[:] = []210 testme[1] = 1211 self.assertCallStack([('__setitem__', (testme, 1, 1))])212 callLst[:] = []213 del testme[1]214 self.assertCallStack([('__delitem__', (testme, 1))])215 callLst[:] = []216 testme[:42]217 self.assertCallStack([('__getslice__', (testme, 0, 42))])218 callLst[:] = []219 testme[:42] = "The Answer"220 self.assertCallStack([('__setslice__', (testme, 0, 42, "The Answer"))])221 callLst[:] = []222 del testme[:42]223 self.assertCallStack([('__delslice__', (testme, 0, 42))])224 callLst[:] = []225 testme[2:1024:10]226 self.assertCallStack([('__getitem__', (testme, slice(2, 1024, 10)))])227 callLst[:] = []228 testme[2:1024:10] = "A lot"229 self.assertCallStack([('__setitem__', (testme, slice(2, 1024, 10),230 "A lot"))])231 callLst[:] = []232 del testme[2:1024:10]233 self.assertCallStack([('__delitem__', (testme, slice(2, 1024, 10)))])234 callLst[:] = []235 testme[:42, ..., :24:, 24, 100]236 self.assertCallStack([('__getitem__', (testme, (slice(None, 42, None),237 Ellipsis,238 slice(None, 24, None),239 24, 100)))])240 callLst[:] = []241 testme[:42, ..., :24:, 24, 100] = "Strange"242 self.assertCallStack([('__setitem__', (testme, (slice(None, 42, None),243 Ellipsis,244 slice(None, 24, None),245 24, 100), "Strange"))])246 callLst[:] = []247 del testme[:42, ..., :24:, 24, 100]248 self.assertCallStack([('__delitem__', (testme, (slice(None, 42, None),249 Ellipsis,250 slice(None, 24, None),251 24, 100)))])252 # Now remove the slice hooks to see if converting normal slices to253 # slice object works.254 getslice = AllTests.__getslice__255 del AllTests.__getslice__256 setslice = AllTests.__setslice__257 del AllTests.__setslice__258 delslice = AllTests.__delslice__259 del AllTests.__delslice__260 # XXX when using new-style classes the slice testme[:42] produces261 # slice(None, 42, None) instead of slice(0, 42, None). py3k will have262 # to change this test.263 callLst[:] = []264 testme[:42]265 self.assertCallStack([('__getitem__', (testme, slice(0, 42, None)))])266 callLst[:] = []267 testme[:42] = "The Answer"268 self.assertCallStack([('__setitem__', (testme, slice(0, 42, None),269 "The Answer"))])270 callLst[:] = []271 del testme[:42]272 self.assertCallStack([('__delitem__', (testme, slice(0, 42, None)))])273 # Restore the slice methods, or the tests will fail with regrtest -R.274 AllTests.__getslice__ = getslice275 AllTests.__setslice__ = setslice276 AllTests.__delslice__ = delslice277 @test_support.cpython_only278 def testDelItem(self):279 class A:280 ok = False281 def __delitem__(self, key):282 self.ok = True283 a = A()284 # Subtle: we need to call PySequence_SetItem, not PyMapping_SetItem.285 from _testcapi import sequence_delitem286 sequence_delitem(a, 2)287 self.assertTrue(a.ok)288 def testUnaryOps(self):289 testme = AllTests()290 callLst[:] = []291 -testme292 self.assertCallStack([('__neg__', (testme,))])293 callLst[:] = []294 +testme295 self.assertCallStack([('__pos__', (testme,))])296 callLst[:] = []297 abs(testme)298 self.assertCallStack([('__abs__', (testme,))])299 callLst[:] = []300 int(testme)301 self.assertCallStack([('__int__', (testme,))])302 callLst[:] = []303 long(testme)304 self.assertCallStack([('__long__', (testme,))])305 callLst[:] = []306 float(testme)307 self.assertCallStack([('__float__', (testme,))])308 callLst[:] = []309 oct(testme)310 self.assertCallStack([('__oct__', (testme,))])311 callLst[:] = []312 hex(testme)313 self.assertCallStack([('__hex__', (testme,))])314 def testMisc(self):315 testme = AllTests()316 callLst[:] = []317 hash(testme)318 self.assertCallStack([('__hash__', (testme,))])319 callLst[:] = []320 repr(testme)321 self.assertCallStack([('__repr__', (testme,))])322 callLst[:] = []323 str(testme)324 self.assertCallStack([('__str__', (testme,))])325 callLst[:] = []326 testme == 1327 self.assertCallStack([("__coerce__", (testme, 1)), ('__cmp__', (testme, 1))])328 callLst[:] = []329 testme < 1330 self.assertCallStack([("__coerce__", (testme, 1)), ('__cmp__', (testme, 1))])331 callLst[:] = []332 testme > 1333 self.assertCallStack([("__coerce__", (testme, 1)), ('__cmp__', (testme, 1))])334 callLst[:] = []335 eval('testme <> 1') # XXX kill this in py3k336 self.assertCallStack([("__coerce__", (testme, 1)), ('__cmp__', (testme, 1))])337 callLst[:] = []338 testme != 1339 self.assertCallStack([("__coerce__", (testme, 1)), ('__cmp__', (testme, 1))])340 callLst[:] = []341 1 == testme342 self.assertCallStack([("__coerce__", (testme, 1)), ('__cmp__', (1, testme))])343 callLst[:] = []344 1 < testme345 self.assertCallStack([("__coerce__", (testme, 1)), ('__cmp__', (1, testme))])346 callLst[:] = []347 1 > testme348 self.assertCallStack([("__coerce__", (testme, 1)), ('__cmp__', (1, testme))])349 callLst[:] = []350 eval('1 <> testme')351 self.assertCallStack([("__coerce__", (testme, 1)), ('__cmp__', (1, testme))])352 callLst[:] = []353 1 != testme354 self.assertCallStack([("__coerce__", (testme, 1)), ('__cmp__', (1, testme))])355 def testGetSetAndDel(self):356 # Interfering tests357 class ExtraTests(AllTests):358 @trackCall359 def __getattr__(self, *args):360 return "SomeVal"361 @trackCall362 def __setattr__(self, *args):363 pass364 @trackCall365 def __delattr__(self, *args):366 pass367 testme = ExtraTests()368 callLst[:] = []369 testme.spam370 self.assertCallStack([('__getattr__', (testme, "spam"))])371 callLst[:] = []372 testme.eggs = "spam, spam, spam and ham"373 self.assertCallStack([('__setattr__', (testme, "eggs",374 "spam, spam, spam and ham"))])375 callLst[:] = []376 del testme.cardinal377 self.assertCallStack([('__delattr__', (testme, "cardinal"))])378 def testDel(self):379 x = []380 class DelTest:381 def __del__(self):382 x.append("crab people, crab people")383 testme = DelTest()384 del testme385 import gc386 gc.collect()387 self.assertEqual(["crab people, crab people"], x)388 def testBadTypeReturned(self):389 # return values of some method are type-checked390 class BadTypeClass:391 def __int__(self):392 return None393 __float__ = __int__394 __long__ = __int__395 __str__ = __int__396 __repr__ = __int__397 __oct__ = __int__398 __hex__ = __int__399 for f in [int, float, long, str, repr, oct, hex]:400 self.assertRaises(TypeError, f, BadTypeClass())401 def testMixIntsAndLongs(self):402 # mixing up ints and longs is okay403 class IntLongMixClass:404 @trackCall405 def __int__(self):406 return 42L407 @trackCall408 def __long__(self):409 return 64410 mixIntAndLong = IntLongMixClass()411 callLst[:] = []412 as_int = int(mixIntAndLong)413 self.assertEqual(type(as_int), long)414 self.assertEqual(as_int, 42L)415 self.assertCallStack([('__int__', (mixIntAndLong,))])416 callLst[:] = []417 as_long = long(mixIntAndLong)418 self.assertEqual(type(as_long), long)419 self.assertEqual(as_long, 64)420 self.assertCallStack([('__long__', (mixIntAndLong,))])421 def testHashStuff(self):422 # Test correct errors from hash() on objects with comparisons but423 # no __hash__424 class C0:425 pass426 hash(C0()) # This should work; the next two should raise TypeError427 class C1:428 def __cmp__(self, other): return 0429 self.assertRaises(TypeError, hash, C1())430 class C2:431 def __eq__(self, other): return 1432 self.assertRaises(TypeError, hash, C2())433 def testSFBug532646(self):434 # Test for SF bug 532646435 class A:436 pass437 A.__call__ = A()438 a = A()439 try:440 a() # This should not segfault441 except RuntimeError:442 pass443 else:444 self.fail("Failed to raise RuntimeError")445 def testForExceptionsRaisedInInstanceGetattr2(self):446 # Tests for exceptions raised in instance_getattr2().447 def booh(self):448 raise AttributeError("booh")449 class A:450 a = property(booh)451 try:452 A().a # Raised AttributeError: A instance has no attribute 'a'453 except AttributeError, x:454 if str(x) != "booh":455 self.fail("attribute error for A().a got masked: %s" % x)456 class E:457 __eq__ = property(booh)458 E() == E() # In debug mode, caused a C-level assert() to fail459 class I:460 __init__ = property(booh)461 try:462 # In debug mode, printed XXX undetected error and463 # raises AttributeError464 I()465 except AttributeError, x:466 pass467 else:468 self.fail("attribute error for I.__init__ got masked")469 def testHashComparisonOfMethods(self):470 # Test comparison and hash of methods471 class A:472 def __init__(self, x):473 self.x = x474 def f(self):475 pass476 def g(self):477 pass478 def __eq__(self, other):479 return self.x == other.x480 def __hash__(self):481 return self.x482 class B(A):483 pass484 a1 = A(1)485 a2 = A(2)486 self.assertEqual(a1.f, a1.f)487 self.assertNotEqual(a1.f, a2.f)488 self.assertNotEqual(a1.f, a1.g)489 self.assertEqual(a1.f, A(1).f)490 self.assertEqual(hash(a1.f), hash(a1.f))491 self.assertEqual(hash(a1.f), hash(A(1).f))492 self.assertNotEqual(A.f, a1.f)493 self.assertNotEqual(A.f, A.g)494 self.assertEqual(B.f, A.f)495 self.assertEqual(hash(B.f), hash(A.f))496 # the following triggers a SystemError in 2.4497 a = A(hash(A.f.im_func)^(-1))498 hash(a.f)499 def testAttrSlots(self):500 class C:501 pass502 for c in C, C():503 self.assertRaises(TypeError, type(c).__getattribute__, c, [])504 self.assertRaises(TypeError, type(c).__setattr__, c, [], [])505def test_main():506 with test_support.check_py3k_warnings(507 (".+__(get|set|del)slice__ has been removed", DeprecationWarning),508 ("classic int division", DeprecationWarning),509 ("<> not supported", DeprecationWarning)):510 test_support.run_unittest(ClassTests)511if __name__=='__main__':...
car_telecom_utils.py
Source:car_telecom_utils.py
...114 ad.droid.telecomCallStopListeningForEvent(call_id,115 tel_defines.EventTelecomCallStateChanged)116 return False117 return False118def hangup_call(log, ad, call_id):119 """Hangup a number120 Args:121 log: log object122 ad: android device object123 call_id: Call to hangup.124 Returns:125 True if success, False if fail.126 """127 log.info("Hanging up droid {} call {}".format(128 ad.serial, call_id))129 # First check that we are in call, otherwise fail.130 if not ad.droid.telecomIsInCall():131 log.info("We are not in-call {}".format(ad.serial))132 return False133 # Make sure we are registered with the events.134 ad.droid.telecomStartListeningForCallRemoved()135 # Disconnect call.136 ad.droid.telecomCallDisconnect(call_id)137 # Wait for removed event.138 event = None139 try:140 event = ad.ed.pop_event(141 tel_defines.EventTelecomCallRemoved,142 tel_defines.MAX_WAIT_TIME_CONNECTION_STATE_UPDATE)143 except queue.Empty:144 log.info("Did not get TelecomCallRemoved event")145 return False146 finally:147 ad.droid.telecomStopListeningForCallRemoved()148 log.info("Removed call {}".format(event))149 if event['data']['CallId'] != call_id:150 return False151 return True152def hangup_conf(log, ad, conf_id):153 """Hangup a conference call154 Args:155 log: log object156 ad: android device object157 conf_id: Conf call to hangup.158 Returns:159 True if success, False if fail.160 """161 log.info("hangup_conf: Hanging up droid {} call {}".format(162 ad.serial, conf_id))163 # First check that we are in call, otherwise fail.164 if not ad.droid.telecomIsInCall():165 log.info("We are not in-call {}".format(ad.serial))166 return False167 # Get the list of children for this conference.168 all_calls = get_call_id_children(log, ad, conf_id)169 # All calls that needs disconnecting (Parent + Children)170 all_calls.add(conf_id)171 # Make sure we are registered with the events.172 ad.droid.telecomStartListeningForCallRemoved()173 # Disconnect call.174 ad.droid.telecomCallDisconnect(conf_id)175 # Wait for removed event.176 while len(all_calls) > 0:177 event = None178 try:179 event = ad.ed.pop_event(180 tel_defines.EventTelecomCallRemoved,181 tel_defines.MAX_WAIT_TIME_CONNECTION_STATE_UPDATE)182 except queue.Empty:183 log.info("Did not get TelecomCallRemoved event")184 ad.droid.telecomStopListeningForCallRemoved()185 return False186 removed_call_id = event['data']['CallId']187 all_calls.remove(removed_call_id)188 log.info("Removed call {} left calls {}".format(removed_call_id, all_calls))189 ad.droid.telecomStopListeningForCallRemoved()190 return True191def accept_call(log, ad, call_id):192 """Accept a number193 Args:194 log: log object195 ad: android device object196 call_id: Call to accept.197 Returns:198 True if success, False if fail.199 """200 log.info("Accepting call at droid {} call {}".format(201 ad.serial, call_id))202 # First check we are in call, otherwise fail.203 if not ad.droid.telecomIsInCall():204 log.info("We are not in-call {}".format(ad.serial))205 return False206 # Accept the call and wait for the call to be accepted on AG.207 ad.droid.telecomCallAnswer(call_id, tel_defines.VT_STATE_AUDIO_ONLY)208 if not wait_for_call_state(log, ad, call_id, tel_defines.CALL_STATE_ACTIVE):209 log.error("Call {} on droid {} not active".format(210 call_id, ad.serial))211 return False212 return True213def wait_for_not_in_call(log, ad):214 """Wait for the droid to be OUT OF CALLING state.215 Args:216 log: log object217 ad: android device object218 Returns:219 True if success, False if fail.220 """221 ad.droid.telecomStartListeningForCallRemoved()222 calls = ad.droid.telecomCallGetCallIds()223 if len(calls) > 1:224 log.info("More than one call {} {}".format(calls, ad.serial))225 return False226 if len(calls) > 0:227 log.info("Got calls {} for {}".format(...
test_call.py
Source:test_call.py
2from mako import util3from util import result_lines, flatten_result4from test import TemplateTest, eq_5class CallTest(TemplateTest):6 def test_call(self):7 t = Template("""8 <%def name="foo()">9 hi im foo ${caller.body(y=5)}10 </%def>11 12 <%call expr="foo()" args="y, **kwargs">13 this is the body, y is ${y}14 </%call>15""")16 assert result_lines(t.render()) == ['hi im foo', 'this is the body, y is 5']17 def test_compound_call(self):18 t = Template("""19 <%def name="bar()">20 this is bar21 </%def>22 23 <%def name="comp1()">24 this comp1 should not be called25 </%def>26 27 <%def name="foo()">28 foo calling comp1: ${caller.comp1(x=5)}29 foo calling body: ${caller.body()}30 </%def>31 32 <%call expr="foo()">33 <%def name="comp1(x)">34 this is comp1, ${x}35 </%def>36 this is the body, ${comp1(6)}37 </%call>38 ${bar()}39""")40 assert result_lines(t.render()) == ['foo calling comp1:', 'this is comp1, 5', 'foo calling body:', 'this is the body,', 'this is comp1, 6', 'this is bar']41 def test_new_syntax(self):42 """test foo:bar syntax, including multiline args and expression eval."""43 44 # note the trailing whitespace in the bottom ${} expr, need to strip45 # that off < python 2.746 47 t = Template("""48 <%def name="foo(x, y, q, z)">49 ${x}50 ${y}51 ${q}52 ${",".join("%s->%s" % (a, b) for a, b in z)}53 </%def>54 55 <%self:foo x="this is x" y="${'some ' + 'y'}" q="56 this57 is58 q"59 60 z="${[61 (1, 2),62 (3, 4),63 (5, 6)64 ]65 66 }"/>67 """)68 69 eq_(70 result_lines(t.render()),71 ['this is x', 'some y', 'this', 'is', 'q', '1->2,3->4,5->6']72 )73 74 def test_ccall_caller(self):75 t = Template("""76 <%def name="outer_func()">77 OUTER BEGIN78 <%call expr="caller.inner_func()">79 INNER CALL80 </%call>81 OUTER END82 </%def>83 <%call expr="outer_func()">84 <%def name="inner_func()">85 INNER BEGIN86 ${caller.body()}87 INNER END88 </%def>89 </%call>90 """)91 #print t.code92 assert result_lines(t.render()) == [93 "OUTER BEGIN",94 "INNER BEGIN",95 "INNER CALL",96 "INNER END",97 "OUTER END",98 ]99 100 def test_stack_pop(self):101 t = Template("""102 <%def name="links()" buffered="True">103 Some links104 </%def>105 <%def name="wrapper(links)">106 <h1>${caller.body()}</h1>107 ${links}108 </%def>109 ## links() pushes a stack frame on. when complete,110 ## 'nextcaller' must be restored111 <%call expr="wrapper(links())">112 Some title113 </%call>114 """)115 assert result_lines(t.render()) == [116 "<h1>",117 "Some title",118 "</h1>",119 "Some links"120 ]121 122 def test_conditional_call(self):123 """test that 'caller' is non-None only if the immediate <%def> was called via <%call>"""124 t = Template("""125 <%def name="a()">126 % if caller:127 ${ caller.body() } \\128 % endif129 AAA130 ${ b() }131 </%def>132 <%def name="b()">133 % if caller:134 ${ caller.body() } \\135 % endif136 BBB137 ${ c() }138 </%def>139 <%def name="c()">140 % if caller:141 ${ caller.body() } \\142 % endif143 CCC144 </%def>145 <%call expr="a()">146 CALL147 </%call>148 """)149 assert result_lines(t.render()) == [150 "CALL",151 "AAA",152 "BBB",153 "CCC"154 ]155 156 def test_chained_call(self):157 """test %calls that are chained through their targets"""158 t = Template("""159 <%def name="a()">160 this is a. 161 <%call expr="b()">162 this is a's ccall. heres my body: ${caller.body()}163 </%call>164 </%def>165 <%def name="b()">166 this is b. heres my body: ${caller.body()}167 whats in the body's caller's body ?168 ${context.caller_stack[-2].body()}169 </%def>170 171 <%call expr="a()">172 heres the main templ call173 </%call>174 175""")176 assert result_lines(t.render()) == [177 'this is a.',178 'this is b. heres my body:',179 "this is a's ccall. heres my body:",180 'heres the main templ call',181 "whats in the body's caller's body ?",182 'heres the main templ call'183 ]184 def test_nested_call(self):185 """test %calls that are nested inside each other"""186 t = Template("""187 <%def name="foo()">188 ${caller.body(x=10)}189 </%def>190 x is ${x}191 <%def name="bar()">192 bar: ${caller.body()}193 </%def>194 <%call expr="foo()" args="x">195 this is foo body: ${x}196 <%call expr="bar()">197 this is bar body: ${x}198 </%call>...
views.py
Source:views.py
1#!/usr/bin/python32# -*- coding: utf-8 -*-3from django.views import generic4from .models import TaxiCall,TaxiCallHistory5from django.utils import timezone6from django.http import HttpResponseRedirect,HttpResponse,JsonResponse7from django.core.urlresolvers import reverse8from .BotManager import BotManager9import telebot10import json11token = "309803225:AAEfkOtjUfLCTSHicJD05uy7AvilTCkzOYs"12bot = telebot.TeleBot(token)13botManager = BotManager(bot)14class CallListView(generic.ListView): #FIRST SECTION, CALLS JUST COME AND HASN'T ACCEPTED BY MANAGER YED15 template_name = "new_call_list.html"16 context_object_name = "call_list"17 def get_queryset(self):18 call_list = TaxiCall.objects.filter(status="new").order_by("-call_time")19 return call_list20class calls_with_car_ListView(generic.ListView): #SECOND SECTION, CALLS ACCEPTED AND HAVE BEEN ATTACHED CAR, NOW MANAGER WAIT FOR USER TO AGREE WITH OFFERED CAR21 template_name = "waiting_call.html"22 context_object_name = "call_list"23 def get_queryset(self):24 call_list = TaxiCall.objects.filter(status="waiting").order_by("-call_time")25 return call_list26class NeedCarList(generic.ListView): #SECOND SECTION, CALLS ACCEPTED AND HAVE BEEN ATTACHED CAR, NOW MANAGER WAIT FOR USER TO AGREE WITH OFFERED CAR27 template_name = "need_car.html"28 context_object_name = "call_list"29 def get_queryset(self):30 call_list = TaxiCall.objects.filter(status="need_car").order_by("-call_time")31 return call_list32class acceptedCalls(generic.ListView): # USERS ALREADY AGREE WITH CAR AND CAR ON THE WAY TO USER33 template_name = "accepted.html"34 context_object_name = "call_list"35 def get_queryset(self):36 call_list = TaxiCall.objects.filter(status__in=["accepted","accepted_cancel","arrived"]).order_by("-call_time")37 return call_list38def setDriver(request): #ATACHING CAR TO USERS CALL39 CallCenterNumber = "200 10 10"40 callId,carType,carNumber,carTime = [None for i in range(4)]41 try:42 callId = request.POST["calls[]"]43 carType = request.POST["car[]"]44 carNumber = request.POST["car_number"]45 carTime = request.POST["time_for_coming"]46 except :47 print("there are no selected calls")48 if(callId):49 call_list = TaxiCall.objects.filter(call_id=callId)50 if(call_list):51 current_call = call_list[0]52 current_call.status = "waiting"53 current_call.car_set.create(car_type=carType,car_number=carNumber,car_time=carTime,driver_number=CallCenterNumber)54 current_call.save()55 if(carTime!="0"):56 botManager.SendOffer(chat_id=current_call.chat_id,car_type=carType,car_number=carNumber,arrival_time=carTime)57 else:58 botManager.SendOffer(chat_id=current_call.chat_id,car_type=carType,car_number=carNumber)59 return HttpResponseRedirect(reverse("taxibot:callList"))60def SendDriver(request): #driver is sended to user61 callId = None62 try:63 callId = request.POST["calls[]"]64 except :65 print("there are no selected calls")66 if(callId):67 current_call = TaxiCall.objects.get(call_id=callId)68 current_call.status = "accepted"69 current_call.save()70 botManager.SendDriver(current_call.chat_id)71 return HttpResponseRedirect(reverse("taxibot:NeedCarList"))72# оÑпÑавлÑÐµÑ ÐºÐ»Ð¸ÐµÐ½ÑÑ Ð´Ð°Ð½Ð½Ñе о пÑибÑÑие маÑÐ¸Ð½Ñ Ð¸ ÑбÑаÑÑÐ²Ð°ÐµÑ Ð´Ð°Ð½Ð½Ñе о маÑине73def EndCall(request):74 callId =None75 try:76 callId = request.POST["calls[]"]77 except:78 print("there are no selected calls")79 if (callId):80 current_call = TaxiCall.objects.get(call_id=callId)81 current_call.status = "arrived"82 chat_id = current_call.chat_id83 botManager.SendArrived(chat_id=chat_id)84 current_call.chat_id = 085 current_call.save()86 return HttpResponseRedirect(reverse("taxibot:accepted"))87class DBCalls(generic.ListView): # History of cars88 template_name = "DBList.html"89 context_object_name = "call_list"90 def get_queryset(self):91 call_list = TaxiCallHistory.objects.all().order_by("-call_date")92 return call_list93def clearDB(request): # delete all calls94 try:95 for call in TaxiCall.objects.filter(status__in=["accepted_cancel","arrived"]).order_by("-call_time"):96 new = TaxiCallHistory.objects.create(call_id=call.call_id,status = call.status, type=call.type, number=call.number,call_time = call.call_time,97 details=call.details,address=call.address,IsMap=call.IsMap,longitude=call.longitude,latitude=call.latitude)98 except:99 print("there are no selected calls")100 TaxiCall.objects.all().delete()101 return HttpResponseRedirect(reverse("taxibot:callList"))102def clearDBHistory(request): # delete all calls103 TaxiCallHistory.objects.all().delete()104 return HttpResponseRedirect(reverse("taxibot:db_list"))105####################################################################################################################################################################################106####################################################################################################################################################################################107####################################################################################################################################################################################108# TELEBOT109def webhook(request):110 bot.remove_webhook()111 bot.set_webhook(url="https://taxifavorituz.herokuapp.com/taxibot/309803225:AAEfkOtjUfLCTSHicJD05uy7AvilTCkzOYs/")112 return HttpResponseRedirect(reverse("taxibot:callList"))113def getUpdate(request):114 UpdateObj = json.loads(request.body.decode("utf-8"))115 UpdateManager(UpdateObj)116 return JsonResponse({'ok': True})117def UpdateManager(Update):118 keysOfUpdate = Update.keys()119 if("message" in keysOfUpdate):120 message = Update["message"]121 keys = message.keys()122 if("text" in keys):123 botManager.textManager(message=message)124 elif("location" in keys):125 botManager.get_location(message)126 elif("contact" in keys):127 botManager.GetContact(message)128 elif("callback_query" in keysOfUpdate):...
mock_calls.py
Source:mock_calls.py
...18 else:19 return pair20 def do_check(call):21 def side_effect(*args, **kwargs):22 received_call = call(*args, **kwargs)23 self._test_case.assertTrue(24 self._expected_calls,25 msg=('Unexpected call: %s' % str(received_call)))26 expected_call, action = self._expected_calls.pop(0)27 self._test_case.assertTrue(28 received_call == expected_call,29 msg=('Expected call mismatch:\n'30 ' expected: %s\n'31 ' received: %s\n' % (str(expected_call),32 str(received_call))))33 if callable(action):34 return action(*args, **kwargs)35 else:36 return action37 return side_effect38 self._test_case = test_case39 self._expected_calls = [call_action(pair) for pair in expected_calls]40 watched = watched.copy() # do not pollute the caller's dict41 watched.update(42 (call.parent.name, call.parent) for call, _ in self._expected_calls)43 self._patched = [44 test_case.patch_call(call, side_effect=do_check(call))45 for call in watched.values()46 ]47 def __enter__(self):48 for patch in self._patched:49 patch.__enter__()50 return self51 def __exit__(self, exc_type, exc_val, exc_tb):52 for patch in self._patched:53 patch.__exit__(exc_type, exc_val, exc_tb)54 if exc_type is None:55 missing = ''.join(56 ' expected: %s\n' % str(call) for call, _ in self._expected_calls)57 self._test_case.assertFalse(58 missing, msg='Expected calls not found:\n' + missing)59 def __init__(self, *args, **kwargs):60 super(TestCase, self).__init__(*args, **kwargs)61 self.call = mock.call.self62 self._watched = {}63 def call_target(self, call):64 """Resolve a self.call instance to the target it represents.65 Args:66 call: a self.call instance, e.g. self.call.adb.Shell67 Returns:68 The target object represented by the call, e.g. self.adb.Shell69 Raises:70 ValueError if the path of the call does not start with "self", i.e. the71 target of the call is external to the self object.72 AttributeError if the path of the call does not specify a valid73 chain of attributes (without any calls) starting from "self".74 """75 path = call.name.split('.')76 if path.pop(0) != 'self':77 raise ValueError("Target %r outside of 'self' object" % call.name)78 target = self79 for attr in path:80 target = getattr(target, attr)81 return target82 def patch_call(self, call, **kwargs):83 """Patch the target of a mock.call instance.84 Args:85 call: a mock.call instance identifying a target to patch86 Extra keyword arguments are processed by mock.patch87 Returns:88 A context manager to mock/unmock the target of the call89 """90 if call.name.startswith('self.'):91 target = self.call_target(call.parent)92 _, attribute = call.name.rsplit('.', 1)93 if (hasattr(type(target), attribute)94 and isinstance(getattr(type(target), attribute), property)):95 return mock.patch.object(96 type(target), attribute, new_callable=mock.PropertyMock, **kwargs)...
call.py
Source:call.py
...29class MyAccountCallback(pj.AccountCallback):30 def __init__(self, account=None):31 pj.AccountCallback.__init__(self, account)32 # Notification on incoming call33 def on_incoming_call(self, call):34 global current_call 35 if current_call:36 call.answer(486, "Busy")37 return38 39 print "Incoming call from ", call.info().remote_uri40 print "Press 'a' to answer"41 current_call = call42 call_cb = MyCallCallback(current_call)43 current_call.set_callback(call_cb)44 current_call.answer(180)45 46# Callback to receive events from Call47class MyCallCallback(pj.CallCallback):48 def __init__(self, call=None):49 pj.CallCallback.__init__(self, call)50 # Notification when call state has changed51 def on_state(self):52 global current_call53 print "Call with", self.call.info().remote_uri,54 print "is", self.call.info().state_text,55 print "last code =", self.call.info().last_code, 56 print "(" + self.call.info().last_reason + ")"57 58 if self.call.info().state == pj.CallState.DISCONNECTED:59 current_call = None60 print 'Current call is', current_call61 # Notification when call's media state has changed.62 def on_media_state(self):63 if self.call.info().media_state == pj.MediaState.ACTIVE:64 # Connect the call to sound device65 call_slot = self.call.info().conf_slot66 pj.Lib.instance().conf_connect(call_slot, 0)67 pj.Lib.instance().conf_connect(0, call_slot)68 print "Media is now active"69 else:70 print "Media is inactive"71# Function to make call72def make_call(uri):73 try:74 print "Making call to", uri75 return acc.make_call(uri, cb=MyCallCallback())76 except pj.Error, e:77 print "Exception: " + str(e)78 return None79 80# Create library instance81lib = pj.Lib()82try:83 # Init library with default config and some customized84 # logging config.85 lib.init(log_cfg = pj.LogConfig(level=LOG_LEVEL, callback=log_cb))86 # Create UDP transport which listens to any available port87 transport = lib.create_transport(pj.TransportType.UDP, 88 pj.TransportConfig(0))89 print "\nListening on", transport.info().host, 90 print "port", transport.info().port, "\n"91 92 # Start the library93 lib.start()94 # Create local account95 acc = lib.create_account_for_transport(transport, cb=MyAccountCallback())96 # If argument is specified then make call to the URI97 if len(sys.argv) > 1:98 lck = lib.auto_lock()99 current_call = make_call(sys.argv[1])100 print 'Current call is', current_call101 del lck102 my_sip_uri = "sip:" + transport.info().host + \103 ":" + str(transport.info().port)104 # Menu loop105 while True:106 print "My SIP URI is", my_sip_uri107 print "Menu: m=make call, h=hangup call, a=answer call, q=quit"108 input = sys.stdin.readline().rstrip("\r\n")109 if input == "m":110 if current_call:111 print "Already have another call"112 continue113 print "Enter destination URI to call: ", 114 input = sys.stdin.readline().rstrip("\r\n")115 if input == "":116 continue117 lck = lib.auto_lock()118 current_call = make_call(input)119 del lck120 elif input == "h":121 if not current_call:122 print "There is no call"123 continue124 current_call.hangup()125 elif input == "a":126 if not current_call:127 print "There is no call"128 continue129 current_call.answer(200)130 elif input == "q":131 break132 # Shutdown the library...
Using AI Code Generation
1var stf = require('devicefarmer-stf');2var device = stf.device();3device.call("1234567890");4var stf = require('devicefarmer-stf');5var device = stf.device();6device.call("1234567890");7var stf = require('devicefarmer-stf');8var device = stf.device();9device.call("1234567890");10var stf = require('devicefarmer-stf');11var device = stf.device();12device.call("1234567890");13var stf = require('devicefarmer-stf');14var device = stf.device();15device.call("1234567890");16var stf = require('devicefarmer-stf');17var device = stf.device();18device.call("1234567890");19var stf = require('devicefarmer-stf');20var device = stf.device();21device.call("1234567890");22var stf = require('devicefarmer-stf');23var device = stf.device();24device.call("1234567890");25var stf = require('devicefarmer-stf');26var device = stf.device();27device.call("1234567890");28var stf = require('devicefarmer-stf');29var device = stf.device();30device.call("1234567890");31var stf = require('devicefarmer-stf');
Using AI Code Generation
1var stf = require('devicefarmer-stf-client');2var client = stf.createClient({3});4var device = stf.getDevice({5});6device.getDevice(function(err, device) {7 if (err) {8 console.log('Error getting device info', err);9 } else {10 console.log('Device info', device);11 }12});13device.getDeviceState(function(err, device) {14 if (err) {15 console.log('Error getting device state', err);16 } else {17 console.log('Device state', device);18 }19});20device.getDeviceIdentity(function(err, device) {21 if (err) {22 console.log('Error getting device identity', err);23 } else {24 console.log('Device identity', device);25 }26});27device.getDeviceProperties(function(err, device) {28 if (err) {29 console.log('Error getting device properties', err);30 } else {31 console.log('Device properties', device);32 }33});34device.getDeviceRemoteConnect(function(err, device) {35 if (err) {36 console.log('Error getting device remote connect', err);37 } else {38 console.log('Device remote connect', device);39 }40});41device.getDeviceRemoteConnectUrl(function(err, device) {42 if (err) {43 console.log('Error getting device remote connect url', err);44 } else {45 console.log('Device remote connect url', device);46 }47});48device.getDeviceRemoteConnectUrl(function(err, device) {49 if (err) {50 console.log('Error getting device remote connect url', err);51 } else {52 console.log('Device remote connect url', device);53 }54});55device.getDeviceRemoteConnectUrl(function(err, device) {56 if (err
Using AI Code Generation
1var stf = require('devicefarmer-stf-client');2var device = new stf.Device(stfClient, '4d00d4a3');3device.use(function() {4 return device.call('shell', 'am start -n com.android.vending/com.google.android.finsky.activities.MainActivity');5}).then(function() {6 console.log('done');7});8var stf = require('devicefarmer-stf-client');9var device = new stf.Device(stfClient, '4d00d4a3');10device.use(function() {11 return device.call('shell', 'am start -n com.android.vending/com.google.android.finsky.activities.MainActivity');12}).then(function() {13 console.log('done');14});15var stf = require('devicefarmer-stf-client');16var device = new stf.Device(stfClient, '4d00d4a3');17device.use(function() {18 return device.call('shell', 'am start -n com.android.vending/com.google.android.finsky.activities.MainActivity');19}).then(function() {20 console.log('done');21});22var stf = require('devicefarmer-stf-client');23var device = new stf.Device(stfClient, '4d00d4a3');24device.use(function() {25 return device.call('shell', 'am start -n com.android.vending/com.google.android.finsky.activities.MainActivity');26}).then(function() {27 console.log('done');28});29var stf = require('devicefarmer-stf-client');30var device = new stf.Device(stfClient, '4d00d4a3
Using AI Code Generation
1var Client = require('devicefarmer-stf-client');2var device = client.getDevice('NEXUS5X-0e7f57b0');3device.call({type:'shell',command:'dumpsys battery'}).then(function(result) {4 console.log(result);5});6var Client = require('devicefarmer-stf-client');7var device = client.getDevice('NEXUS5X-0e7f57b0');8device.call({type:'shell',command:'dumpsys battery'}).then(function(result) {9 console.log(result);10});11var Client = require('devicefarmer-stf-client');12var device = client.getDevice('NEXUS5X-0e7f57b0');13device.call({type:'shell',command:'dumpsys battery'}).then(function(result) {14 console.log(result);15});16var Client = require('devicefarmer-stf-client');17var device = client.getDevice('NEXUS5X-0e7f57b0');18device.call({type:'shell',command:'dumpsys battery'}).then(function(result) {19 console.log(result);20});21var Client = require('devicefarmer-stf-client');22var device = client.getDevice('NEXUS5X-0e7f57b0');23device.call({type:'shell',command:'dumpsys battery'}).then(function(result) {24 console.log(result);25});26var Client = require('devicefarmer-stf-client');27var device = client.getDevice('NEXUS5X-0e7f57b0');
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!