Best Python code snippet using fMBT_python
fmbtandroid.py
Source:fmbtandroid.py  
...484                raise ValueError("forcedView must be a View object or a filename")485            return self._lastView486        retryCount = 0487        while True:488            dump = self._conn.recvViewData()489            if dump != None:490                view = View(self.screenshotDir(), self.serialNumber, dump, displayToScreen)491            else:492                _adapterLog("refreshView window dump reading failed")493                view = None494                # fail quickly if there is no answer495                retryCount += self._PARSE_VIEW_RETRY_LIMIT / 2496            if dump == None or len(view.errors()) > 0:497                if view:498                    _adapterLog(formatErrors(view.errors()))499                if retryCount < self._PARSE_VIEW_RETRY_LIMIT:500                    retryCount += 1501                    time.sleep(0.2) # sleep before retry502                else:503                    raise AndroidConnectionError("Cannot read window dump")504            else:505                # successfully parsed or parsed with errors but no more retries506                self._lastView = view507                return view508    def useDisplaySize(self, (width, height) = (None, None)):509        """510        Transform coordinates of synthesized events from screenshot511        resolution to given resolution. By default events are512        synthesized directly to screenshot coordinates.513        Parameters:514          (width, height) (pair of integers, optional):515                  width and height of display in pixels. If not516                  given, values from Android system properties517                  "display.width" and "display.height" will be used.518        Returns None.519        """520        if width == None:521            width = int(self.systemProperty("display.width"))522        if height == None:523            height = int(self.systemProperty("display.height"))524        screenWidth, screenHeight = self.screenSize()525        self._conn.setScreenToDisplayCoords(526            lambda x, y: (x * width / screenWidth,527                          y * height / screenHeight))528        self._conn.setDisplayToScreenCoords(529            lambda x, y: (x * screenWidth / width,530                          y * screenHeight / height))531    def shell(self, shellCommand):532        """533        Execute shellCommand in adb shell.534        shellCommand is a string (arguments separated by whitespace).535        Returns output of "adb shell" command.536        If you wish to receive exitstatus or standard output and error537        separated from shellCommand, refer to shellSOE().538        """539        return self._conn._runAdb(["shell", shellCommand])[1]540    def shellSOE(self, shellCommand):541        """542        Execute shellCommand in adb shell.543        shellCommand is a string (arguments separated by whitespace).544        Returns tuple (exitStatus, standardOutput, standardError).545        Requires tar and uuencode to be available on the device.546        """547        return self._conn.shellSOE(shellCommand)548    def smsNumber(self, number, message):549        """550        Send message using SMS to given number.551        Parameters:552          number (string)553                  phone number to which the SMS will be sent554          message (string)555                  the message to be sent.556        Returns True on success, otherwise False.557        """558        smsCommand = ('am start -a android.intent.action.SENDTO ' +559                      '-d sms:%s --es sms_body "%s"' +560                      ' --ez exit_on_sent true')  % (number, message)561        status, out, err = self.shellSOE(smsCommand)562        if status != 0:563            _logFailedCommand("sms", smsCommand, status, out, err)564            return False565        _adapterLog("SMS command returned %s" % (out + err,))566        time.sleep(2)567        self.pressKey("KEYCODE_DPAD_RIGHT")568        time.sleep(1)569        self.pressKey("KEYCODE_ENTER")570        return True571    def supportsView(self):572        """573        Check if connected device supports reading view data.574        View data is needed by refreshView(), view(), verifyText() and575        waitText(). It is produced by Android window dump.576        Returns True if view data can be read, otherwise False.577        """578        try:579            self._conn.recvViewData()580            return True581        except AndroidConnectionError:582            return False583    def systemProperty(self, propertyName):584        """585        Returns Android Monkey Device properties, such as586        "clock.uptime", refer to Android Monkey documentation.587        """588        return self._conn.recvVariable(propertyName)589    def tapId(self, viewItemId, **tapKwArgs):590        """591        Find an item with given id from the latest view, and tap it.592        """593        assert self._lastView != None, "View required."594        items = self._lastView.findItemsById(viewItemId, count=1)595        if len(items) > 0:596            return self.tapItem(items[0], **tapKwArgs)597        else:598            _adapterLog("tapItemById(%s): no items found" % (viewItemId,))599            return False600    def tapText(self, text, partial=False, **tapKwArgs):601        """602        Find an item with given text from the latest view, and tap it.603        Parameters:604          partial (boolean, optional):605                  refer to verifyText documentation. The default is606                  False.607          tapPos (pair of floats (x, y)):608                  refer to tapItem documentation.609          long, hold, count, delayBetweenTaps (optional):610                  refer to tap documentation.611        Returns True if successful, otherwise False.612        """613        assert self._lastView != None, "View required."614        items = self._lastView.findItemsByText(text, partial=partial, count=1)615        if len(items) == 0: return False616        return self.tapItem(items[0], **tapKwArgs)617    def topApp(self):618        """619        Returns the name of the top application.620        """621        return self._conn.recvTopAppWindow()[0]622    def topWindow(self):623        """624        Returns the name of the top window.625        """626        # the top window may be None during transitions, therefore627        # retry a couple of times if necessary.628        timeout = 0.5629        pollDelay = 0.2630        start = time.time()631        tw = self._conn.recvTopAppWindow()[1]632        while tw == None and (time.time() - start < timeout):633            time.sleep(pollDelay)634            tw = self._conn.recvTopAppWindow()[1]635        return tw636    def verifyText(self, text, partial=False):637        """638        Verify that the last view has at least one item with given639        text.640        Parameters:641          text (string):642                  text to be searched for in items.643          partial (boolean, optional):644                  if True, match items if item text contains given645                  text, otherwise match only if item text is equal to646                  the given text. The default is False (exact match).647        """648        assert self._lastView != None, "View required."649        return self._lastView.findItemsByText(text, partial=partial, count=1) != []650    def view(self):651        """652        Returns the last view (the most recently refreshed view).653        """654        return self._lastView655    def waitText(self, text, partial=False, **waitKwArgs):656        """657        Wait until text appears in any view item.658        Parameters:659          text (string):660                text to be waited for.661          partial (boolean, optional):662                refer to verifyText. The default is False.663          waitTime, pollDelay (float, optional):664                refer to wait.665        Returns True if text appeared within given time limit,666        otherwise False.667        Updates the last view.668        """669        return self.wait(self.refreshView,670                         self.verifyText, (text,), {'partial': partial},671                         **waitKwArgs)672    def wake(self):673        """674        Force the device to wake up.675        """676        return self._conn.sendWake()677    def _loadDeviceAndTestINIs(self, homeDir, deviceName, iniFile):678        if deviceName != None:679            _deviceIniFilename = homeDir + os.sep + "etc" + os.sep + deviceName + ".ini"680            self.loadConfig(_deviceIniFilename, override=True, level="device")681        if iniFile:682            self.loadConfig(iniFile, override=True, level="test")683class Ini:684    """685    Container for device configuration loaded from INI files.686    INI file syntax:687    [section1]688    key1 = value1689    ; commented = out690    # commented = out691    """692    def __init__(self, iniFile=None):693        """694        Initialise the container, optionally with an initial configuration.695        Parameters:696          iniFile (file object, optional):697                  load the initial configuration from iniFile.698                  The default is None: start with empty configuration.699        """700        # _conf is a dictionary:701        # (section, key) -> value702        self._conf = {}703        if iniFile:704            self.addFile(iniFile)705    def addFile(self, iniFile, override=True):706        """707        Add values from a file to the current configuration.708        Parameters:709          iniFile (file object):710                  load values from this file object.711          override (boolean, optional):712                  If True, loaded values override existing values.713                  Otherwise, only currently undefined values are714                  loaded. The default is True.715        """716        for line in iniFile:717            line = line.strip()718            if line.startswith('[') and line.endswith(']'):719                section = line[1:-1].strip()720            elif line.startswith(";") or line.startswith("#"):721                continue722            elif '=' in line:723                key, value = line.split('=', 1)724                if override or (section, key.strip()) not in self._conf:725                    self._conf[(section, key.strip())] = value.strip()726    def sections(self):727        """728        Returns list of sections in the current configuration.729        """730        return list(set([k[0] for k in self._conf.keys()]))731    def keys(self, section):732        """733        Returns list of keys in a section in the current configuration.734        Parameters:735          section (string):736                  the name of the section.737        """738        return [k[1] for k in self._conf.keys() if k[0] == section]739    def dump(self):740        """741        Returns the current configuration as a single string in the742        INI format.743        """744        lines = []745        for section in sorted(self.sections()):746            lines.append("[%s]" % (section,))747            for key in sorted(self.keys(section)):748                lines.append("%-16s = %s" % (key, self._conf[(section, key)]))749            lines.append("")750        return "\n".join(lines)751    def set(self, section, key, value):752        """753        Set new value for a key in a section.754        Parameters:755          section, key (strings):756                  the section, the key.757          value (string):758                  the new value. If not string already, it will be759                  converted to string, and it will be loaded as a760                  string when loaded from file object.761        """762        self._conf[(section, key)] = str(value)763    def value(self, section, key, default=""):764        """765        Returns the value (string) associated with a key in a section.766        Parameters:767          section, key (strings):768                  the section and the key.769          default (string, optional):770                  the default value to be used and stored if there is771                  no value associated to the key in the section. The772                  default is the empty string.773        Reading a value of an undefined key in an undefined section774        adds the key and the section to the configuration with the775        returned (the default) value. This makes all returned values776        visible in dump().777        """778        if not (section, key) in self._conf:779            self._conf[(section, key)] = default780        return self._conf[(section, key)]781# For backward compatibility, someone might be using old _DeviceConf782_DeviceConf = Ini783class ViewItem(fmbtgti.GUIItem):784    """785    ViewItem holds the information of a single GUI element.786    """787    def __init__(self, className, code, indent, properties, parent, rawProps, dumpFilename, displayToScreen):788        self._p = properties789        self._parent = parent790        self._className = className791        self._code = code792        self._indent = indent793        self._children = []794        self._rawProps = ""795        if not "scrolling:mScrollX" in self._p:796            self._p["scrolling:mScrollX"] = 0797            self._p["scrolling:mScrollY"] = 0798        fmbtgti.GUIItem.__init__(self, className, self._calculateBbox(displayToScreen), dumpFilename)799    def addChild(self, child): self._children.append(child)800    def _calculateBbox(self, displayToScreen):801        left = int(self._p["layout:mLeft"])802        top = int(self._p["layout:mTop"])803        parent = self._parent804        while parent:805            pp = parent._p806            left += int(pp["layout:mLeft"]) - int(pp["scrolling:mScrollX"])807            top += int(pp["layout:mTop"]) - int(pp["scrolling:mScrollY"])808            parent = parent._parent809        height = int(self._p["layout:getHeight()"])810        width = int(self._p["layout:getWidth()"])811        screenLeft, screenTop = displayToScreen(left, top)812        screenRight, screenBottom = displayToScreen(left + width, top + height)813        return (screenLeft, screenTop, screenRight, screenBottom)814    def children(self):   return self._children815    def className(self):  return self._className816    def code(self):       return self._code817    def indent(self):     return self._indent818    def id(self):         return self.property("mID")819    def parent(self):     return self._parent820    def properties(self): return self._p821    def property(self, propertyName):822        return self._p.get(propertyName, None)823    def text(self):       return self.property("text:mText")824    def visible(self):825        return self._p.get("getVisibility()", "") == "VISIBLE"826    def dump(self):827        p = self._p828        return ("ViewItem(\n\tchildren = %d\n\tclassName = '%s'\n\tcode = '%s'\n\t" +829                "indent = %d\n\tproperties = {\n\t\t%s\n\t})") % (830            len(self._children), self._className, self._code, self._indent,831            '\n\t\t'.join(['"%s": %s' % (key, p[key]) for key in sorted(p.keys())]))832    def __str__(self):833        return ("ViewItem(className='%s', id=%s, bbox=%s)"  % (834                self._className, self.id(), self.bbox()))835class View(object):836    """837    View provides interface to screen dumps from Android. It parses838    the dump to a hierarchy of ViewItems. find* methods enable searching839    for ViewItems based on their properties.840    """841    def __init__(self, screenshotDir, serialNumber, dump, displayToScreen=None):842        self.screenshotDir = screenshotDir843        self.serialNumber = serialNumber844        self._viewItems = []845        self._errors = []846        self._lineRegEx = re.compile("(?P<indent>\s*)(?P<class>[\w.$]+)@(?P<id>[0-9A-Fa-f]{8} )(?P<properties>.*)")847        self._olderAndroidLineRegEx = re.compile("(?P<indent>\s*)(?P<class>[\w.$]+)@(?P<id>\w)(?P<properties>.*)")848        self._propRegEx = re.compile("(?P<prop>(?P<name>[^=]+)=(?P<len>\d+),)(?P<data>[^\s]* ?)")849        self._dump = dump850        self._rawDumpFilename = self.screenshotDir + os.sep + fmbtgti._filenameTimestamp() + "-" + self.serialNumber + ".view"851        file(self._rawDumpFilename, "w").write(self._dump)852        if displayToScreen == None:853            displayToScreen = lambda x, y: (x, y)854        try: self._parseDump(dump, self._rawDumpFilename, displayToScreen)855        except Exception, e:856            self._errors.append((-1, "", "Parser error"))857    def viewItems(self): return self._viewItems858    def errors(self): return self._errors859    def dumpRaw(self): return self._dump860    def dumpItems(self, itemList = None):861        if itemList == None: itemList = self._viewItems862        l = []863        for i in itemList:864            l.append(self._dumpItem(i))865        return '\n'.join(l)866    def dumpTree(self, rootItem = None):867        l = []868        if rootItem != None:869            l.extend(self._dumpSubTree(rootItem, 0))870        else:871            for i in self._viewItems:872                if i._indent == 0:873                    l.extend(self._dumpSubTree(i, 0))874        return '\n'.join(l)875    def _dumpSubTree(self, viewItem, indent):876        l = []877        i = viewItem878        l.append(" "*indent + self._dumpItem(viewItem))879        for i in viewItem.children():880            l.extend(self._dumpSubTree(i, indent + 4))881        return l882    def _dumpItem(self, viewItem):883        i = viewItem884        if i.text() != None: t = '"%s"' % (i.text(),)885        else: t = None886        return "id=%s cls=%s text=%s bbox=%s" % (887            i.id(), i.className(), t, i.bbox())888    def findItems(self, comparator, count=-1, searchRootItem=None, searchItems=None):889        foundItems = []890        if count == 0: return foundItems891        if searchRootItem != None:892            # find from searchRootItem and its children893            if comparator(searchRootItem):894                foundItems.append(searchRootItem)895            for c in searchRootItem.children():896                foundItems.extend(self.findItems(comparator, count=count-len(foundItems), searchRootItem=c))897        else:898            if searchItems != None:899                # find from listed items only900                searchDomain = searchItems901            else:902                # find from all items903                searchDomain = self._viewItems904            for i in searchDomain:905                if comparator(i):906                    foundItems.append(i)907                    if count > 0 and len(foundItems) >= count:908                        break909        return foundItems910    def findItemsByText(self, text, partial=False, count=-1, searchRootItem=None, searchItems=None):911        """912        Searches the GUI hiearhy for a object with a given text913        """914        if partial:915            c = lambda item: (916                item.properties().get("text:mText", "").find(text) != -1 )917        else:918            c = lambda item: (919                item.properties().get("text:mText", None) == text )920        return self.findItems(c, count=count, searchRootItem=searchRootItem, searchItems=searchItems)921    def findItemsById(self, id, count=-1, searchRootItem=None, searchItems=None):922        c = lambda item: item.properties().get("mID", "") == id923        return self.findItems(c, count=count, searchRootItem=searchRootItem, searchItems=searchItems)924    def findItemsByClass(self, className, partial=True, count=-1, searchRootItem=None, searchItems=None):925        if partial: c = lambda item: item.className().find(className) != -1926        else: c = lambda item: item.className() == className927        return self.findItems(c, count=count, searchRootItem=searchRootItem, searchItems=searchItems)928    def findItemsByIdAndClass(self, id, className, partial=True, count=-1, searchRootItem=None, searchItems=None):929        idOk = self.findItemsById(id, count=-1, searchRootItem=searchRootItem)930        return self.findItemsByClass(className, partial=partial, count=count, searchItems=idOk)931    def findItemsByRawProps(self, s, count=-1, searchRootItem=None, searchItems=None):932        c = lambda item: item._rawProps.find(s) != -1933        return self.findItems(c, count=count, searchRootItem=searchRootItem, searchItems=searchItems)934    def save(self, fileOrDirName):935        shutil.copy(self._rawDumpFilename, fileOrDirName)936    def _parseDump(self, dump, rawDumpFilename, displayToScreen):937        """938        Process the raw dump data and create a tree of ViewItems939        """940        # This code originates from tema-android-adapter-3.2,941        # AndroidAdapter/guireader.py.942        self._viewItems = []943        cellLayout = ""944        parent = None945        previousItem = None946        currentIndent = 0947        visible = True948        self.TOP_PAGED_VIEW = ""949        for lineIndex, line in enumerate(dump.splitlines()):950            if line == "DONE.":951                break952            # separate indent, class and properties for each GUI object953            # TODO: branch here according to self._androidVersion954            matcher = self._lineRegEx.match(line)955            if not matcher:956                # FIXME: this hack falls back to old format,957                # should branch according to self._androidVersion!958                matcher = self._olderAndroidLineRegEx.match(line)959                if not matcher:960                    self._errors.append((lineIndex + 1, line, "Illegal line"))961                    continue # skip this line962            className = matcher.group("class")963            # Indent specifies the hierarchy level of the object964            indent = len(matcher.group("indent"))965            # If the indent is bigger that previous, this object is a966            # child for the previous object967            if indent > currentIndent:968                parent = self._viewItems[-1]969            elif indent < currentIndent:970                for tmp in range(0, currentIndent - indent):971                    parent = parent.parent()972            currentIndent = indent973            propertiesData = matcher.group("properties")974            properties = {}975            index = 0976            x = 0977            y = 0978            # Process the properties of each GUI object979            while index < len(propertiesData):980                # Separate name and value for each property [^=]*=981                propMatch = self._propRegEx.match(propertiesData[index:-1])982                if not propMatch or len(propMatch.group("data")) < int(propMatch.group("len")):983                    if not propMatch.group("data"):984                        self._errors.append((lineIndex, propertiesData[index:-1], "Illegal property"))985                        return None986                    startFrom = index + propertiesData[index:-1].find(propMatch.group("data"))987                    currFixedData = propertiesData[startFrom:(startFrom + int(propMatch.group("len")))]988                    length = int(propMatch.group("len"))989                    # [^=]+=?, == data990                    properties[propMatch.group("name")] = currFixedData[0:length].lstrip()991                else:992                    length = int(propMatch.group("len"))993                    # [^=]+=?, == data994                    properties[propMatch.group("name")] = propMatch.group("data")[0:length].lstrip()995                index += len(propMatch.group("prop")) + length + 1996            self._viewItems.append(ViewItem(matcher.group("class"), matcher.group("id"), indent, properties, parent, matcher.group("properties"), self._rawDumpFilename, displayToScreen))997            if parent:998                parent.addChild(self._viewItems[-1])999        return self._viewItems1000    def __str__(self):1001        return 'View(items=%s, dump="%s")' % (1002            len(self._viewItems), self._rawDumpFilename)1003class _AndroidDeviceConnection:1004    """1005    Connection to the Android Device being tested.1006    """1007    _m_host = 'localhost'1008    _m_port = random.randint(20000, 29999)1009    _w_host = 'localhost'1010    _w_port = _m_port + 11011    def __init__(self, serialNumber, stopOnError=True):1012        self._serialNumber = serialNumber1013        self._stopOnError = stopOnError1014        self._shellSupportsTar = False1015        self.setScreenToDisplayCoords(lambda x, y: (x, y))1016        self.setDisplayToScreenCoords(lambda x, y: (x, y))1017        self._detectFeatures()1018        try:1019            self._resetMonkey()1020            self._resetWindow()1021        finally:1022            # Next _AndroidDeviceConnection instance will use different ports1023            self._w_port = _AndroidDeviceConnection._w_port1024            self._m_port = _AndroidDeviceConnection._m_port1025            _AndroidDeviceConnection._w_port += 1001026            _AndroidDeviceConnection._m_port += 1001027    def __del__(self):1028        try: self._monkeySocket.close()1029        except: pass1030    def target(self):1031        return self._serialNumber1032    def _cat(self, remoteFilename):1033        fd, filename = tempfile.mkstemp("fmbtandroid-cat-")1034        os.close(fd)1035        self._runAdb(["pull", remoteFilename, filename], 0)1036        contents = file(filename).read()1037        os.remove(filename)1038        return contents1039    def _runAdb(self, command, expectedExitStatus=0, timeout=None):1040        if not self._stopOnError:1041            expect = None1042        else:1043            expect = expectedExitStatus1044        if type(command) == list:1045            command = ["adb", "-s", self._serialNumber] + command1046        else:1047            command = ["adb", "-s", self._serialNumber, command]1048        return _run(command, expectedExitStatus=expect, timeout=timeout)1049    def _runSetupCmd(self, cmd, expectedExitStatus = 0):1050        _adapterLog('setting up connections: "%s"' % (cmd,))1051        try:1052            self._runAdb(cmd, expectedExitStatus)1053        except (FMBTAndroidRunError, AndroidDeviceNotFound), e:1054            _adapterLog("connection setup problem: %s" % (e,))1055            return False1056        return True1057    def _detectFeatures(self):1058        # check supported features1059        outputLines = self._runAdb(["shell", "id"])[1].splitlines()1060        if len(outputLines) == 1 and "uid=0" in outputLines[0]:1061            self._shellUid0 = True1062        else:1063            self._shellUid0 = False1064        outputLines = self._runAdb(["shell", "su", "root", "id"])[1].splitlines()1065        if len(outputLines) == 1 and "uid=0" in outputLines[0]:1066            self._shellSupportsSu = True1067        else:1068            self._shellSupportsSu = False1069        outputLines = self._runAdb(["shell", "tar"])[1].splitlines()1070        if len(outputLines) == 1 and "bin" in outputLines[0]:1071            self._shellSupportsTar = False1072        else:1073            self._shellSupportsTar = True1074    def _resetWindow(self):1075        setupCommands = [["shell", "service" , "call", "window", "1", "i32", "4939"],1076                         ["forward", "tcp:"+str(self._w_port), "tcp:4939"]]1077        for c in setupCommands:1078            self._runSetupCmd(c)1079    def _resetMonkey(self, timeout=3, pollDelay=.25):1080        tryKillingMonkeyOnFailure = 11081        failureCountSinceKill = 01082        endTime = time.time() + timeout1083        if self._shellUid0:1084            monkeyLaunch = ["monkey"]1085        elif self._shellSupportsSu:1086            monkeyLaunch = ["su", "root", "monkey"]1087        else:1088            monkeyLaunch = ["monkey"]1089        while time.time() < endTime:1090            if not self._runSetupCmd(["shell"] + monkeyLaunch + ["--port", "1080"], None):1091                time.sleep(pollDelay)1092                failureCountSinceKill += 11093                continue1094            time.sleep(pollDelay)1095            if not self._runSetupCmd(["forward", "tcp:"+str(self._m_port), "tcp:1080"]):1096                time.sleep(pollDelay)1097                failureCountSinceKill += 11098                continue1099            try:1100                self._monkeySocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)1101                self._monkeySocket.connect((self._m_host, self._m_port))1102                self._monkeySocket.setblocking(0)1103                self._monkeySocket.settimeout(1.0)1104                self._platformVersion = self._monkeyCommand("getvar build.version.release", retry=0)[1]1105                if len(self._platformVersion) > 0:1106                    self._monkeySocket.settimeout(5.0)1107                    return True1108            except Exception, e:1109                failureCountSinceKill += 11110            time.sleep(pollDelay)1111            if failureCountSinceKill > 2 and tryKillingMonkeyOnFailure > 0:1112                if self._shellSupportsSu:1113                    self._runSetupCmd(["shell", "su", "root", "pkill", "monkey"])1114                else:1115                    self._runSetupCmd(["shell", "pkill", "monkey"])1116                tryKillingMonkeyOnFailure -= 11117                failureCountSinceKill = 01118                time.sleep(pollDelay)1119        if self._stopOnError:1120            msg = 'Android monkey error: cannot connect to "adb shell monkey --port 1080" to device %s' % (self._serialNumber)1121            _adapterLog(msg)1122            raise AndroidConnectionError(msg)1123        else:1124            return False1125    def _monkeyCommand(self, command, retry=3):1126        try:1127            self._monkeySocket.sendall(command + "\n")1128            data = self._monkeySocket.recv(4096).strip()1129            if len(data) == 0 and retry > 0:1130                return self._monkeyCommand(command, retry-1)1131            if data == "OK":1132                return True, None1133            elif data.startswith("OK:"):1134                return True, data.split("OK:")[1]1135            _adapterLog("monkeyCommand failing... command: '%s' response: '%s'" % (command, data))1136            return False, None1137        except socket.error:1138            try: self._monkeySocket.close()1139            except: pass1140            if retry > 0:1141                self._resetMonkey()1142                return self._monkeyCommand(command, retry=retry-1)1143            else:1144                raise AndroidConnectionError('Android monkey socket connection lost while sending command "%s"' % (command,))1145    def reboot(self, reconnect, firstBootAfterFlashing, timeout):1146        if firstBootAfterFlashing:1147            self._runAdb("root")1148            time.sleep(2)1149            self._runAdb(["shell", "rm", "/data/data/com.android.launcher/shared_prefs/com.android.launcher2.prefs.xml"])1150        self._runAdb("reboot")1151        _adapterLog("rebooting " + self._serialNumber)1152        if reconnect:1153            time.sleep(2)1154            endTime = time.time() + timeout1155            status, _, _ = self._runAdb("wait-for-device", expectedExitStatus=None, timeout=timeout)1156            if status != 0:1157                raise AndroidDeviceNotFound('"timeout %s adb wait-for-device" status %s' % (timeout, status))1158            self._detectFeatures()1159            while time.time() < endTime:1160                try:1161                    if self._resetMonkey(timeout=1, pollDelay=1):1162                        break1163                except AndroidConnectionError:1164                    pass1165                time.sleep(1)1166            else:1167                msg = "reboot: reconnecting to " + self._serialNumber + " failed"1168                _adapterLog(msg)1169                raise AndroidConnectionError(msg)1170            self._resetWindow()1171        return True1172    def recvVariable(self, variableName):1173        ok, value = self._monkeyCommand("getvar " + variableName)1174        if ok: return value1175        else:1176            # LOG: getvar variableName failed1177            return None1178    def recvScreenSize(self):1179        try:1180            height = int(self.recvVariable("display.height"))1181            width = int(self.recvVariable("display.width"))1182        except TypeError:1183            return None, None1184        return width, height1185    def recvTopAppWindow(self):1186        _, output, _ = self._runAdb(["shell", "dumpsys", "window"], 0)1187        if self._platformVersion >= "4.2":1188            s = re.findall("mCurrentFocus=Window\{(#?[0-9A-Fa-f]{8})( [^ ]*)? (?P<winName>[^}]*)\}", output)1189        else:1190            s = re.findall("mCurrentFocus=Window\{(#?[0-9A-Fa-f]{8}) (?P<winName>[^ ]*) [^ ]*\}", output)1191        if s and len(s[-1][-1].strip()) > 1:1192            topWindowName = s[-1][-1]1193            if len(s) > 0:1194                _adapterLog('recvTopAppWindow warning: several mCurrentFocus windows: "%s"'1195                            % ('", "'.join([w[-1] for w in s]),))1196        else: topWindowName = None1197        s = re.findall("mFocusedApp=AppWindowToken.*ActivityRecord\{#?[0-9A-Fa-f]{8}( [^ ]*)? (?P<appName>[^}]*)\}", output)1198        if s and len(s[0][-1].strip()) > 1:1199            topAppName = s[0][-1].strip()1200        else:1201            topAppName = None1202        return topAppName, topWindowName1203    def sendTap(self, xCoord, yCoord):1204        xCoord, yCoord = self._screenToDisplay(xCoord, yCoord)1205        return self._monkeyCommand("tap " + str(xCoord) + " " + str(yCoord))[0]1206    def sendKeyUp(self, key):1207        return self._monkeyCommand("key up " + key)[0]1208    def sendKeyDown(self, key):1209        return self._monkeyCommand("key down " + key)[0]1210    def sendTouchUp(self, xCoord, yCoord):1211        xCoord, yCoord = self._screenToDisplay(xCoord, yCoord)1212        return self._monkeyCommand("touch up " + str(xCoord) + " " + str(yCoord))[0]1213    def sendTouchDown(self, xCoord, yCoord):1214        xCoord, yCoord = self._screenToDisplay(xCoord, yCoord)1215        return self._monkeyCommand("touch down " + str(xCoord) + " " + str(yCoord))[0]1216    def sendTouchMove(self, xCoord, yCoord):1217        xCoord, yCoord = self._screenToDisplay(xCoord, yCoord)1218        return self._monkeyCommand("touch move " + str(xCoord) + " " + str(yCoord))[0]1219    def sendTrackBallMove(self, dx, dy):1220        dx, dy = self._screenToDisplay(dx, dy)1221        return self._monkeyCommand("trackball " + str(dx) + " " + str(dy))[0]1222    def sendPress(self, key):1223        return self._monkeyCommand("press " + key)[0]1224    def sendType(self, text):1225        for lineIndex, line in enumerate(text.split('\n')):1226            if lineIndex > 0: self.sendPress("KEYCODE_ENTER")1227            for wordIndex, word in enumerate(line.split(' ')):1228                if wordIndex > 0: self.sendPress("KEYCODE_SPACE")1229                if len(word) > 0 and not self._monkeyCommand("type " + word)[0]:1230                    _adapterLog('sendType("%s") failed when sending word "%s"' %1231                                (text, word))1232                    return False1233        return True1234    def sendWake(self):1235        return self._monkeyCommand("wake")[0]1236    def recvScreenshot(self, filename, retry=2, retryDelay=1.0):1237        """1238        Capture a screenshot and copy the image file to given path or1239        system temp folder.1240        Returns True on success, otherwise False.1241        """1242        remotefile = '/sdcard/' + os.path.basename(filename)1243        self._runAdb(['shell', 'screencap', '-p', remotefile], 0)1244        status, out, err = self._runAdb(['pull', remotefile, filename], [0, 1])1245        if status != 0:1246            raise FMBTAndroidError("Failed to fetch screenshot from the device: %s. SD card required." % ((out + err).strip(),))1247        status, _, _ = self._runAdb(['shell', 'rm', remotefile], 0)1248        if os.path.getsize(filename) == 0:1249            _adapterLog("received screenshot of size 0")1250            if retry > 0:1251                time.sleep(retryDelay)1252                return self.recvScreenshot(filename, retry-1, retryDelay)1253            else:1254                raise FMBTAndroidError("Screenshot file size 0")1255        return True1256    def setScreenToDisplayCoords(self, screenToDisplayFunction):1257        self._screenToDisplay = screenToDisplayFunction1258    def setDisplayToScreenCoords(self, displayToScreenFunction):1259        self._displayToScreen = displayToScreenFunction1260    def shellSOE(self, shellCommand):1261        fd, filename = tempfile.mkstemp(prefix="fmbtandroid-shellcmd-")1262        remotename = '/sdcard/' + os.path.basename(filename)1263        os.write(fd, shellCommand + "\n")1264        os.close(fd)1265        self._runAdb(["push", filename, remotename], 0)1266        cmd = "source %s >%s.out 2>%s.err; echo $? > %s.status" % ((remotename,)*4)1267        if self._shellSupportsTar:1268            # do everything we can in one command to minimise adb1269            # commands: execute command, record results, package,1270            # print uuencoded package and remove remote temp files1271            cmd += "; cd %s; tar czf - %s.out %s.err %s.status | uuencode %s.tar.gz; rm -f %s*" % (1272                (os.path.dirname(remotename),) + ((os.path.basename(remotename),) * 5))1273            status, output, error = self._runAdb(["shell", cmd], 0)1274            file(filename, "w").write(output)1275            uu.decode(filename, out_file=filename + ".tar.gz")1276            import tarfile1277            tar = tarfile.open(filename + ".tar.gz")1278            basename = os.path.basename(filename)1279            stdout = tar.extractfile(basename + ".out").read()1280            stderr = tar.extractfile(basename + ".err").read()1281            try: exitstatus = int(tar.extractfile(basename + ".status").read())1282            except: exitstatus = None1283            os.remove(filename)1284            os.remove(filename + ".tar.gz")1285        else:1286            # need to pull files one by one, slow.1287            self._runAdb(["shell", cmd], 0)1288            stdout = self._cat(remotename + ".out")1289            stderr = self._cat(remotename + ".err")1290            try: exitstatus = int(self._cat(remotename + ".status"))1291            except: exitstatus = None1292            self._runAdb(["shell", "rm -f "+remotename+"*"])1293        return exitstatus, stdout, stderr1294    def recvViewData(self, retry=3):1295        _dataBufferLen = 4096 * 161296        try:1297            self._windowSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)1298            self._windowSocket.connect( (self._w_host, self._w_port) )1299            self._windowSocket.settimeout(60)1300            # DUMP -1: get foreground window info1301            if self._windowSocket.sendall("DUMP -1\n") == 0:1302                # LOG: readGUI cannot write to window socket1303                raise AndroidConnectionError("writing socket failed")1304            # Read until a "DONE" line or timeout1305            data = ""1306            while True:1307                newData = ''1308                try:1309                    newData = self._windowSocket.recv(_dataBufferLen)1310                except socket.timeout:1311                    data = None1312                    break1313                data += newData1314                if data.splitlines()[-1] == "DONE" or newData == '':1315                    break1316            return data1317        except Exception, msg:1318            _adapterLog("recvViewData: window socket error: %s" % (msg,))1319            if retry > 0:1320                try: self._windowSocket.close()1321                except: pass1322                self._resetWindow()1323                time.sleep(0.5)1324                return self.recvViewData(retry=retry-1)1325            else:1326                msg = "recvViewData: cannot read window socket"1327                _adapterLog(msg)1328                raise AndroidConnectionError(msg)1329        finally:1330            try: self._windowSocket.close()1331            except: pass1332class FMBTAndroidError(Exception): pass1333class FMBTAndroidRunError(FMBTAndroidError): pass1334class AndroidConnectionError(FMBTAndroidError): pass1335class AndroidConnectionLost(AndroidConnectionError): pass...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
