Best Python code snippet using lisa_python
MarketDataService.py
Source:MarketDataService.py  
...68            69            Returns:70                Paged result of CurveRange entity.71        """72        return _get_event_loop().run_until_complete(self.readCurveRangeAsync(id, page, pageSize, product, versionFrom, versionTo))73    async def readMarketDataRegistryByIdAsync(self, id: int) -> MarketDataEntityOutput:74        """75            Reads MarketData by id with MarketDataID.76            Args:77                id: ID of the marketdata to be retrieved.78            79            Returns:80                MarketData Entity Output (Async).81        """82        url = "/marketdata/entity/" + str(id) 83        with self.__client as c:84            res = await asyncio.gather(*[self.__executor.exec(c.exec, 'GET', url, None, retcls=MarketDataEntityOutput)])85            return cast(MarketDataEntityOutput,res[0])86    def readMarketDataRegistryById(self, id: int) -> MarketDataEntityOutput :  87        """88            Reads MarketData by curve name with MarketDataID.89            Args:90                id: ID of the marketdata to be retrieved.91            Returns:92                MarketData Entity Output.93        """ 94        return _get_event_loop().run_until_complete(self.readMarketDataRegistryByIdAsync(id))95    async def updateMarketDataAsync(self, id: int, entity: MarketDataEntityInput) -> MarketDataEntityOutput :96        """ 97            Saves the given MarketData Entity98            Args:99                id: int of the marketdata to be updated100                101            Returns:102                MarketData Entity Output (Async).103        """104        url = "/marketdata/entity/" + str(id) 105        with self.__client as c:106            res = await asyncio.gather(*[self.__executor.exec(c.exec, 'PUT', url, entity, MarketDataEntityOutput)])107            return cast(MarketDataEntityOutput,res[0])108    def updateMarketData(self, id: int, entity: MarketDataEntityInput):109        """ 110            Saves the given MarketData Entity111     
       Args:112                id: int of the marketdata to be updated113            Returns:114                MarketData Entity Output.115        """116        return _get_event_loop().run_until_complete(self.updateMarketDataAsync(id, entity))117    async def deleteMarketDataAsync(self, id: int) -> None :118        """ 119            Delete the specific MarketData entity by id120            Args:121                id: int of the marketdata to be deleted122            Returns:123                MarketData Entity Output (Async).124        """125        url = "/marketdata/entity/" + str(id) 126        with self.__client as c:127            await asyncio.gather(*[self.__executor.exec(c.exec, 'DELETE', url, None)])128            return None129    def deleteMarketData(self, id: int) -> None :130        """ 131            Delete the specific MarketData entity by id132            Args: 133                id: int of the marketdata to be deleted134            Returns:135                MarketData Entity Output.136        """137        return _get_event_loop().run_until_complete(self.deleteMarketDataAsync(id))138    async def readMarketDataRegistryByNameAsync(self, provider: str, curveName: str) -> MarketDataEntityOutput:139        """140            Reads MarketData by provider and curve name.141            Args:142                provider: string of the provider to be retrieved.143                curveName: string of the curve name to be retrieved.144            Returns:145                MarketData Entity Output (Async).146        """147        url = "/marketdata/entity" 148        params = {"provider": provider , "curveName":curveName}149        with self.__client as c:150            res = await asyncio.gather(*[self.__executor.exec(c.exec, 'GET', url, None, retcls=MarketDataEntityOutput, params = params)])151            return cast(MarketDataEntityOutput,res[0])152    def readMarketDataRegistryByName(self, provider: str, curveName: str) -> MarketDataEntityOutput:153     
   """154            Reads MarketData by provider and curve name.155            Args:156                provider: string of the provider to be retrieved.157                curveName: string of the curve name to be retrieved.158                159            Returns:160                MarketData Entity Output.161        """162        return _get_event_loop().run_until_complete(self.readMarketDataRegistryByNameAsync(provider, curveName))163    async def registerMarketDataAsync(self, entity: MarketDataEntityInput) -> MarketDataEntityOutput:164        """165            Register a new MarketData entity.166            Args:167                entity: The Market Data Entity Input168                169            Returns:170                MarketData Entity Output (Async).171        """172        url = "/marketdata/entity"173        with self.__client as c:174            res = await asyncio.gather(*[self.__executor.exec(c.exec, 'POST', url, entity, MarketDataEntityOutput)])175            return cast(MarketDataEntityOutput,res[0])176    def registerMarketData(self, entity: MarketDataEntityInput) -> MarketDataEntityOutput:177        """178            Register a new MarketData entity.179            Args:180                entity: The Market Data Entity Input181                182            Returns:183                MarketData Entity Output.184        """185        return _get_event_loop().run_until_complete(self.registerMarketDataAsync(entity))186    async def upsertDataAsync(self, data:UpsertData) -> None:187        url = "/marketdata/upsertdata"188        with self.__client as c:189            await asyncio.gather(*[self.__executor.exec(c.exec, 'POST', url, data)])190            return None191    192    def upsertData(self, data:UpsertData) -> None:193        return _get_event_loop().run_until_complete(self.upsertDataAsync(data))194def _get_event_loop():195    """196    Wrapper around asyncio get_event_loop.197    Ensures that there is an event loop available.198    An 
event loop may not be available if the sdk is not run in the main event loop199    """200    try:201        asyncio.get_event_loop()202    except RuntimeError as ex:203        if "There is no current event loop in thread" in str(ex):204            loop = asyncio.new_event_loop()205            asyncio.set_event_loop(loop)206    ...test_event_loop.py
Source:test_event_loop.py  
...
import pytest
from samsung_multiroom.api import ApiResponse
from samsung_multiroom.event import EventLoop
from samsung_multiroom.event.event import Event


def _get_event_loop():
    """Create an EventLoop backed by a mocked API stream.

    Returns:
        Tuple ``(event_loop, api_stream)`` so tests can configure the mock
        stream and drive the loop.
    """
    stream = MagicMock()
    return (EventLoop(stream), stream)


def _fake_event_factory(response):
    """Event factory stub: ignores ``response`` and returns a mock Event
    whose name is 'fake.event'."""
    fake_event = MagicMock(spec=Event)
    fake_event.name = 'fake.event'
    return fake_event


def _get_api_response(name):
    """Return a mock ApiResponse whose ``name`` attribute is ``name``."""
    response = MagicMock(spec=ApiResponse)
    response.name = name
    return response


class TestEventLoop():  # pytest-asyncio doesn't play well with unittest.TestCase
    @pytest.mark.asyncio
    async def test_loop(self):
        listener = MagicMock()
        event_loop, api_stream = _get_event_loop()
        api_stream.open.return_value = iter([
            _get_api_response('FakeEvent'),
        ])
        event_loop.register_factory(_fake_event_factory)
        event_loop.add_listener('fake.event', listener)
        await event_loop.loop()
        listener.assert_called_once()

    @pytest.mark.asyncio
    async def test_loop_no_events(self):
        listener = MagicMock()
        event_loop, api_stream = _get_event_loop()
        api_stream.open.return_value = iter([
            _get_api_response('FakeEvent'),
        ])
        event_loop.add_listener('*', listener)
        await event_loop.loop()
...
async_utils.py
Source:async_utils.py  
...4# LICENSE file in the root directory of this source tree.5import asyncio6from contextlib import contextmanager7@contextmanager8def _get_event_loop():9    loop = asyncio.get_event_loop()10    if not loop.is_running():11        yield loop12    else:13        yield asyncio.new_event_loop()14def wait_for(coro):15    with _get_event_loop() as loop:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
