Best Python code snippet using fMBT_python
fmbttizen.py
Source:fmbttizen.py  
...405    """406    def __init__(self, serialNumber=None, loginCommand=None, debugAgentFile=None):407        if loginCommand == None:408            if serialNumber == None:409                self._serialNumber = self.recvSerialNumber()410            else:411                self._serialNumber = serialNumber412            self._loginCommand = None413        else:414            if serialNumber == None:415                self._serialNumber = "unknown"416            else:417                self._serialNumber = serialNumber418            if isinstance(loginCommand, str):419                self._loginCommand = shlex.split(loginCommand)420            else:421                self._loginCommand = loginCommand422        self._useSdb = (self._loginCommand == None)423        self._useSsh = not self._useSdb424        self._sdbShell = None425        self._debugAgentFile = debugAgentFile426        self._agentNeedsResolution = True427        self.open()428    def __del__(self):429        self.close()430    def open(self):431        if self._serialNumber == "unknown" and self._useSdb:432            raise TizenDeviceNotFoundError("Tizen device not found.")433        self.close()434        agentFilename = os.path.join(435            os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))),436            "fmbttizen-agent.py")437        remoteUploadPath = "/tmp"438        agentRemoteFilename = remoteUploadPath + "/fmbttizen-agent.py"439        uploadFiles = [(agentFilename,440                        agentRemoteFilename),441                       (os.path.join(os.path.dirname(agentFilename),442                                     "fmbtuinput.py"),443                        remoteUploadPath + "/fmbtuinput.py")]444        # Upload fmbttizen-agent to the device445        if self._useSdb:446            for src, dst in uploadFiles:447                uploadCmd = ["sdb", "-s", self._serialNumber, "push", src, dst]448                status, out, err = _run(uploadCmd, range(256))449                if status == 127:450                    raise TizenConnectionError('Executing "sdb -s %s push" failed. Check your Tizen SDK installation.' % (self._serialNumber,))451                elif status != 0:452                    if "device not found" in err:453                        raise TizenDeviceNotFoundError('Tizen device "%s" not found.' % (self._serialNumber,))454                    else:455                        raise TizenConnectionError('Executing "%s" failed: %s' % (' '.join(uploadCmd), err + " " + out))456        else: # using SSH457            if self._loginCommand:458                for src, dst in uploadFiles:459                    uploadCmd = self._loginCommand + ["cat", ">", dst]460                    p = subprocess.Popen(uploadCmd, shell=False, stdin=subprocess.PIPE)461                    p.stdin.write(file(src).read())462                    p.stdin.close()463                    p.wait()464            else: # run locally without remote login, no upload465                agentRemoteFilename = agentFilename466        # Launch agent, create persistent connection to it467        if self._useSdb:468            remoteShellCmd = ["sdb", "-s", self._serialNumber, "shell"]469        else: # using SSH470            remoteShellCmd = self._loginCommand + ["python", agentRemoteFilename]471        try:472            self._sdbShell = subprocess.Popen(remoteShellCmd,473                                              shell=False,474                                              stdin=subprocess.PIPE,475                                              stdout=subprocess.PIPE,476                                              stderr=subprocess.PIPE,477                                              close_fds=True)478        except OSError, msg:479            raise TizenConnectionError('Executing "%s" failed.' % (480                " ".join(remoteShellCmd),))481        _g_sdbProcesses.add(self._sdbShell)482        self._sdbShellErrQueue = Queue.Queue()483        thread.start_new_thread(_fileToQueue, (self._sdbShell.stderr, self._sdbShellErrQueue))484        if self._useSdb:485            self._sdbShell.stdin.write("\r")486            try:487                ok, self._platformInfo = self._agentCmd("python %s; exit" % (agentRemoteFilename,))488            except IOError:489                raise TizenConnectionError('Connecting to a Tizen device/emulator with "sdb -s %s shell" failed.' % (self._serialNumber,))490        else: # using SSH491            ok, self._platformInfo = self._agentCmd("")492            pass # agent already started by remoteShellCmd493        return ok494    def reportErrorsInQueue(self):495        while True:496            try: l = self._sdbShellErrQueue.get_nowait()497            except Queue.Empty: return498            if self._debugAgentFile: self._debugAgentFile.write("<2 %s" % (l,))499            _adapterLog("fmbttizen agent error: %s" % (l,))500    def close(self):501        if self._sdbShell != None:502            try: self._agentCmd("quit", retry=0)503            except: pass504            try: self._sdbShell.terminate()505            except: pass506            try: self._sdbShell.stdin.close()507            except: pass508            try: self._sdbShell.stdout.close()509            except: pass510            try: self._sdbShell.stderr.close()511            except: pass512            self.reportErrorsInQueue()513            _g_sdbProcesses.remove(self._sdbShell)514        self._sdbShell = None515    def _agentAnswer(self):516        errorLinePrefix = "FMBTAGENT ERROR "517        okLinePrefix = "FMBTAGENT OK "518        l = self._sdbShell.stdout.readline()519        output = []520        while True:521            if self._debugAgentFile:522                if len(l) > 72: self._debugAgentFile.write("<1 %s...\n" % (l[:72],))523                else: self._debugAgentFile.write("<1 %s\n" % (l,))524            if l.startswith(okLinePrefix):525                return True, _decode(l[len(okLinePrefix):])526            elif l.startswith(errorLinePrefix):527                return False, _decode(l[len(errorLinePrefix):])528            else:529                output.append(l)530                pass531            l = self._sdbShell.stdout.readline()532            if l == "":533                raise IOError("Unexpected termination of sdb shell: %s" % ("\n".join(output)))534            l = l.strip()535    def _agentCmd(self, command, retry=3):536        if command[:2] in ["tt", "td", "tm", "tu", "er"]:537            # Operating on coordinates on with a touch devices538            # may require information on screen resolution.539            # The agent does not know about possible rotation, so540            # the resolution needs to be sent from here.541            if self._agentNeedsResolution:542                self._agentCmd("sd %s %s" % self._gti.screenSize())543                self._agentNeedsResolution = False544        if self._sdbShell == None: return False, "disconnected"545        if self._debugAgentFile: self._debugAgentFile.write(">0 %s\n" % (command,))546        if self._useSdb:547            eol = "\r"548        else:549            eol = "\n"550        try:551            if len(command) > 0:552                self._sdbShell.stdin.write("%s%s" % (command, eol))553                self._sdbShell.stdin.flush()554        except IOError, msg:555            if retry > 0:556                time.sleep(.2)557                self.reportErrorsInQueue()558                _adapterLog('Error when sending command "%s": %s.' % (command, msg))559                self.open()560                self._agentCmd(command, retry=retry-1)561            else:562                raise563        return self._agentAnswer()564    def sendPress(self, keyName):565        return self._agentCmd("kp %s" % (keyName,))[0]566    def sendKeyDown(self, keyName):567        return self._agentCmd("kd %s" % (keyName,))[0]568    def sendKeyUp(self, keyName):569        return self._agentCmd("ku %s" % (keyName,))[0]570    def sendMtLinearGesture(self, *args):571        return self._agentCmd("ml %s" % (_encode(args)))[0]572    def sendScreenshotRotation(self, angle):573        return self._agentCmd("sa %s" % (angle,))[0]574    def sendTap(self, x, y):575        return self._agentCmd("tt %s %s 1" % (x, y))[0]576    def sendTouchDown(self, x, y):577        return self._agentCmd("td %s %s 1" % (x, y))[0]578    def sendTouchMove(self, x, y):579        return self._agentCmd("tm %s %s" % (x, y))[0]580    def sendTouchUp(self, x, y):581        return self._agentCmd("tu %s %s 1" % (x, y))[0]582    def sendType(self, string):583        return self._agentCmd("kt %s" % (_encode(string)))[0]584    def sendDisplayBacklightTime(self, timeout):585        return self._agentCmd("bl %s" % (timeout,))[0]586    def sendRecStart(self, devices=[]):587        """Start recording events"""588        filterOpts = {589            "type": ["EV_SYN", "EV_KEY", "EV_REL", "EV_ABS"]590            }591        if devices:592            filterOpts["device"] = devices593        return self._agentCmd("er start %s" % (_encode(filterOpts,)))[0]594    def sendRecStop(self):595        """Stop recording events"""596        return self._agentCmd("er stop")[0]597    def recvRec(self):598        """Receive recorded events"""599        status, events = self._agentCmd("er fetch")600        if status:601            return events602        else:603            return None604    def recvDisplayStatus(self):605        status = self._agentCmd("gd")606        if status[0] == False:607            raise FMBTTizenError("Error reading display status '%s'" % (status[2],))608        return status[1]609    def recvScreenshot(self, filename, blankFrameRetry=3):610        if blankFrameRetry > 2:611            rv, img = self._agentCmd("ss")612        else:613            rv, img = self._agentCmd("ss R") # retry614        if rv == False:615            return False616        if img.startswith("FMBTRAWX11"):617            try:618                header, zdata = img.split('\n', 1)619                width, height, depth, bpp = [int(n) for n in header.split()[1:]]620                data = zlib.decompress(zdata)621            except Exception, e:622                raise TizenConnectionError("Corrupted screenshot data: %s" % (e,))623            if len(data) != width * height * 4:624                raise FMBTTizenError("Image data size mismatch.")625            if fmbtgti.eye4graphics.bgrx2rgb(data, width, height) == 0 and blankFrameRetry > 0:626                time.sleep(0.5)627                return self.recvScreenshot(filename, blankFrameRetry - 1)628            # TODO: use libimagemagick directly to save data to png?629            ppm_header = "P6\n%d %d\n%d\n" % (width, height, 255)630            f = file(filename + ".ppm", "w").write(ppm_header + data[:width*height*3])631            _run(["convert", filename + ".ppm", filename], expectedExitStatus=0)632            os.remove("%s.ppm" % (filename,))633        else:634            file(filename, "w").write(img)635        return True636    def recvSerialNumber(self):637        s, o = commands.getstatusoutput("sdb get-serialno")638        return o.splitlines()[-1]639    def recvInputDevices(self):640        return self._platformInfo["input devices"]641    def shellSOE(self, shellCommand, username, password, asyncStatus, asyncOut, asyncError):642        _, (s, o, e) = self._agentCmd(643            "es %s" % (_encode((shellCommand, username, password, asyncStatus,644                                asyncOut, asyncError)),))645        return s, o, e646    def target(self):647        return self._serialNumber648class FMBTTizenError(Exception): pass649class TizenConnectionError(FMBTTizenError): pass650class TizenDeviceNotFoundError(TizenConnectionError): passLearn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
