Best Python code snippet using uiautomator
client.py
Source:client.py  
# Copyright (c) 2016, Louis Opter <louis@opter.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
#    may be used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import asyncio
import functools
import json
import locale
import logging
import os
import urllib.parse
import uuid

from typing import (
    Any,
    Callable,
    Dict,
    List,
    NamedTuple,
    Sequence,
    Tuple,
)
from typing import Type  # noqa

from . import (
    exceptions,
    requests,
    responses,
    structs,
)

logger = logging.getLogger("lightsc.client")

# Describes how one request class maps onto a JSON-RPC method: the method
# name sent on the wire and the function that converts the raw JSON result
# into a response object.
_JSONRPCMethod = NamedTuple("_JSONRPCMethod", [
    ("name", str),
    ("map_result", Callable[[Any], responses.Response]),
])

# Request class -> JSON-RPC method description.
_JSONRPC_API = {
    requests.GetLightState: _JSONRPCMethod(
        name="get_light_state",
        map_result=lambda result: responses.LightsState([
            structs.LightBulb(
                b["label"], b["power"], *b["hsbk"], tags=b["tags"]
            ) for b in result
        ])
    ),
    requests.SetLightFromHSBK: _JSONRPCMethod(
        name="set_light_from_hsbk",
        map_result=lambda result: responses.Bool(result)
    ),
    requests.PowerOn: _JSONRPCMethod(
        name="power_on",
        map_result=lambda result: responses.Bool(result)
    ),
    requests.PowerOff: _JSONRPCMethod(
        name="power_off",
        map_result=lambda result: responses.Bool(result)
    ),
    requests.PowerToggle: _JSONRPCMethod(
        name="power_toggle",
        map_result=lambda result: responses.Bool(result)
    ),
    requests.SetWaveform: _JSONRPCMethod(
        name="set_waveform",
        map_result=lambda result: responses.Bool(result)
    ),
}  # type: Dict[Type[requests.RequestClass], _JSONRPCMethod]


class _JSONRPCCall:
    """State of a single in-flight JSON-RPC call.

    Holds the JSON-RPC 2.0 request payload (with a freshly generated UUID as
    its id), the future that receives the result or the exception, and the
    handle of the timer armed for the call's timeout.
    """

    def __init__(
        self, method: str, params: Sequence[Any], timeout: int = None
    ) -> None:
        self.id = str(uuid.uuid4())
        self.method = method
        self.params = params
        self.timeout = timeout
        # Set by the client once the request has been written out.
        self.timeout_handle = None  # type: asyncio.Handle
        self.request = {
            "id": self.id,
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
        }
        self.response = asyncio.Future()  # type: asyncio.futures.Future

    @property
    def response_or_exception(self) -> Any:
        """Return the exception stored on the response future, else its result.

        Must only be read once the future is done; ``Future.exception()``
        raises otherwise.
        """
        ex = self.response.exception()
        return ex if ex is not None else self.response.result()


class AsyncJSONRPCLightsClient:
    """Asynchronous JSON-RPC client speaking to lightsd over a stream socket.

    Supported URL schemes are ``unix+jsonrpc`` and ``tcp+jsonrpc`` (see
    :meth:`connect`). Requests are sent with :meth:`apply`, or grouped with
    :meth:`batch`.
    """

    READ_SIZE = 8192
    TIMEOUT = 2  # seconds
    ENCODING = "utf-8"

    def __init__(
        self,
        url: str,
        encoding: str = ENCODING,
        timeout: int = TIMEOUT,
        read_size: int = READ_SIZE,
        loop: asyncio.AbstractEventLoop = None
    ) -> None:
        self.url = url
        self.encoding = encoding
        self.timeout = timeout
        self.read_size = read_size
        self._listen_task = None  # type: asyncio.Task
        self._pending_calls = {}  # type: Dict[str, _JSONRPCCall]
        self._reader = None  # type: asyncio.StreamReader
        self._writer = None  # type: asyncio.StreamWriter
        self._loop = loop or asyncio.get_event_loop()

    def _handle_response(
        self, id: str, response: Any, timeout: bool = False
    ) -> None:
        """Resolve the pending call ``id`` with ``response`` or with a timeout.

        When ``timeout`` is true the call's future receives a
        ``LightsClientTimeoutError``; otherwise the timeout timer is cancelled
        and the future receives ``response``.
        """
        # A response can arrive after its call already timed out (or carry an
        # id we never issued): ignore it instead of raising KeyError, which
        # would otherwise terminate the listen task.
        call = self._pending_calls.pop(id, None)
        if call is None:
            logger.debug("Ignoring response for unknown call {}".format(id))
            return
        if timeout:
            call.response.set_exception(exceptions.LightsClientTimeoutError())
            return
        call.timeout_handle.cancel()
        call.response.set_result(response)

    async def _jsonrpc_execute(
        self, pipeline: List[_JSONRPCCall]
    ) -> Dict[str, Any]:
        """Send every call in ``pipeline`` and wait for all of them to settle.

        A single call is sent as one JSON object, several calls as a JSON
        array. Returns a dict mapping each call id to its result or to the
        exception it failed with; an empty pipeline returns ``{}``.
        """
        if not pipeline:
            return {}

        # Named "payloads" so the imported ``requests`` module isn't shadowed.
        payloads = [call.request for call in pipeline]
        for req in payloads:
            logger.info("Request {id}: {method}({params})".format(**req))
        payload = json.dumps(payloads[0] if len(payloads) == 1 else payloads)
        self._writer.write(payload.encode(self.encoding, "surrogateescape"))
        await self._writer.drain()

        for call in pipeline:
            call.timeout_handle = self._loop.call_later(
                call.timeout,
                functools.partial(
                    self._handle_response, call.id, response=None, timeout=True
                )
            )
            self._pending_calls[call.id] = call

        futures = [call.response for call in pipeline]
        # No ``loop=`` argument: it was removed from asyncio.wait in 3.10.
        await asyncio.wait(futures)
        return {call.id: call.response_or_exception for call in pipeline}

    async def close(self) -> None:
        """Cancel the listen task, close the streams and drop pending calls."""
        if self._listen_task is not None:
            self._listen_task.cancel()
            await asyncio.wait([self._listen_task])
            self._listen_task = None

        if self._writer is not None:
            if self._writer.can_write_eof():
                self._writer.write_eof()
            self._writer.close()
        if self._reader is not None:
            self._reader.feed_eof()
            if not self._reader.at_eof():
                await self._reader.read()
        self._reader = self._writer = None

        self._pending_calls = {}

    async def _reconnect(self) -> None:
        """Close the current connection, then open a new one."""
        await self.close()
        await self.connect()

    async def apply(self, req: requests.Request, timeout: int = TIMEOUT):
        """Execute a single request and return its mapped response.

        Raises whatever exception the call failed with (timeout or an error
        reported by the server).
        """
        method = _JSONRPC_API[req.__class__]
        call = _JSONRPCCall(method.name, req.params, timeout=timeout)
        result = (await self._jsonrpc_execute([call]))[call.id]
        if isinstance(result, Exception):
            raise result
        return method.map_result(result)

    async def connect(self) -> None:
        """Open the connection described by ``self.url`` and start listening.

        Raises ``ValueError`` for an unsupported URL scheme; any error raised
        while opening the connection is logged and re-raised.
        """
        parts = urllib.parse.urlparse(self.url)
        if parts.scheme == "unix+jsonrpc":
            # urlparse puts the first component of a relative path in
            # ``netloc`` and the (absolute-looking) remainder in ``path``.
            # os.path.join would drop ``netloc`` because ``path`` starts with
            # a separator, so the two are concatenated instead.
            path = (parts.netloc + parts.path).rstrip(os.path.sep)
            open_connection = functools.partial(
                asyncio.open_unix_connection, path
            )
        elif parts.scheme == "tcp+jsonrpc":
            open_connection = functools.partial(
                asyncio.open_connection, parts.hostname, parts.port
            )
        else:
            raise ValueError("Unsupported url {}".format(self.url))

        try:
            # No ``loop=`` arguments: they were removed from asyncio in 3.10.
            self._reader, self._writer = await asyncio.wait_for(
                open_connection(limit=self.read_size),
                self.timeout,
            )
            self._listen_task = self._loop.create_task(self._listen())
        except Exception:
            logger.error("Couldn't open {}".format(self.url))
            raise

    async def _listen(self) -> None:
        """Read from the socket forever and dispatch responses to their calls.

        FIXME: this parsing is a stopgap; it needs a real streaming mode on
        lightsd's side and an async incremental JSON parser. Both buffers
        below are unbounded.
        """
        buf = bytearray()  # FIXME: needs to be bound to some max size
        sbuf = str()  # FIXME: same here
        while True:
            chunk = await self._reader.read(self.read_size)
            if not chunk:  # EOF, reconnect
                logger.info("EOF, reconnecting...")
                # XXX: deadlock within the close call in _reconnect?
                # NOTE(review): close() cancels and awaits self._listen_task,
                # which appears to be this very task when reached from here,
                # so the reconnect may never complete -- confirm.
                await self._reconnect()
                return

            buf += chunk
            try:
                # Strict decoding fails on a multi-byte character split
                # across two reads; keep the bytes and retry with more data.
                sbuf += buf.decode(self.encoding, "strict")
            except UnicodeError:
                continue
            buf = bytearray()

            while sbuf:
                # Try the whole buffer first; if that fails, look for the
                # boundary between two concatenated JSON documents.
                try:
                    response = json.loads(sbuf)
                    sbuf = str()
                except Exception:
                    def find_response(delim: str) -> Tuple[Dict[str, Any], str]:
                        offset = sbuf.find(delim)
                        while offset != -1:
                            try:
                                response = json.loads(sbuf[:offset + 1])
                                return response, sbuf[offset + 1:]
                            except Exception:
                                offset = sbuf.find(delim, offset + 2)
                        return None, sbuf

                    for delim in {"}{", "}[", "]{", "]["}:
                        response, sbuf = find_response(delim)
                        if response is not None:
                            break  # yay!
                    else:
                        break  # need more data

                batch = response if isinstance(response, list) else [response]
                for response in batch:
                    call_id = response["id"]
                    error = response.get("error")
                    if error is not None:
                        code = error.get("code")
                        msg = error.get("message")
                        logger.warning("Error on request {}: {} - {}".format(
                            call_id, code, msg
                        ))
                        # The call may already have timed out, or the error
                        # may carry an id we don't know: nothing to resolve.
                        call = self._pending_calls.pop(call_id, None)
                        if call is None:
                            continue
                        call.timeout_handle.cancel()
                        ex = exceptions.LightsClientError(msg)
                        call.response.set_exception(ex)
                        continue
                    logger.info("Response {}: {}".format(
                        call_id, response["result"]
                    ))
                    self._handle_response(call_id, response["result"])

    def batch(self) -> "_AsyncJSONRPCBatch":
        """Return a new batch bound to this client."""
        return _AsyncJSONRPCBatch(self)


# LightsClient could eventually point to a different but api-compatible class
# someday:
LightsClient = AsyncJSONRPCLightsClient


class _AsyncJSONRPCBatch:
    """Collects requests and sends them to the client as a single pipeline.

    Usable as an async context manager: the batch is executed on exit when
    the body raised no exception. After execution ``responses`` holds the
    mapped results and ``exceptions`` the failures.
    """

    def __init__(self, client: AsyncJSONRPCLightsClient) -> None:
        self.responses = None  # type: List[responses.Response]
        self.exceptions = None  # type: List[Exception]
        self._client = client
        self._batch = []  # type: List[Tuple[_JSONRPCMethod, _JSONRPCCall]]

    async def __aenter__(self) -> "_AsyncJSONRPCBatch":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            await self.execute()

    def append(
        self,
        req: requests.Request,
        timeout: int = AsyncJSONRPCLightsClient.TIMEOUT
    ) -> None:
        """Queue ``req`` for the next :meth:`execute`."""
        method = _JSONRPC_API[req.__class__]
        call = _JSONRPCCall(method.name, req.params, timeout=timeout)
        self._batch.append((method, call))

    async def execute(self) -> None:
        """Send the queued calls and sort outcomes into responses/exceptions."""
        resp_by_id = await self._client._jsonrpc_execute([
            call for method, call in self._batch
        ])
        self.responses = []
        self.exceptions = []
        for method, call in self._batch:
            raw_resp = resp_by_id[call.id]
            if isinstance(raw_resp, Exception):
                self.exceptions.append(raw_resp)
            else:
                self.responses.append(method.map_result(raw_resp))


LightsCommandBatch = _AsyncJSONRPCBatch


async def get_lightsd_unix_socket_async(
    loop: asyncio.AbstractEventLoop = None,
) -> str:
    """Return the ``unix+jsonrpc://`` URL of lightsd's socket.

    Runs ``lightsd --rundir`` to discover the runtime directory and falls
    back to ``build`` (with a warning) when the binary is missing, exits
    with a non-zero status or prints nothing.

    ``loop`` is accepted for backward compatibility only: asyncio dropped
    the ``loop=`` argument of ``create_subprocess_exec`` in Python 3.10, so
    it is no longer forwarded.
    """
    lightsdrundir = None
    try:
        process = await asyncio.create_subprocess_exec(
            "lightsd", "--rundir",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
    except FileNotFoundError:  # couldn't find the lightsd bin
        pass
    else:
        stdout, stderr = await process.communicate()
        stdout = stdout.decode(locale.getpreferredencoding()).strip()
        if process.returncode == 0 and stdout:
            lightsdrundir = stdout

    if lightsdrundir is None:
        lightsdrundir = "build"
        logger.warning(
            "Couldn't infer lightsd's runtime directory, is "
            "lightsd installed? Trying {}…".format(lightsdrundir)
        )

    return "unix+jsonrpc://" + os.path.join(lightsdrundir, "socket")


async def create_async_lightsd_connection(
    url: str = None,
    loop: asyncio.AbstractEventLoop = None
) -> AsyncJSONRPCLightsClient:
    """Create a client for ``url`` (auto-detected when omitted) and connect it."""
    if loop is None:
        loop = asyncio.get_event_loop()
    if url is None:
        url = await get_lightsd_unix_socket_async(loop)

    c = AsyncJSONRPCLightsClient(url, loop=loop)
    await c.connect()
    return c


def create_lightsd_connection(url: str = None) -> None:...
automator_client.py
Source:automator_client.py  
...68            timeout=int(os.environ.get("jsonrpc_timeout", 90)))69    def jsonrpc_wrap(self, timeout):70        server = self71        ERROR_CODE_BASE = -3200072        def _JsonRPCMethod(url, method, timeout, restart=True):73            _method_obj = JsonRPCMethod(url, method, timeout)74            def wrapper(*args, **kwargs):75                URLError = urllib3.exceptions.HTTPError if os.name == "nt" \76                    else urllib2.URLError77                try:78                    return _method_obj(*args, **kwargs)79                except (URLError, socket.error, HTTPException) as e:80                    if restart:81                        server.stop()82                        server.start(timeout=30)83                        return _JsonRPCMethod(84                            url, method, timeout, False)(*args, **kwargs)85                    else:86                        raise87                except JsonRPCError as e:88                    if e.code >= ERROR_CODE_BASE - 1:89                        server.stop()90                        server.start()91                        return _method_obj(*args, **kwargs)92                    elif e.code == ERROR_CODE_BASE - 2 and self.handlers['on']:93                        # Not Found94                        try:95                            self.handlers['on'] = False96                            # any handler returns True97                            # will break the left handlers...autostub.py
Source:autostub.py  
...139            timeout=int(os.environ.get("jsonrpc_timeout", 90)))140    def jsonrpc_wrap(self, timeout):141        server = self142        ERROR_CODE_BASE = -32000143        def _JsonRPCMethod(url, method, timeout, restart=True):144            _method_obj = JsonRPCMethod(url, method, timeout)145            def wrapper(*args, **kwargs):146                URLError = urllib3.exceptions.HTTPError if os.name == "nt" \147                    else urllib2.URLError148                try:149                    return _method_obj(*args, **kwargs)150                except (URLError, socket.error, HTTPException) as e:151                    if restart:152                        server.stop()153                        server.start(timeout=30)154                        return _JsonRPCMethod(155                            url, method, timeout, False)(*args, **kwargs)156                    else:157                        raise158                except JsonRPCError as e:159                    if e.code >= ERROR_CODE_BASE - 1:160                        server.stop()161                        server.start()162                        return _method_obj(*args, **kwargs)163                    elif e.code == ERROR_CODE_BASE - 2 and self.handlers['on']:164                        # Not Found165                        try:166                            self.handlers['on'] = False167                            # any handler returns True168                            # will break the left handlers...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
