How to use _close_transport method in autotest

Best Python code snippet using autotest_python

test_pool.py

Source:test_pool.py Github

copy

Full Screen

import asyncio
import sys
from asyncio import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures import wait
from functools import partial
from threading import Thread
from unittest.mock import MagicMock

import pytest

from lightbus import DebugEventTransport, RedisEventTransport, EventTransport, EventMessage
from lightbus.config import Config
from lightbus.exceptions import (
    CannotShrinkEmptyPool,
    CannotProxyPrivateMethod,
    CannotProxySynchronousMethod,
    CannotProxyProperty,
)
from lightbus.transports.pool import TransportPool

pytestmark = pytest.mark.unit


def _debug_pool():
    """Build a fresh TransportPool of DebugEventTransport with default config."""
    return TransportPool(
        transport_class=DebugEventTransport,
        transport_config=DebugEventTransport.Config(),
        config=Config.default(),
    )


def _redis_pool(**transport_kwargs):
    """Build a fresh TransportPool of RedisEventTransport.

    ``transport_kwargs`` are forwarded verbatim to ``RedisEventTransport.Config``.
    """
    return TransportPool(
        transport_class=RedisEventTransport,
        transport_config=RedisEventTransport.Config(**transport_kwargs),
        config=Config.default(),
    )


@pytest.fixture()
async def dummy_pool():
    """Yield a DebugEventTransport pool and close it on teardown."""
    debug_pool = _debug_pool()
    yield debug_pool
    await debug_pool.close()


@pytest.fixture()
async def redis_pool(redis_server_url):
    """Yield a RedisEventTransport pool and close it on teardown."""
    pool_under_test = _redis_pool(url=redis_server_url)
    yield pool_under_test
    await pool_under_test.close()


@pytest.fixture()
async def attr_test_pool(redis_server_url):
    """Used for testing attribute access only"""

    class AttrTestEventTransport(EventTransport):
        class_prop = True

        async def async_method(self):
            return True

        def sync_method(self):
            return True

        async def _async_private_method(self):
            return True

    attr_pool = TransportPool(
        transport_class=AttrTestEventTransport,
        transport_config=AttrTestEventTransport.Config(),
        config=Config.default(),
    )
    yield attr_pool
    await attr_pool.close()


@pytest.fixture()
def run_in_many_threads():
    """Provide a helper that runs a coroutine function many times across threads."""

    def run_in_many_threads_(async_fn, max_workers=50, executions=200, *args, **kwargs):
        # Each submission gets its own coroutine object, driven by its own
        # asyncio.run() call inside a worker thread.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            submitted = [
                executor.submit(partial(asyncio.run, async_fn(*args, **kwargs)))
                for _ in range(executions)
            ]
            done, _ = wait(submitted)

        # Re-raise a failure from any worker so the calling test fails too
        for finished in done:
            error = finished.exception()
            if error:
                raise error

        return done

    return run_in_many_threads_


@pytest.mark.asyncio
def test_equal(dummy_pool):
    """Test the __eq__ method"""
    assert dummy_pool == _debug_pool()


@pytest.mark.asyncio
def test_not_equal(redis_pool, redis_server_url):
    """Test the __eq__ method"""
    assert redis_pool != _redis_pool(url=redis_server_url, service_name="123")


@pytest.mark.asyncio
def test_hash_equal(dummy_pool):
    """Test the __hash__ method"""
    assert hash(dummy_pool) == hash(_debug_pool())


@pytest.mark.asyncio
def test_hash_not_equal(redis_pool, redis_server_url):
    """Test the __hash__ method"""
    other_pool = _redis_pool(url=redis_server_url, service_name="123")
    assert hash(redis_pool) != hash(other_pool)


@pytest.mark.asyncio
async def test_grow(dummy_pool: TransportPool):
    """Each grow() adds one free transport and leaves in_use at zero"""
    assert dummy_pool.free == 0
    assert dummy_pool.in_use == 0

    await dummy_pool.grow()
    assert dummy_pool.free == 1
    assert dummy_pool.in_use == 0

    await dummy_pool.grow()
    assert dummy_pool.free == 2
    assert dummy_pool.in_use == 0


@pytest.mark.asyncio
def test_grow_threaded(redis_pool: TransportPool, run_in_many_threads):
    """Growing 200 times from worker threads leaves 200 free transports"""
    run_in_many_threads(redis_pool.grow, executions=200)

    assert redis_pool.free == 200
    assert redis_pool.in_use == 0


@pytest.mark.asyncio
async def test_shrink(dummy_pool: TransportPool):
    """Two grows followed by two shrinks leaves no free transports"""
    await dummy_pool.grow()
    await dummy_pool.grow()
    await dummy_pool.shrink()
    await dummy_pool.shrink()
    assert dummy_pool.free == 0


@pytest.mark.asyncio
async def test_shrink_threaded(redis_pool: TransportPool, run_in_many_threads):
    """Shrinking 200 times from worker threads empties a pool of 200"""
    for _ in range(200):
        await redis_pool.grow()
    assert redis_pool.free == 200

    run_in_many_threads(redis_pool.shrink, executions=200)

    assert redis_pool.free == 0
    assert redis_pool.in_use == 0


@pytest.mark.asyncio
async def test_shrink_when_empty(dummy_pool: TransportPool):
    """Shrinking an empty pool raises CannotShrinkEmptyPool"""
    with pytest.raises(CannotShrinkEmptyPool):
        await dummy_pool.shrink()


@pytest.mark.asyncio
async def test_checkout_checkin(dummy_pool: TransportPool):
    """Checkout moves a transport to in_use; checkin moves it back to free"""
    checked_out = await dummy_pool.checkout()
    assert isinstance(checked_out, DebugEventTransport)
    assert dummy_pool.free == 0
    assert dummy_pool.in_use == 1

    await dummy_pool.checkin(checked_out)
    assert dummy_pool.free == 1
    assert dummy_pool.in_use == 0


@pytest.mark.asyncio
async def test_checking_to_closed_transport(mocker, redis_pool: TransportPool):
    """Checking a transport into an already-closed pool closes the transport"""
    transport = await redis_pool.checkout()
    mocker.spy(transport, "close")

    # Close the pool
    await redis_pool.close()

    # Should work even though pool is closed
    await transport.send_event(EventMessage(api_name="api", event_name="event"), options={})

    # Check the transport into the closed pool
    await redis_pool.checkin(transport)

    # The transport has been closed by the pool
    assert transport.close.called


@pytest.mark.asyncio
async def test_checkout_checkin_threaded(
    mocker, redis_pool: TransportPool, run_in_many_threads, get_total_redis_connections
):
    """Check in/out many connections concurrently (using threads)

    Note that this will not grow the pool. See the doc string for TransportPool
    """
    for spied_name in ("grow", "_create_transport", "_close_transport"):
        mocker.spy(redis_pool, spied_name)

    async def _check_in_out():
        transport = await redis_pool.checkout()
        # Ensure we do something here in order to slow down the execution
        # time, thereby ensuring our pool starts to fill up. We also need to
        # use the connection to ensure the connection is lazy loaded
        await transport.send_event(EventMessage(api_name="api", event_name="event"), options={})
        await asyncio.sleep(0.02)
        await redis_pool.checkin(transport)

    run_in_many_threads(_check_in_out, executions=500, max_workers=20)

    # We're running in a thread, so we never grow the pool. Instead
    # we just return one-off connections which will be closed
    # when they get checked back in
    assert not redis_pool.grow.called
    assert redis_pool._create_transport.call_count == 500
    assert redis_pool._close_transport.call_count == 500
    assert await get_total_redis_connections() == 1


@pytest.mark.asyncio
async def test_checkout_checkin_asyncio(
    mocker, redis_pool: TransportPool, get_total_redis_connections
):
    """Check in/out many connections concurrently (using asyncio tasks)

    Unlike using threads, this should grow the pool
    """
    # mocker.spy in pytest-mock runs afoul of this change in 3.8.1
    #     https://bugs.python.org/issue38857
    # We therefore use mocker.spy for python 3.7, or the new AsyncMock in 3.8
    # See: https://github.com/pytest-dev/pytest-mock/issues/178
    spied_names = ("grow", "_create_transport", "_close_transport")
    if sys.version_info >= (3, 8):
        from unittest.mock import AsyncMock

        for spied_name in spied_names:
            setattr(redis_pool, spied_name, AsyncMock(wraps=getattr(redis_pool, spied_name)))
    else:
        for spied_name in spied_names:
            mocker.spy(redis_pool, spied_name)

    async def _check_in_out():
        transport = await redis_pool.checkout()
        # Ensure we do something here in order to slow down the execution
        # time, thereby ensuring our pool starts to fill up. We also need to
        # use the connection to ensure the connection is lazy loaded
        await transport.send_event(EventMessage(api_name="api", event_name="event"), options={})
        await asyncio.sleep(0.02)
        await redis_pool.checkin(transport)

    async def _check_in_out_loop():
        for _ in range(500 // 20):
            await _check_in_out()

    tasks = [asyncio.create_task(_check_in_out_loop()) for _ in range(20)]
    await asyncio.wait(tasks)
    await redis_pool.close()

    assert redis_pool.grow.call_count == 20
    assert redis_pool._create_transport.call_count == 20
    assert redis_pool._close_transport.call_count == 20
    assert await get_total_redis_connections() == 1


@pytest.mark.asyncio
async def test_context(dummy_pool: TransportPool):
    """`async with pool` yields a transport and frees it again on exit"""
    async with dummy_pool as transport:
        assert isinstance(transport, DebugEventTransport)

    assert dummy_pool.free == 1
    assert dummy_pool.in_use == 0


@pytest.mark.asyncio
async def test_close(dummy_pool: TransportPool):
    """Closing the pool closes its free transports and empties it"""
    await dummy_pool.grow()
    transport = dummy_pool.pool[0]

    # Fancy footwork to mock the close coroutine
    # (because it is async, not sync)
    resolved = Future()
    resolved.set_result(None)
    transport.close = MagicMock(return_value=resolved)

    await dummy_pool.close()

    assert transport.close.called
    assert dummy_pool.free == 0
    assert dummy_pool.in_use == 0


@pytest.mark.asyncio
async def test_proxy_attr_async(attr_test_pool: TransportPool):
    """Public async methods are proxied through to a pooled transport"""
    # Passes method call through to underlying transport
    outcome = await attr_test_pool.async_method()
    assert outcome

    # Transport was created, used, and is now freed up
    assert attr_test_pool.free == 1
    assert attr_test_pool.in_use == 0


@pytest.mark.asyncio
async def test_proxy_sync(attr_test_pool: TransportPool):
    """Proxying a synchronous method raises CannotProxySynchronousMethod"""
    # Will raise errors as pass-through is only available for async public methods
    with pytest.raises(CannotProxySynchronousMethod):
        await attr_test_pool.sync_method()

    assert attr_test_pool.free == 0
    assert attr_test_pool.in_use == 0


@pytest.mark.asyncio
async def test_proxy_async_private(attr_test_pool: TransportPool):
    """Proxying a private async method raises CannotProxyPrivateMethod"""
    # Will raise errors as pass-through is only available for async public methods
    with pytest.raises(CannotProxyPrivateMethod):
        await attr_test_pool._async_private_method()

    assert attr_test_pool.free == 0
    assert attr_test_pool.in_use == 0


@pytest.mark.asyncio
async def test_proxy_property(attr_test_pool: TransportPool):
    """Proxying a class attribute raises CannotProxyProperty"""
    # Will raise errors as pass-through is only available for async public methods
    with pytest.raises(CannotProxyProperty):
        _ = attr_test_pool.class_prop

    assert attr_test_pool.free == 0
    assert attr_test_pool.in_use == 0


@pytest.mark.asyncio
async def test_attr_proxy_not_found(attr_test_pool: TransportPool):
    """Accessing an unknown attribute raises AttributeError"""
    with pytest.raises(AttributeError):
        _ = attr_test_pool.foo

    assert attr_test_pool.free == 0
    assert attr_test_pool.in_use == 0


@pytest.mark.asyncio
async def test_close_with_transports_from_child_thread(redis_pool: TransportPool):
    """Test that the pool can be closed when it contains transports created within a child thread"""
    flag = False

    async def thread():
        nonlocal flag
        async with redis_pool as transport:
            # Do something to ensure a connection gets made
            async for _ in transport.history("foo", "bar"):
                pass
            flag = True

    child = Thread(target=partial(asyncio.run, thread()))
    child.start()
    child.join()

    # Ensure we actually went and connected to redis
    assert flag

    await redis_pool.close()
    assert redis_pool.in_use == 0

Full Screen

Full Screen

pool.py

Source:pool.py Github

copy

Full Screen

...68 except IndexError:69 raise CannotShrinkEmptyPool(70 "Transport pool is already empty, cannot shrink it further"71 )72 await self._close_transport(old_transport)73 async def checkout(self) -> VT:74 if self.closed:75 raise TransportPoolIsClosed("Cannot get a connection, transport pool is closed")76 if threading.current_thread() != self.home_thread:77 return await self._create_transport()78 else:79 with self.lock:80 if not self.pool:81 await self.grow()82 transport = self.pool.pop(0)83 self.checked_out.add(transport)84 return transport85 async def checkin(self, transport: VT):86 if threading.current_thread() != self.home_thread:87 return await self._close_transport(transport)88 else:89 with self.lock:90 self.checked_out.discard(transport)91 self.pool.append(transport)92 if self.closed:93 await self._close_all()94 @property95 def free(self) -> int:96 return len(self.pool)97 @property98 def in_use(self) -> int:99 return len(self.checked_out)100 @property101 def total(self) -> int:102 return self.free + self.in_use103 async def __aenter__(self) -> VT:104 transport = await self.checkout()105 self.context_stack.append(transport)106 return transport107 async def __aexit__(self, exc_type, exc_val, exc_tb):108 transport = self.context_stack.pop()109 await self.checkin(transport)110 async def close(self):111 with self.lock:112 self.closed = True113 await self._close_all()114 async def _close_all(self):115 with self.lock:116 while self.pool:117 await self._close_transport(self.pool.pop())118 def _instantiate_transport(self) -> VT:119 """Instantiate a transport without opening it"""120 return self.transport_class.from_config(121 config=self.config, **self.transport_config._asdict()122 )123 async def _create_transport(self) -> VT:124 """Return an opened transport"""125 new_transport = self._instantiate_transport()126 await new_transport.open()127 return new_transport128 async def _close_transport(self, transport: VT):129 """Close a specific transport"""130 await 
transport.close()131 def __getattr__(self, item):132 async def fn_pool_wrapper(*args, **kwargs):133 async with self as transport:134 return await getattr(transport, item)(*args, **kwargs)135 async def gen_pool_wrapper(*args, **kwargs):136 async with self as transport:137 async for value in getattr(transport, item)(*args, **kwargs):138 yield value139 attr = getattr(self.transport_class, item, None)140 if not attr:141 raise AttributeError(142 f"Neither the transport pool {repr(self)} nor the transport with class "...

Full Screen

Full Screen

binarysocket.py

Source:binarysocket.py Github

copy

Full Screen

...7from thrift.protocol import TBinaryProtocol8from commonutil_net_thrift.client.types import ClientSetup9from commonutil_net_thrift.connector.exceptions import ClientCheckException10_log = logging.getLogger(__name__)11def _close_transport(transport):12 try:13 transport.close()14 except Exception:15 pass16def binary_socket_connector(client_setup: ClientSetup, host: str, port: int, timeout_seconds: float = 3):17 """18 Connector callable for making connection with Binary Protocol over Socket.19 """20 transport = TSocket.TSocket(host, port)21 if timeout_seconds:22 transport.setTimeout(timeout_seconds * 1000.0)23 protocol = TBinaryProtocol.TBinaryProtocol(transport)24 client = client_setup.client_class(protocol)25 transport.open() # connect26 if client_setup.check_callable:27 try:28 if not client_setup.check_callable(client):29 _log.error("failed on check connected client: host=%r, port=%r", host, port)30 raise ClientCheckException()31 except Exception as e:32 _close_transport(transport)33 _log.exception("caught exception on check connected client: host=%r, port=%r, exception=%r", host, port, e)34 raise35 if client_setup.prepare_callable:36 try:37 client_setup.prepare_callable(client)38 except Exception as e:39 _close_transport(transport)40 _log.exception("caught exception on preparing connected client for use: host=%r, port=%r, exception=%r", host, port, e)41 raise...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful