Best Python code snippet using autotest_python
test_pool.py
Source:test_pool.py  
"""Unit tests for ``lightbus.transports.pool.TransportPool``."""
import asyncio
import sys
from asyncio import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures import wait
from functools import partial
from threading import Thread
from unittest.mock import MagicMock

import pytest

from lightbus import DebugEventTransport, RedisEventTransport, EventTransport, EventMessage
from lightbus.config import Config
from lightbus.exceptions import (
    CannotShrinkEmptyPool,
    CannotProxyPrivateMethod,
    CannotProxySynchronousMethod,
    CannotProxyProperty,
)
from lightbus.transports.pool import TransportPool

pytestmark = pytest.mark.unit


@pytest.fixture()
async def dummy_pool():
    """Yield a pool of ``DebugEventTransport`` and close it on teardown."""
    pool = TransportPool(
        transport_class=DebugEventTransport,
        transport_config=DebugEventTransport.Config(),
        config=Config.default(),
    )
    yield pool
    await pool.close()


@pytest.fixture()
async def redis_pool(redis_server_url):
    """Yield a pool of ``RedisEventTransport`` and close it on teardown."""
    pool = TransportPool(
        transport_class=RedisEventTransport,
        transport_config=RedisEventTransport.Config(url=redis_server_url),
        config=Config.default(),
    )
    yield pool
    await pool.close()


@pytest.fixture()
async def attr_test_pool(redis_server_url):
    """Used for testing attribute access only"""

    class AttrTestEventTransport(EventTransport):
        # One attribute of each kind the pool's attribute proxy is tested against
        class_prop = True

        async def async_method(self):
            return True

        def sync_method(self):
            return True

        async def _async_private_method(self):
            return True

    pool = TransportPool(
        transport_class=AttrTestEventTransport,
        transport_config=AttrTestEventTransport.Config(),
        config=Config.default(),
    )
    yield pool
    await pool.close()


@pytest.fixture()
def run_in_many_threads():
    """Return a helper which runs an async callable many times across worker threads."""

    def run_in_many_threads_(async_fn, max_workers=50, executions=200, *args, **kwargs):
        """Run ``async_fn(*args, **kwargs)`` ``executions`` times via ``asyncio.run``.

        Each execution is submitted to a thread pool of ``max_workers`` threads.
        The first exception found among the completed futures is re-raised;
        otherwise the set of completed futures is returned.
        """
        with ThreadPoolExecutor(max_workers=max_workers) as e:
            # The coroutine object is created here, in the calling thread, and
            # is only executed (by asyncio.run) inside the worker thread.
            futures = [
                e.submit(partial(asyncio.run, async_fn(*args, **kwargs)))
                for _ in range(executions)
            ]
            done, _ = wait(futures)

        for future in done:
            error = future.exception()
            if error:
                raise error

        return done

    return run_in_many_threads_


@pytest.mark.asyncio
def test_equal(dummy_pool):
    """Test the __eq__ method"""
    assert dummy_pool == TransportPool(
        transport_class=DebugEventTransport,
        transport_config=DebugEventTransport.Config(),
        config=Config.default(),
    )


@pytest.mark.asyncio
def test_not_equal(redis_pool, redis_server_url):
    """Test the __eq__ method"""
    assert redis_pool != TransportPool(
        transport_class=RedisEventTransport,
        transport_config=RedisEventTransport.Config(url=redis_server_url, service_name="123"),
        config=Config.default(),
    )


@pytest.mark.asyncio
def test_hash_equal(dummy_pool):
    """Test the __hash__ method"""
    assert hash(dummy_pool) == hash(
        TransportPool(
            transport_class=DebugEventTransport,
            transport_config=DebugEventTransport.Config(),
            config=Config.default(),
        )
    )


@pytest.mark.asyncio
def test_hash_not_equal(redis_pool, redis_server_url):
    """Test the __hash__ method"""
    assert hash(redis_pool) != hash(
        TransportPool(
            transport_class=RedisEventTransport,
            transport_config=RedisEventTransport.Config(url=redis_server_url, service_name="123"),
            config=Config.default(),
        )
    )


@pytest.mark.asyncio
async def test_grow(dummy_pool: TransportPool):
    """Each grow() adds one free transport and checks none out."""
    assert dummy_pool.free == 0
    assert dummy_pool.in_use == 0
    await dummy_pool.grow()
    assert dummy_pool.free == 1
    assert dummy_pool.in_use == 0
    await dummy_pool.grow()
    assert dummy_pool.free == 2
    assert dummy_pool.in_use == 0


@pytest.mark.asyncio
def test_grow_threaded(redis_pool: TransportPool, run_in_many_threads):
    """200 grow() calls issued from worker threads leave 200 free transports."""
    run_in_many_threads(redis_pool.grow, executions=200)
    assert redis_pool.free == 200
    assert redis_pool.in_use == 0


@pytest.mark.asyncio
async def test_shrink(dummy_pool: TransportPool):
    """Shrinking as many times as the pool was grown empties it."""
    await dummy_pool.grow()
    await dummy_pool.grow()
    await dummy_pool.shrink()
    await dummy_pool.shrink()
    assert dummy_pool.free == 0


@pytest.mark.asyncio
async def test_shrink_threaded(redis_pool: TransportPool, run_in_many_threads):
    """200 shrink() calls issued from worker threads empty a pool of 200."""
    for _ in range(0, 200):
        await redis_pool.grow()
    assert redis_pool.free == 200
    run_in_many_threads(redis_pool.shrink, executions=200)
    assert redis_pool.free == 0
    assert redis_pool.in_use == 0


@pytest.mark.asyncio
async def test_shrink_when_empty(dummy_pool: TransportPool):
    """Shrinking an empty pool raises CannotShrinkEmptyPool."""
    with pytest.raises(CannotShrinkEmptyPool):
        await dummy_pool.shrink()


@pytest.mark.asyncio
async def test_checkout_checkin(dummy_pool: TransportPool):
    """A checked-out transport counts as in use until it is checked back in."""
    transport = await dummy_pool.checkout()
    assert isinstance(transport, DebugEventTransport)
    assert dummy_pool.free == 0
    assert dummy_pool.in_use == 1
    await dummy_pool.checkin(transport)
    assert dummy_pool.free == 1
    assert dummy_pool.in_use == 0


@pytest.mark.asyncio
async def test_checking_to_closed_transport(mocker, redis_pool: TransportPool):
    """A transport checked in after the pool was closed gets closed by the pool."""
    transport = await redis_pool.checkout()
    mocker.spy(transport, "close")
    # Close the pool
    await redis_pool.close()
    # Should work even though pool is closed
    await transport.send_event(EventMessage(api_name="api", event_name="event"), options={})
    # Check the transport into the closed pool
    await redis_pool.checkin(transport)
    # The transport has been closed by the pool
    assert transport.close.called


@pytest.mark.asyncio
async def test_checkout_checkin_threaded(
    mocker, redis_pool: TransportPool, run_in_many_threads, get_total_redis_connections
):
    """Check in/out many connections concurrently (using threads)

    Note that this will not grow the pool. See the doc string for TransportPool
    """
    mocker.spy(redis_pool, "grow")
    mocker.spy(redis_pool, "_create_transport")
    mocker.spy(redis_pool, "_close_transport")

    async def _check_in_out():
        transport = await redis_pool.checkout()
        # Ensure we do something here in order to slow down the execution
        # time, thereby ensuring our pool starts to fill up. We also need to
        # use the connection to ensure the connection is lazy loaded
        await transport.send_event(EventMessage(api_name="api", event_name="event"), options={})
        await asyncio.sleep(0.02)
        await redis_pool.checkin(transport)

    run_in_many_threads(_check_in_out, executions=500, max_workers=20)
    # We're running in a thread, so we never grow the pool. Instead
    # we just return one-off connections which will be closed
    # when they get checked back in
    assert not redis_pool.grow.called
    assert redis_pool._create_transport.call_count == 500
    assert redis_pool._close_transport.call_count == 500
    assert await get_total_redis_connections() == 1


@pytest.mark.asyncio
async def test_checkout_checkin_asyncio(
    mocker, redis_pool: TransportPool, get_total_redis_connections
):
    """Check in/out many connections concurrently (using asyncio tasks)

    Unlike using threads, this should grow the pool
    """
    # mocker.spy in pytest-mock runs afoul of this change in 3.8.1
    #    https://bugs.python.org/issue38857
    # We therefore use mocker.spy for python 3.7, or the new AsyncMock in 3.8
    # See: https://github.com/pytest-dev/pytest-mock/issues/178
    if sys.version_info >= (3, 8):
        from unittest.mock import AsyncMock

        redis_pool.grow = AsyncMock(wraps=redis_pool.grow)
        redis_pool._create_transport = AsyncMock(wraps=redis_pool._create_transport)
        redis_pool._close_transport = AsyncMock(wraps=redis_pool._close_transport)
    else:
        mocker.spy(redis_pool, "grow")
        mocker.spy(redis_pool, "_create_transport")
        mocker.spy(redis_pool, "_close_transport")

    async def _check_in_out():
        transport = await redis_pool.checkout()
        # Ensure we do something here in order to slow down the execution
        # time, thereby ensuring our pool starts to fill up. We also need to
        # use the connection to ensure the connection is lazy loaded
        await transport.send_event(EventMessage(api_name="api", event_name="event"), options={})
        await asyncio.sleep(0.02)
        await redis_pool.checkin(transport)

    async def _check_in_out_loop():
        for _ in range(0, 500 // 20):
            await _check_in_out()

    # 20 concurrent tasks, each doing 25 check-out/check-in cycles (500 in total)
    tasks = [asyncio.create_task(_check_in_out_loop()) for _ in range(0, 20)]
    await asyncio.wait(tasks)
    await redis_pool.close()
    assert redis_pool.grow.call_count == 20
    assert redis_pool._create_transport.call_count == 20
    assert redis_pool._close_transport.call_count == 20
    assert await get_total_redis_connections() == 1


@pytest.mark.asyncio
async def test_context(dummy_pool: TransportPool):
    """``async with pool`` yields a transport and frees it again on exit."""
    async with dummy_pool as transport:
        assert isinstance(transport, DebugEventTransport)
    assert dummy_pool.free == 1
    assert dummy_pool.in_use == 0


@pytest.mark.asyncio
async def test_close(dummy_pool: TransportPool):
    """Closing the pool closes its free transports and leaves it empty."""
    await dummy_pool.grow()
    transport = dummy_pool.pool[0]
    # Fancy footwork to mock the close coroutine
    # (because it is async, not sync)
    f = Future()
    f.set_result(None)
    transport.close = MagicMock(return_value=f)
    await dummy_pool.close()
    assert transport.close.called
    assert dummy_pool.free == 0
    assert dummy_pool.in_use == 0


@pytest.mark.asyncio
async def test_proxy_attr_async(attr_test_pool: TransportPool):
    """Public async methods are proxied through to a pooled transport."""
    # Passes method call through to underlying transport
    result = await attr_test_pool.async_method()
    assert result
    # Transport was created, used, and is now freed up
    assert attr_test_pool.free == 1
    assert attr_test_pool.in_use == 0


@pytest.mark.asyncio
async def test_proxy_sync(attr_test_pool: TransportPool):
    """Synchronous methods cannot be proxied."""
    # Will raise errors as pass-through is only available for async public methods
    with pytest.raises(CannotProxySynchronousMethod):
        await attr_test_pool.sync_method()
    assert attr_test_pool.free == 0
    assert attr_test_pool.in_use == 0


@pytest.mark.asyncio
async def test_proxy_async_private(attr_test_pool: TransportPool):
    """Private (underscore-prefixed) async methods cannot be proxied."""
    # Will raise errors as pass-through is only available for async public methods
    with pytest.raises(CannotProxyPrivateMethod):
        await attr_test_pool._async_private_method()
    assert attr_test_pool.free == 0
    assert attr_test_pool.in_use == 0


@pytest.mark.asyncio
async def test_proxy_property(attr_test_pool: TransportPool):
    """Non-callable class attributes cannot be proxied."""
    # Will raise errors as pass-through is only available for async public methods
    with pytest.raises(CannotProxyProperty):
        _ = attr_test_pool.class_prop
    assert attr_test_pool.free == 0
    assert attr_test_pool.in_use == 0


@pytest.mark.asyncio
async def test_attr_proxy_not_found(attr_test_pool: TransportPool):
    """Attributes missing from the transport class raise AttributeError."""
    with pytest.raises(AttributeError):
        _ = attr_test_pool.foo
    assert attr_test_pool.free == 0
    assert attr_test_pool.in_use == 0


@pytest.mark.asyncio
async def test_close_with_transports_from_child_thread(redis_pool: TransportPool):
    """Test that the pool can be closed when it contains transports created within a child thread"""
    flag = False

    async def thread():
        nonlocal flag
        async with redis_pool as transport:
            # Do something to ensure a connection gets made
            async for _ in transport.history("foo", "bar"):
                pass
            flag = True

    fn = partial(asyncio.run, thread())
    t = Thread(target=fn)
    t.start()
    t.join()
    # Ensure we actually went and connected to redis
    assert flag
    await redis_pool.close()
    assert redis_pool.in_use == 0
...
pool.py
Source:pool.py  
...68            except IndexError:69                raise CannotShrinkEmptyPool(70                    "Transport pool is already empty, cannot shrink it further"71                )72            await self._close_transport(old_transport)73    async def checkout(self) -> VT:74        if self.closed:75            raise TransportPoolIsClosed("Cannot get a connection, transport pool is closed")76        if threading.current_thread() != self.home_thread:77            return await self._create_transport()78        else:79            with self.lock:80                if not self.pool:81                    await self.grow()82                transport = self.pool.pop(0)83                self.checked_out.add(transport)84            return transport85    async def checkin(self, transport: VT):86        if threading.current_thread() != self.home_thread:87            return await self._close_transport(transport)88        else:89            with self.lock:90                self.checked_out.discard(transport)91                self.pool.append(transport)92                if self.closed:93                    await self._close_all()94    @property95    def free(self) -> int:96        return len(self.pool)97    @property98    def in_use(self) -> int:99        return len(self.checked_out)100    @property101    def total(self) -> int:102        return self.free + self.in_use103    async def __aenter__(self) -> VT:104        transport = await self.checkout()105        self.context_stack.append(transport)106        return transport107    async def __aexit__(self, exc_type, exc_val, exc_tb):108        transport = self.context_stack.pop()109        await self.checkin(transport)110    async def close(self):111        with self.lock:112            self.closed = True113            await self._close_all()114    async def _close_all(self):115        with self.lock:116            while self.pool:117                await self._close_transport(self.pool.pop())118    def _instantiate_transport(self) 
-> VT:119        """Instantiate a transport without opening it"""120        return self.transport_class.from_config(121            config=self.config, **self.transport_config._asdict()122        )123    async def _create_transport(self) -> VT:124        """Return an opened transport"""125        new_transport = self._instantiate_transport()126        await new_transport.open()127        return new_transport128    async def _close_transport(self, transport: VT):129        """Close a specific transport"""130        await transport.close()131    def __getattr__(self, item):132        async def fn_pool_wrapper(*args, **kwargs):133            async with self as transport:134                return await getattr(transport, item)(*args, **kwargs)135        async def gen_pool_wrapper(*args, **kwargs):136            async with self as transport:137                async for value in getattr(transport, item)(*args, **kwargs):138                    yield value139        attr = getattr(self.transport_class, item, None)140        if not attr:141            raise AttributeError(142                f"Neither the transport pool {repr(self)} nor the transport with class "...binarysocket.py
Source:binarysocket.py  
...7from thrift.protocol import TBinaryProtocol8from commonutil_net_thrift.client.types import ClientSetup9from commonutil_net_thrift.connector.exceptions import ClientCheckException10_log = logging.getLogger(__name__)11def _close_transport(transport):12	try:13		transport.close()14	except Exception:15		pass16def binary_socket_connector(client_setup: ClientSetup, host: str, port: int, timeout_seconds: float = 3):17	"""18	Connector callable for making connection with Binary Protocol over Socket.19	"""20	transport = TSocket.TSocket(host, port)21	if timeout_seconds:22		transport.setTimeout(timeout_seconds * 1000.0)23	protocol = TBinaryProtocol.TBinaryProtocol(transport)24	client = client_setup.client_class(protocol)25	transport.open()  # connect26	if client_setup.check_callable:27		try:28			if not client_setup.check_callable(client):29				_log.error("failed on check connected client: host=%r, port=%r", host, port)30				raise ClientCheckException()31		except Exception as e:32			_close_transport(transport)33			_log.exception("caught exception on check connected client: host=%r, port=%r, exception=%r", host, port, e)34			raise35	if client_setup.prepare_callable:36		try:37			client_setup.prepare_callable(client)38		except Exception as e:39			_close_transport(transport)40			_log.exception("caught exception on preparing connected client for use: host=%r, port=%r, exception=%r", host, port, e)41			raise...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
