How to use the _assert_raises method in Testify

Best Python code snippets using Testify_python
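
In both of the snippets below, _assert_raises is a small private helper that the test file defines for itself rather than a public framework API. The first snippet (from PyTorch's TorchScript tests) implements it as a test-class method that wraps unittest's assertRaisesRegex around TorchScript compilation; the second (from trio's generic stream checks) implements it as a plain context manager that fails the test unless the expected exception is raised inside the with-block. The sketch below shows the context-manager shape in isolation; the divide() function and the chosen exception are illustrative placeholders, not part of either snippet.

from contextlib import contextmanager

@contextmanager
def _assert_raises(exc):
    # Succeed silently if the block raises `exc`; otherwise fail the test.
    try:
        yield
    except exc:
        pass
    else:
        raise AssertionError("expected exception: {}".format(exc))

# Illustrative function under test (hypothetical).
def divide(a, b):
    return a / b

# The with-block passes only if ZeroDivisionError is raised inside it.
with _assert_raises(ZeroDivisionError):
    divide(1, 0)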

test_union.py

Source: test_union.py (GitHub)


...
    self.checkScript(fn, (8,))

def _assert_passes(self, template: str, ann: str, lhs: str):
    code = template.format(ann=ann, lhs=lhs)
    self.checkScript(code, (), name="fn")

def _assert_raises(self, template: str, ann: str, lhs: str, msg: str):
    code = template.format(ann=ann, lhs=lhs)
    with self.assertRaisesRegex(RuntimeError, msg):
        cu = torch.jit.CompilationUnit(code, _frames_up=1)
        string_frontend = getattr(cu, "fn")  # noqa: B009

def test_union_with_list_assignment(self):
    template = dedent('''
        def fn():
            x: {ann} = {lhs}
            if torch.jit.isinstance(x, List[torch.Tensor]):
                x.append(torch.tensor(3))
            return x
    ''')
    lhs = {"list_literal_empty" : "[]",
           "list_literal_of_tensor" : "[torch.arange(3), torch.arange(5)]",
           "list_literal_of_str" : "[\"foo\", \"bar\", \"baz\"]",
           "list_literal_of_mixed" : "[torch.arange(5), 1]",
           "list_comprehension_of_tensor" :
               "[torch.add(x, 1) for x in [torch.arange(3), torch.arange(5)]]",
           "list_comprehension_of_str" :
               "[x + \"!\" for x in [\"foo\", \"bar\", \"baz\"]]",
           "list_comprehension_of_mixed" :
               "[torch.add(1, x) for x in [torch.arange(5), 1]]"}
    """
    Union[List[str], List[torch.Tensor]]
    """
    self._assert_raises(template,
                        "Union[List[str], List[torch.Tensor]]",
                        lhs["list_literal_empty"],
                        "there are multiple possible List type "
                        "candidates in the Union annotation")
    self._assert_passes(template,
                        "Union[List[str], List[torch.Tensor]]",
                        lhs["list_literal_of_tensor"])
    self._assert_passes(template,
                        "Union[List[str], List[torch.Tensor]]",
                        lhs["list_literal_of_str"])
    self._assert_raises(template,
                        "Union[List[str], List[torch.Tensor]]",
                        lhs["list_literal_of_mixed"],
                        "none of those types match the types of the"
                        " given list elements")
    self._assert_passes(template,
                        "Union[List[str], List[torch.Tensor]]",
                        lhs["list_comprehension_of_tensor"])
    self._assert_passes(template,
                        "Union[List[str], List[torch.Tensor]]",
                        lhs["list_comprehension_of_str"])
    # TODO: Support mixed list comprehensions
    self._assert_raises(template,
                        "Union[List[str], List[torch.Tensor]]",
                        lhs["list_comprehension_of_mixed"],
                        "Arguments for call are not valid")
    """
    Union[int, torch.Tensor]
    """
    self._assert_raises(template,
                        "Union[int, torch.Tensor]",
                        lhs["list_literal_empty"],
                        "Expected an Union type annotation with an "
                        "inner List type")
    self._assert_raises(template, "Union[int, torch.Tensor]",
                        lhs["list_literal_of_tensor"],
                        "Expected an Union type annotation with an "
                        "inner List type")
    self._assert_raises(template, "Union[int, torch.Tensor]",
                        lhs["list_comprehension_of_tensor"],
                        "Expected an Union type annotation with an "
                        "inner List type")
    """
    Union[List[torch.Tensor], int]
    """
    self._assert_passes(template,
                        "Union[List[torch.Tensor], int]",
                        lhs["list_literal_empty"])
    self._assert_passes(template,
                        "Union[List[torch.Tensor], int]",
                        lhs["list_literal_of_tensor"])
    self._assert_raises(template, "Union[List[torch.Tensor], int]",
                        lhs["list_literal_of_str"],
                        r"List type annotation `List\[Tensor\]` did "
                        "not match the types of the given list "
                        "elements")
    self._assert_raises(template, "Union[List[torch.Tensor], int]",
                        lhs["list_literal_of_mixed"],
                        r"List type annotation `List\[Tensor\]` did "
                        "not match the types of the given list "
                        "elements")
    self._assert_passes(template,
                        "Union[List[torch.Tensor], int]",
                        lhs["list_comprehension_of_tensor"])
    self._assert_raises(template,
                        "Union[List[torch.Tensor], int]",
                        lhs["list_comprehension_of_str"],
                        r"List type annotation `List\[Tensor\]` did "
                        "not match the types of the given list "
                        "elements")
    # TODO(@ansley): Support mixed list comprehensions
    self._assert_raises(template,
                        "Union[List[torch.Tensor], int]",
                        lhs["list_comprehension_of_mixed"],
                        "Arguments for call are not valid")

def test_union_with_dict_assignment(self):
    template = dedent('''
        def fn():
            x: {ann} = {lhs}
            if torch.jit.isinstance(x, Dict[str, torch.Tensor]):
                x["foo"] = torch.tensor(3)
            return x
    ''')
    lhs = {"dict_literal_empty" : "{}",
           "dict_literal_of_str_tensor" :
               "{\"foo\" : torch.arange(3), \"bar\" : torch.arange(5)}",
           "dict_literal_of_str_int" :
               "{\"foo\" : 1, \"bar\" : 2}",
           "dict_literal_of_mixed" :
               "{\"foo\" : torch.arange(3), \"bar\" : 2}",
           "dict_comprehension_of_str_tensor" :
               "{x : torch.add(y, 1) for x, y in \
               zip([\"foo\", \"bar\"], [torch.arange(3), torch.arange(5)])}",
           "dict_comprehension_of_str_int" :
               "{x : torch.add(y, 1) for x, y in \
               zip([\"foo\", \"bar\"], [1, 2]}",
           "dict_comprehension_of_mixed" :
               "{x : torch.add(y, 1) for x, y in \
               zip([\"foo\", \"bar\"], [torch.arange(3), 2])}",
           "dict_keyword" :
               "dict(foo=torch.arange(3), baz=torch.arange(5))",
           "dict_keyword_with_iterable" :
               "dict([(\"foo\", torch.arange(3)), (\"bar\", torch.arange(5))])",
           "dict_keyword_with_empty_iterable" :
               "dict([])",
           "dict_keyword_with_internal_aggregate_function" :
               "dict(zip([\"foo\", \"bar\"], [torch.arange(3), torch.arange(5)])",
           "dict_keyword_with_mapping" :
               "dict({\"foo\" : torch.arange(3), \"bar\" : torch.arange(5)})",
           "dict_keyword_with_mapping_and_kwargs" :
               "dict({\"foo\" : torch.arange(3), \"bar\" : torch.arange(5)}, baz=torch.arange(7))",
           }
    """
    Union[Dict[str, torch.Tensor], Dict[str, int]]
    """
    self._assert_raises(template,
                        "Union[List[str], List[torch.Tensor]]",
                        lhs["dict_literal_empty"],
                        "Expected an Union type annotation with an "
                        "inner Dict type")
    self._assert_passes(template,
                        "Union[Dict[str, torch.Tensor], Dict[str, int]]",
                        lhs["dict_literal_of_str_tensor"])
    self._assert_passes(template,
                        "Union[Dict[str, torch.Tensor], Dict[str, int]]",
                        lhs["dict_literal_of_str_int"])
    self._assert_raises(template, "Union[Dict[str, torch.Tensor], Dict[str, int]]",
                        lhs["dict_literal_of_mixed"],
                        "none of those dict types can hold the "
                        "types of the given keys and values")
    # TODO: String frontend does not support tuple unpacking
    # https://github.com/pytorch/pytorch/issues/64096
    # self._assert_passes(template, "Union[Dict[str, torch.Tensor], Dict[str, int]]",
    #                     lhs["dict_comprehension_of_str_tensor"])
    # self._assert_passes(template, "Union[Dict[str, torch.Tensor], Dict[str, int]]",
    #                     lhs["dict_comprehension_of_str_int"])
    # self._assert_raises(template, "Union[Dict[str, torch.Tensor], Dict[str, int]]",
    #                     lhs["dict_comprehension_of_mixed"],
    #                     "foobar")
    # self._assert_passes(template,
    #                     "Union[Dict[str, torch.Tensor], Dict[str, int]]",
    #                     lhs["dict_keyword_with_internal_aggregate_function"])
    # TODO(@ansley): Follow-up project needed for full type
    # inference with dict keyword (supported for dict comprehension
    # and dict literal already; should not be a blocker for anyone)
    self._assert_raises(template,
                        "Union[Dict[str, torch.Tensor], Dict[str, int]]",
                        lhs["dict_keyword"],
                        "full type inference is not yet supported")
    self._assert_raises(template,
                        "Union[Dict[str, torch.Tensor], Dict[str, int]]",
                        lhs["dict_keyword_with_iterable"],
                        "full type inference is not yet supported")
    self._assert_raises(template,
                        "Union[Dict[str, torch.Tensor], Dict[str, int]]",
                        lhs["dict_keyword_with_empty_iterable"],
                        "full type inference is not yet supported")
    self._assert_raises(template,
                        "Union[Dict[str, torch.Tensor], Dict[str, int]]",
                        lhs["dict_keyword_with_mapping"],
                        "full type inference is not yet supported")
    self._assert_raises(template,
                        "Union[Dict[str, torch.Tensor], Dict[str, int]]",
                        lhs["dict_keyword_with_mapping_and_kwargs"],
                        "full type inference is not yet supported")
    """
    Union[int, torch.Tensor]
    """
    self._assert_raises(template,
                        "Union[int, torch.Tensor]",
                        lhs["dict_literal_empty"],
                        "Expected an Union type annotation with "
                        "an inner Dict type")
    self._assert_raises(template,
                        "Union[int, torch.Tensor]",
                        lhs["dict_literal_of_str_tensor"],
                        "Expected an Union type annotation with "
                        "an inner Dict type")
    # See above--string frontend does not support tuple unpacking
    # self._assert_raises(template, "Union[int, torch.Tensor]",
    #                     lhs["dict_comprehension_of_tensor"],
    #                     "foobar")
    """
    Union[Dict[str, torch.Tensor], int]
    """
    self._assert_passes(template,
                        "Union[Dict[str, torch.Tensor], int]",
                        lhs["dict_literal_empty"])
    self._assert_passes(template,
                        "Union[Dict[str, torch.Tensor], int]",
                        lhs["dict_literal_of_str_tensor"])
    self._assert_raises(template,
                        "Union[Dict[str, torch.Tensor], int]",
                        lhs["dict_literal_of_str_int"],
                        "Type annotation was inferred to be "
                        r"`Dict\[str, Tensor\]`, but the type of "
                        "values given by the dict literal is")
    self._assert_raises(template,
                        "Union[Dict[str, torch.Tensor], int]",
                        lhs["dict_literal_of_mixed"],
                        "Type annotation was inferred to be "
                        r"`Dict\[str, Tensor\]`, but the type of "
                        "values given by the dict literal is")
    self._assert_passes(template,
                        "Union[Dict[str, torch.Tensor], int]",
                        lhs["dict_keyword"])
    self._assert_passes(template,
                        "Union[Dict[str, torch.Tensor], int]",
                        lhs["dict_keyword_with_iterable"])
    self._assert_passes(template,
                        "Union[Dict[str, torch.Tensor], int]",
                        lhs["dict_keyword_with_empty_iterable"])
    self._assert_passes(template,
                        "Union[Dict[str, torch.Tensor], int]",
                        lhs["dict_keyword_with_mapping"])
    self._assert_passes(template,
                        "Union[Dict[str, torch.Tensor], int]",
                        lhs["dict_keyword_with_mapping_and_kwargs"])
    # See above--string frontend does not support tuple unpacking
    # self._assert_passes(template,
    #                     "Union[Dict[str, torch.Tensor], int]",
    #                     lhs["dict_keyword_with_internal_aggregate_function"])
    #
    # self._assert_passes(template,
    #                     "Union[Dict[str, torch.Tensor], int]",
    #                     lhs["dict_comprehension_of_str_tensor"])
    # self._assert_raises(template,
    #                     "Union[Dict[str, torch.Tensor], int]",
    #                     lhs["dict_comprehension_of_str_int"],
    #                     "foobar")
    # self._assert_raises(template,
    #                     "Union[Dict[str, torch.Tensor], int]",
    #                     lhs["dict_comprehension_of_mixed"],
...


_check_streams.py

Source: _check_streams.py (GitHub)


...
            await aclose_forcefully(self._both[0])
        finally:
            await aclose_forcefully(self._both[1])

@contextmanager
def _assert_raises(exc):
    __tracebackhide__ = True
    try:
        yield
    except exc:
        pass
    else:
        raise AssertionError("expected exception: {}".format(exc))

async def check_one_way_stream(stream_maker, clogged_stream_maker):
    """Perform a number of generic tests on a custom one-way stream
    implementation.
    Args:
      stream_maker: An async (!) function which returns a connected
        (:class:`~trio.abc.SendStream`, :class:`~trio.abc.ReceiveStream`)
        pair.
      clogged_stream_maker: Either None, or an async function similar to
        stream_maker, but with the extra property that the returned stream
        is in a state where ``send_all`` and
        ``wait_send_all_might_not_block`` will block until ``receive_some``
        has been called. This allows for more thorough testing of some edge
        cases, especially around ``wait_send_all_might_not_block``.
    Raises:
      AssertionError: if a test fails.
    """
    async with _ForceCloseBoth(await stream_maker()) as (s, r):
        assert isinstance(s, SendStream)
        assert isinstance(r, ReceiveStream)

        async def do_send_all(data):
            with assert_checkpoints():
                assert await s.send_all(data) is None

        async def do_receive_some(*args):
            with assert_checkpoints():
                return await r.receive_some(*args)

        async def checked_receive_1(expected):
            assert await do_receive_some(1) == expected

        async def do_aclose(resource):
            with assert_checkpoints():
                await resource.aclose()

        # Simple sending/receiving
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_send_all, b"x")
            nursery.start_soon(checked_receive_1, b"x")

        async def send_empty_then_y():
            # Streams should tolerate sending b"" without giving it any
            # special meaning.
            await do_send_all(b"")
            await do_send_all(b"y")

        async with _core.open_nursery() as nursery:
            nursery.start_soon(send_empty_then_y)
            nursery.start_soon(checked_receive_1, b"y")

        # ---- Checking various argument types ----
        # send_all accepts bytearray and memoryview
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_send_all, bytearray(b"1"))
            nursery.start_soon(checked_receive_1, b"1")
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_send_all, memoryview(b"2"))
            nursery.start_soon(checked_receive_1, b"2")

        # max_bytes must be a positive integer
        with _assert_raises(ValueError):
            await r.receive_some(-1)
        with _assert_raises(ValueError):
            await r.receive_some(0)
        with _assert_raises(TypeError):
            await r.receive_some(1.5)
        # it can also be missing or None
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_send_all, b"x")
            assert await do_receive_some() == b"x"
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_send_all, b"x")
            assert await do_receive_some(None) == b"x"

        with _assert_raises(_core.BusyResourceError):
            async with _core.open_nursery() as nursery:
                nursery.start_soon(do_receive_some, 1)
                nursery.start_soon(do_receive_some, 1)

        # Method always has to exist, and an empty stream with a blocked
        # receive_some should *always* allow send_all. (Technically it's legal
        # for send_all to wait until receive_some is called to run, though; a
        # stream doesn't *have* to have any internal buffering. That's why we
        # start a concurrent receive_some call, then cancel it.)
        async def simple_check_wait_send_all_might_not_block(scope):
            with assert_checkpoints():
                await s.wait_send_all_might_not_block()
            scope.cancel()

        async with _core.open_nursery() as nursery:
            nursery.start_soon(
                simple_check_wait_send_all_might_not_block, nursery.cancel_scope
            )
            nursery.start_soon(do_receive_some, 1)

        # closing the r side leads to BrokenResourceError on the s side
        # (eventually)
        async def expect_broken_stream_on_send():
            with _assert_raises(_core.BrokenResourceError):
                while True:
                    await do_send_all(b"x" * 100)

        async with _core.open_nursery() as nursery:
            nursery.start_soon(expect_broken_stream_on_send)
            nursery.start_soon(do_aclose, r)

        # once detected, the stream stays broken
        with _assert_raises(_core.BrokenResourceError):
            await do_send_all(b"x" * 100)
        # r closed -> ClosedResourceError on the receive side
        with _assert_raises(_core.ClosedResourceError):
            await do_receive_some(4096)
        # we can close the same stream repeatedly, it's fine
        await do_aclose(r)
        await do_aclose(r)
        # closing the sender side
        await do_aclose(s)
        # now trying to send raises ClosedResourceError
        with _assert_raises(_core.ClosedResourceError):
            await do_send_all(b"x" * 100)
        # even if it's an empty send
        with _assert_raises(_core.ClosedResourceError):
            await do_send_all(b"")
        # ditto for wait_send_all_might_not_block
        with _assert_raises(_core.ClosedResourceError):
            with assert_checkpoints():
                await s.wait_send_all_might_not_block()
        # and again, repeated closing is fine
        await do_aclose(s)
        await do_aclose(s)

    async with _ForceCloseBoth(await stream_maker()) as (s, r):
        # if send-then-graceful-close, receiver gets data then b""
        async def send_then_close():
            await do_send_all(b"y")
            await do_aclose(s)

        async def receive_send_then_close():
            # We want to make sure that if the sender closes the stream before
            # we read anything, then we still get all the data. But some
            # streams might block on the do_send_all call. So we let the
            # sender get as far as it can, then we receive.
            await _core.wait_all_tasks_blocked()
            await checked_receive_1(b"y")
            await checked_receive_1(b"")
            await do_aclose(r)

        async with _core.open_nursery() as nursery:
            nursery.start_soon(send_then_close)
            nursery.start_soon(receive_send_then_close)

    async with _ForceCloseBoth(await stream_maker()) as (s, r):
        await aclose_forcefully(r)
        with _assert_raises(_core.BrokenResourceError):
            while True:
                await do_send_all(b"x" * 100)
        with _assert_raises(_core.ClosedResourceError):
            await do_receive_some(4096)

    async with _ForceCloseBoth(await stream_maker()) as (s, r):
        await aclose_forcefully(s)
        with _assert_raises(_core.ClosedResourceError):
            await do_send_all(b"123")
        # after the sender does a forceful close, the receiver might either
        # get BrokenResourceError or a clean b""; either is OK. Not OK would be
        # if it freezes, or returns data.
        try:
            await checked_receive_1(b"")
        except _core.BrokenResourceError:
            pass

    # cancelled aclose still closes
    async with _ForceCloseBoth(await stream_maker()) as (s, r):
        with _core.CancelScope() as scope:
            scope.cancel()
            await r.aclose()
        with _core.CancelScope() as scope:
            scope.cancel()
            await s.aclose()
        with _assert_raises(_core.ClosedResourceError):
            await do_send_all(b"123")
        with _assert_raises(_core.ClosedResourceError):
            await do_receive_some(4096)

    # Check that we can still gracefully close a stream after an operation has
    # been cancelled. This can be challenging if cancellation can leave the
    # stream internals in an inconsistent state, e.g. for
    # SSLStream. Unfortunately this test isn't very thorough; the really
    # challenging case for something like SSLStream is it gets cancelled
    # *while* it's sending data on the underlying, not before. But testing
    # that requires some special-case handling of the particular stream setup;
    # we can't do it here. Maybe we could do a bit better with
    # https://github.com/python-trio/trio/issues/77
    async with _ForceCloseBoth(await stream_maker()) as (s, r):

        async def expect_cancelled(afn, *args):
            with _assert_raises(_core.Cancelled):
                await afn(*args)

        with _core.CancelScope() as scope:
            scope.cancel()
            async with _core.open_nursery() as nursery:
                nursery.start_soon(expect_cancelled, do_send_all, b"x")
                nursery.start_soon(expect_cancelled, do_receive_some, 1)
        async with _core.open_nursery() as nursery:
            nursery.start_soon(do_aclose, s)
            nursery.start_soon(do_aclose, r)

    # Check that if a task is blocked in receive_some, then closing the
    # receive stream causes it to wake up.
    async with _ForceCloseBoth(await stream_maker()) as (s, r):

        async def receive_expecting_closed():
            with _assert_raises(_core.ClosedResourceError):
                await r.receive_some(10)

        async with _core.open_nursery() as nursery:
            nursery.start_soon(receive_expecting_closed)
            await _core.wait_all_tasks_blocked()
            await aclose_forcefully(r)

    # check wait_send_all_might_not_block, if we can
    if clogged_stream_maker is not None:
        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            record = []

            async def waiter(cancel_scope):
                record.append("waiter sleeping")
                with assert_checkpoints():
                    await s.wait_send_all_might_not_block()
                record.append("waiter wokeup")
                cancel_scope.cancel()

            async def receiver():
                # give wait_send_all_might_not_block a chance to block
                await _core.wait_all_tasks_blocked()
                record.append("receiver starting")
                while True:
                    await r.receive_some(16834)

            async with _core.open_nursery() as nursery:
                nursery.start_soon(waiter, nursery.cancel_scope)
                await _core.wait_all_tasks_blocked()
                nursery.start_soon(receiver)

            assert record == [
                "waiter sleeping",
                "receiver starting",
                "waiter wokeup",
            ]

        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            # simultaneous wait_send_all_might_not_block fails
            with _assert_raises(_core.BusyResourceError):
                async with _core.open_nursery() as nursery:
                    nursery.start_soon(s.wait_send_all_might_not_block)
                    nursery.start_soon(s.wait_send_all_might_not_block)
            # and simultaneous send_all and wait_send_all_might_not_block (NB
            # this test might destroy the stream b/c we end up cancelling
            # send_all and e.g. SSLStream can't handle that, so we have to
            # recreate afterwards)
            with _assert_raises(_core.BusyResourceError):
                async with _core.open_nursery() as nursery:
                    nursery.start_soon(s.wait_send_all_might_not_block)
                    nursery.start_soon(s.send_all, b"123")

        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            # send_all and send_all blocked simultaneously should also raise
            # (but again this might destroy the stream)
            with _assert_raises(_core.BusyResourceError):
                async with _core.open_nursery() as nursery:
                    nursery.start_soon(s.send_all, b"123")
                    nursery.start_soon(s.send_all, b"123")

        # closing the receiver causes wait_send_all_might_not_block to return,
        # with or without an exception
        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):

            async def sender():
                try:
                    with assert_checkpoints():
                        await s.wait_send_all_might_not_block()
                except _core.BrokenResourceError:  # pragma: no cover
                    pass

            async def receiver():
                await _core.wait_all_tasks_blocked()
                await aclose_forcefully(r)

            async with _core.open_nursery() as nursery:
                nursery.start_soon(sender)
                nursery.start_soon(receiver)

        # and again with the call starting after the close
        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            await aclose_forcefully(r)
            try:
                with assert_checkpoints():
                    await s.wait_send_all_might_not_block()
            except _core.BrokenResourceError:  # pragma: no cover
                pass

        # Check that if a task is blocked in a send-side method, then closing
        # the send stream causes it to wake up.
        async def close_soon(s):
            await _core.wait_all_tasks_blocked()
            await aclose_forcefully(s)

        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            async with _core.open_nursery() as nursery:
                nursery.start_soon(close_soon, s)
                with _assert_raises(_core.ClosedResourceError):
                    await s.send_all(b"xyzzy")

        async with _ForceCloseBoth(await clogged_stream_maker()) as (s, r):
            async with _core.open_nursery() as nursery:
                nursery.start_soon(close_soon, s)
                with _assert_raises(_core.ClosedResourceError):
                    await s.wait_send_all_might_not_block()

async def check_two_way_stream(stream_maker, clogged_stream_maker):
    """Perform a number of generic tests on a custom two-way stream
    implementation.
    This is similar to :func:`check_one_way_stream`, except that the maker
    functions are expected to return objects implementing the
    :class:`~trio.abc.Stream` interface.
    This function tests a *superset* of what :func:`check_one_way_stream`
    checks – if you call this, then you don't need to also call
    :func:`check_one_way_stream`.
    """
    await check_one_way_stream(stream_maker, clogged_stream_maker)

    async def flipped_stream_maker():
        return reversed(await stream_maker())

    if clogged_stream_maker is not None:

        async def flipped_clogged_stream_maker():
            return reversed(await clogged_stream_maker())

    else:
        flipped_clogged_stream_maker = None
    await check_one_way_stream(flipped_stream_maker, flipped_clogged_stream_maker)

    async with _ForceCloseBoth(await stream_maker()) as (s1, s2):
        assert isinstance(s1, Stream)
        assert isinstance(s2, Stream)

        # Duplex can be a bit tricky, might as well check it as well
        DUPLEX_TEST_SIZE = 2 ** 20
        CHUNK_SIZE_MAX = 2 ** 14
        r = random.Random(0)
        i = r.getrandbits(8 * DUPLEX_TEST_SIZE)
        test_data = i.to_bytes(DUPLEX_TEST_SIZE, "little")

        async def sender(s, data, seed):
            r = random.Random(seed)
            m = memoryview(data)
            while m:
                chunk_size = r.randint(1, CHUNK_SIZE_MAX)
                await s.send_all(m[:chunk_size])
                m = m[chunk_size:]

        async def receiver(s, data, seed):
            r = random.Random(seed)
            got = bytearray()
            while len(got) < len(data):
                chunk = await s.receive_some(r.randint(1, CHUNK_SIZE_MAX))
                assert chunk
                got += chunk
            assert got == data

        async with _core.open_nursery() as nursery:
            nursery.start_soon(sender, s1, test_data, 0)
            nursery.start_soon(sender, s2, test_data[::-1], 1)
            nursery.start_soon(receiver, s1, test_data[::-1], 2)
            nursery.start_soon(receiver, s2, test_data, 3)

        async def expect_receive_some_empty():
            assert await s2.receive_some(10) == b""
            await s2.aclose()

        async with _core.open_nursery() as nursery:
            nursery.start_soon(expect_receive_some_empty)
            nursery.start_soon(s1.aclose)

async def check_half_closeable_stream(stream_maker, clogged_stream_maker):
    """Perform a number of generic tests on a custom half-closeable stream
    implementation.
    This is similar to :func:`check_two_way_stream`, except that the maker
    functions are expected to return objects that implement the
    :class:`~trio.abc.HalfCloseableStream` interface.
    This function tests a *superset* of what :func:`check_two_way_stream`
    checks – if you call this, then you don't need to also call
    :func:`check_two_way_stream`.
    """
    await check_two_way_stream(stream_maker, clogged_stream_maker)

    async with _ForceCloseBoth(await stream_maker()) as (s1, s2):
        assert isinstance(s1, HalfCloseableStream)
        assert isinstance(s2, HalfCloseableStream)

        async def send_x_then_eof(s):
            await s.send_all(b"x")
            with assert_checkpoints():
                await s.send_eof()

        async def expect_x_then_eof(r):
            await _core.wait_all_tasks_blocked()
            assert await r.receive_some(10) == b"x"
            assert await r.receive_some(10) == b""

        async with _core.open_nursery() as nursery:
            nursery.start_soon(send_x_then_eof, s1)
            nursery.start_soon(expect_x_then_eof, s2)

        # now sending is disallowed
        with _assert_raises(_core.ClosedResourceError):
            await s1.send_all(b"y")
        # but we can do send_eof again
        with assert_checkpoints():
            await s1.send_eof()
        # and we can still send stuff back the other way
        async with _core.open_nursery() as nursery:
            nursery.start_soon(send_x_then_eof, s2)
            nursery.start_soon(expect_x_then_eof, s1)

    if clogged_stream_maker is not None:
        async with _ForceCloseBoth(await clogged_stream_maker()) as (s1, s2):
            # send_all and send_eof simultaneously is not ok
            with _assert_raises(_core.BusyResourceError):
                async with _core.open_nursery() as nursery:
                    nursery.start_soon(s1.send_all, b"x")
                    await _core.wait_all_tasks_blocked()
                    nursery.start_soon(s1.send_eof)

        async with _ForceCloseBoth(await clogged_stream_maker()) as (s1, s2):
            # wait_send_all_might_not_block and send_eof simultaneously is not
            # ok either
            with _assert_raises(_core.BusyResourceError):
                async with _core.open_nursery() as nursery:
                    nursery.start_soon(s1.wait_send_all_might_not_block)
                    await _core.wait_all_tasks_blocked()
...
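
In trio's generic stream checks, _assert_raises is a bare-bones stand-in for pytest.raises, presumably to keep these reusable helpers free of a hard pytest dependency; the __tracebackhide__ = True line is the usual hint that tells pytest to hide the helper's own frame from failure tracebacks. The short sketch below reuses the same helper around trio's built-in in-memory streams (it assumes trio is installed; the main() wrapper and the specific calls are illustrative, not taken from the snippet above):

from contextlib import contextmanager

import trio
import trio.testing

@contextmanager
def _assert_raises(exc):
    # Same helper shape as in the snippet above.
    try:
        yield
    except exc:
        pass
    else:
        raise AssertionError("expected exception: {}".format(exc))

async def main():
    # memory_stream_pair() returns two connected in-memory streams.
    left, right = trio.testing.memory_stream_pair()
    # receive_some() requires a positive size, so -1 should raise ValueError.
    with _assert_raises(ValueError):
        await right.receive_some(-1)
    # After closing the receiving side, further receives should raise
    # ClosedResourceError.
    await right.aclose()
    with _assert_raises(trio.ClosedResourceError):
        await right.receive_some(1)

trio.run(main)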


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites for running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Testify automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

