Best Python code snippet using fMBT_python
fmbttizen-agent.py
Source:fmbttizen-agent.py  
...668    rawfmbt_data = ctypes.string_at(image.data, image.height * image.bytes_per_line)669    compressed_image = rawfmbt_header + zlib.compress(rawfmbt_data, 3)670    libX11.XDestroyImage(image_p)671    return True, compressed_image672def westonTakeScreenshotRoot(retry=2):673    if westonTakeScreenshotRoot.ssFilenames == None:674        westonTakeScreenshotRoot.ssFilenames = findWestonScreenshotFilenames()675    if not westonTakeScreenshotRoot.ssFilenames:676        return False, "cannot find weston screenshot directory"677    try:678        for ssFilename in westonTakeScreenshotRoot.ssFilenames:679            if os.access(ssFilename, os.R_OK):680                os.remove(ssFilename)681        keyboard_device.press("KEY_LEFTMETA")682        keyboard_device.tap("s")683        keyboard_device.release("KEY_LEFTMETA")684        time.sleep(0.5)685        # find which screenshot file got created?686        for ssFilename in westonTakeScreenshotRoot.ssFilenames:687            if os.access(ssFilename, os.R_OK):688                break689        else:690            if retry > 0:691                return westonTakeScreenshotRoot(retry-1)692            else:693                return False, "weston did not create any of files %s" % (694                    westonTakeScreenshotRoot.ssFilenames,)695        # wait for the screenshot writer to finish696        writerPid = fuser(ssFilename)697        if writerPid != None:698            time.sleep(0.1)699            while fuser(ssFilename, [writerPid]) != None:700                time.sleep(0.1)701        shutil.move(ssFilename, "/tmp/screenshot.png")702        os.chmod("/tmp/screenshot.png", 0666)703    except Exception, e:704        return False, str(e)705    return True, None706westonTakeScreenshotRoot.ssFilenames = None707def takeScreenshotOnWeston():708    if iAmRoot:709        rv, status = westonTakeScreenshotRoot()710    else:711        rv, status = subAgentCommand("root", "tizen", "ss weston-root")712    if rv == False:713  
      return rv, status714    return True, file("/tmp/screenshot.png").read()715def fuser(filename, usualSuspects=None):716    """Returns the pid of a user of given file, or None"""717    filepath = os.path.realpath(filename)718    if not os.access(filepath, os.R_OK):719        raise ValueError('No such file: "%s"' % (filename,))720    if usualSuspects == None:721        procFds = glob.glob("/proc/[1-9][0-9][0-9]*/fd/*")722    else:723        procFds = []724        for pid in usualSuspects:725            procFds.extend(glob.glob("/proc/%s/fd/*" % (pid,)))726    for symlink in procFds:727        try:728            if os.path.realpath(symlink) == filepath:729                return int(symlink.split('/')[2])730        except OSError:731            pass732def findWestonScreenshotFilenames():733    # find weston cwd734    dirs = []735    for exe in glob.glob("/proc/[1-9][0-9][0-9]*/exe"):736        try:737            if os.path.realpath(exe) == "/usr/bin/weston":738                dirs.append(os.path.realpath(os.path.dirname(exe) + "/cwd"))739        except OSError:740            pass741    return [d + "/wayland-screenshot.png" for d in sorted(set(dirs))]742if g_Xavailable:743    takeScreenshot = takeScreenshotOnX744else:745    takeScreenshot = takeScreenshotOnWeston746def shellSOE(command, asyncStatus, asyncOut, asyncError, usePty):747    if usePty:748        command = '''python -c "import pty; pty.spawn(%s)" ''' % (repr(shlex.split(command)),),749    if (asyncStatus, asyncOut, asyncError) != (None, None, None):750        # prepare for decoupled asynchronous execution751        if asyncStatus == None: asyncStatus = "/dev/null"752        if asyncOut == None: asyncOut = "/dev/null"753        if asyncError == None: asyncError = "/dev/null"754        try:755            stdinFile = file("/dev/null", "r")756            stdoutFile = file(asyncOut, "a+")757            stderrFile = file(asyncError, "a+", 0)758            statusFile = file(asyncStatus, "a+")759        except 
IOError, e:760            return False, (None, None, e)761        try:762            if os.fork() > 0:763                # parent returns after successful fork, there no764                # direct visibility to async child process beyond this765                # point.766                stdinFile.close()767                stdoutFile.close()768                stderrFile.close()769                statusFile.close()770                return True, (0, None, None) # async parent finishes here771        except OSError, e:772            return False, (None, None, e)773        os.setsid()774    else:775        stdinFile = subprocess.PIPE776        stdoutFile = subprocess.PIPE777        stderrFile = subprocess.PIPE778    try:779        p = subprocess.Popen(command, shell=True,780                             stdin=stdinFile,781                             stdout=stdoutFile,782                             stderr=stderrFile,783                             close_fds=True)784    except Exception, e:785        return False, (None, None, e)786    if asyncStatus == None and asyncOut == None and asyncError == None:787        # synchronous execution, read stdout and stderr788        out, err = p.communicate()789    else:790        # asynchronous execution, store status to file791        statusFile.write(str(p.wait()) + "\n")792        statusFile.close()793        out, err = None, None794        sys.exit(0) # async child finishes here795    return True, (p.returncode, out, err)796def waitOutput(nonblockingFd, acceptedOutputs, timeout, pollInterval=0.1):797    start = time.time()798    endTime = start + timeout799    s = ""800    try: s += nonblockingFd.read()801    except IOError: pass802    foundOutputs = [ao for ao in acceptedOutputs if ao in s]803    while len(foundOutputs) == 0 and time.time() < endTime:804        time.sleep(pollInterval)805        try: s += nonblockingFd.read()806        except IOError: pass807        foundOutputs = [ao for ao in acceptedOutputs if ao in s]808    
return foundOutputs, s809_subAgents = {}810def openSubAgent(username, password):811    p = subprocess.Popen('''python -c 'import pty; pty.spawn(["su", "-c", "python /tmp/fmbttizen-agent.py --sub-agent", "-", "%s"])' ''' % (username,),812            shell=True,813            stdin=subprocess.PIPE,814            stdout=subprocess.PIPE,815            stderr=subprocess.PIPE)816    # Read in non-blocking mode to ensure agent starts correctly817    fl = fcntl.fcntl(p.stdout.fileno(), fcntl.F_GETFL)818    fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)819    output2 = ""820    seenPrompts, output1 = waitOutput(p.stdout, ["Password:", "FMBTAGENT"], 5.0)821    if "Password:" in seenPrompts:822        p.stdin.write(password + "\r")823        output1 = ""824        seenPrompts, output2 = waitOutput(p.stdout, ["FMBTAGENT"], 5.0)825    if not "FMBTAGENT" in seenPrompts:826        p.terminate()827        return (None, 'fMBT agent with username "%s" does not answer.' % (username,),828                output1 + output2)829    # Agent is alive, continue in blocking mode830    fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl)831    return p, "", ""832def subAgentCommand(username, password, cmd):833    if not username in _subAgents:834        process, output, error = openSubAgent(username, password)835        if process == None:836            return None, (-1, output, error)837        else:838            _subAgents[username] = process839    p = _subAgents[username]840    p.stdin.write(cmd + "\r")841    p.stdin.flush()842    answer = p.stdout.readline().rstrip()843    if answer.startswith("FMBTAGENT OK "):844        return True, _decode(answer[len("FMBTAGENT OK "):])845    else:846        return False, _decode(answer[len("FMBTAGENT ERROR "):])847def closeSubAgents():848    for username in _subAgents:849        subAgentCommand(username, None, "quit")850if __name__ == "__main__":851    try:852        origTermAttrs = termios.tcgetattr(sys.stdin.fileno())853        
hasTerminal = True854    except termios.error:855        origTermAttrs = None856        hasTerminal = False857    if hasTerminal and not "--keep-echo" in sys.argv and not "--debug" in sys.argv:858        # Disable terminal echo859        newTermAttrs = origTermAttrs860        newTermAttrs[3] = origTermAttrs[3] &  ~termios.ECHO861        termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, newTermAttrs)862    if "--no-x" in sys.argv:863        debug("X disabled")864        g_Xavailable = False865    platformInfo = {}866    platformInfo["input devices"] = fmbtuinput._g_deviceNames.keys()867    # Send version number, enter main loop868    write_response(True, platformInfo)869    cmd = read_cmd()870    while cmd:871        if cmd.startswith("bl "): # set display backlight time872            if iAmRoot:873                timeout = int(cmd[3:].strip())874                try:875                    file("/opt/var/kdb/db/setting/lcd_backlight_normal","wb").write(struct.pack("ii",0x29,timeout))876                    write_response(True, None)877                except Exception, e: write_response(False, e)878            else:879                write_response(*subAgentCommand("root", "tizen", cmd))880        elif cmd.startswith("er "): # event recorder881            if iAmRoot:882                cmd, arg = cmd.split(" ", 1)883                if arg.startswith("start "):884                    filterOpts = _decode(arg.split()[1])885                    if touch_device:886                        filterOpts["touchScreen"] = touch_device887                    fmbtuinput.startQueueingEvents(filterOpts)888                    write_response(True, None)889                elif arg == "stop":890                    events = fmbtuinput.stopQueueingEvents()891                    write_response(True, None)892                elif arg == "fetch":893                    events = fmbtuinput.fetchQueuedEvents()894                    write_response(True, events)895            else:896              
  write_response(*subAgentCommand("root", "tizen", cmd))897        elif cmd.startswith("gd"):   # get display status898            try:899                p = subprocess.Popen(['/usr/bin/xset', 'q'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)900                out, err = p.communicate()901                if "Monitor is Off" in out: write_response(True, "Off")902                elif "Monitor is On" in out: write_response(True, "On")903                else: write_response(False, err)904            except Exception, e: write_response(False, e)905        elif cmd.startswith("tm "):   # touch move(x, y)906            xs, ys = cmd[3:].strip().split()907            if g_Xavailable:908                libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)909                libX11.XFlush(display)910            else:911                if iAmRoot: rv, msg = sendHwMove(int(xs), int(ys))912                else: rv, msg = subAgentCommand("root", "tizen", cmd)913            write_response(True, None)914        elif cmd.startswith("tr "):   # relative move(x, y)915            xd, yd = cmd[3:].strip().split()916            if iAmRoot: rv, msg = sendRelMove(int(xd), int(yd))917            else: rv, msg = subAgentCommand("root", "tizen", cmd)918            write_response(True, None)919        elif cmd.startswith("tt "): # touch tap(x, y, button)920            x, y, button = [int(i) for i in cmd[3:].strip().split()]921            if g_Xavailable:922                libXtst.XTestFakeMotionEvent(display, current_screen, x, y, X_CurrentTime)923                libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)924                libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)925                libX11.XFlush(display)926                rv, msg = True, None927            else:928                if iAmRoot: rv, msg = sendHwTap(x, y, button-1)929                else: rv, msg = subAgentCommand("root", "tizen", cmd)930      
      write_response(rv, msg)931        elif cmd.startswith("td "): # touch down(x, y, button)932            xs, ys, button = cmd[3:].strip().split()933            button = int(button)934            if g_Xavailable:935                libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)936                libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)937                libX11.XFlush(display)938            else:939                if iAmRoot: rv, msg = sendHwFingerDown(int(xs), int(ys), button-1)940                else: rv, msg = subAgentCommand("root", "tizen", cmd)941            write_response(True, None)942        elif cmd.startswith("tu "): # touch up(x, y, button)943            xs, ys, button = cmd[3:].strip().split()944            button = int(button)945            if g_Xavailable:946                libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)947                libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)948                libX11.XFlush(display)949            else:950                if iAmRoot: rv, msg = sendHwFingerUp(int(xs), int(ys), button-1)951                else: rv, msg = subAgentCommand("root", "tizen", cmd)952            write_response(True, None)953        elif cmd.startswith("kd "): # hw key down954            if iAmRoot: rv, msg = sendHwKey(cmd[3:], 0, -1)955            else: rv, msg = subAgentCommand("root", "tizen", cmd)956            write_response(rv, msg)957        elif cmd.startswith("kn"): # list input key names958            if "hwKeyDevice" in globals():959                hk = hwKeyDevice.keys()960            else:961                hk = []962            if "keyboard_device" in globals():963                ik = InputKeys964            else:965                ik = []966            write_response(True, sorted(ik + hk))967        elif cmd.startswith("kp "): # hw key press968            if iAmRoot: rv, msg = 
sendHwKey(cmd[3:], 0, 0)969            else: rv, msg = subAgentCommand("root", "tizen", cmd)970            write_response(rv, msg)971        elif cmd.startswith("ku "): # hw key up972            if iAmRoot: rv, msg = sendHwKey(cmd[3:], -1, 0)973            else: rv, msg = subAgentCommand("root", "tizen", cmd)974            write_response(rv, msg)975        elif cmd.startswith("kt "): # send x events976            if g_Xavailable:977                rv, skippedSymbols = typeSequence(_decode(cmd[3:]))978                libX11.XFlush(display)979            elif iAmRoot:980                rv, skippedSymbols = typeSequence(_decode(cmd[3:]),981                                                  delayBetweenChars=0.05)982            else:983                rv, skippedSymbols = subAgentCommand("root", "tizen", cmd)984            write_response(rv, skippedSymbols)985        elif cmd.startswith("ml "): # send multitouch linear gesture986            if iAmRoot:987                rv, _ = mtLinearGesture(*_decode(cmd[3:]))988            else:989                rv, _ = subAgentCommand("root", "tizen", cmd)990            write_response(rv, _)991        elif cmd.startswith("ss"): # save screenshot992            if "R" in cmd.split() and g_Xavailable:993                resetXConnection()994            if "weston-root" in cmd.split(): # do Weston root part only995                write_response(*westonTakeScreenshotRoot())996            else:997                rv, compressedImage = takeScreenshot()998                write_response(rv, compressedImage)999        elif cmd.startswith("sd "): # set screen dimensions (width and height)1000            _sw, _sh = cmd[3:].split()1001            screenWidth, screenHeight = int(_sw), int(_sh)1002            if iAmRoot:1003                if touch_device:1004                    touch_device.setScreenSize((screenWidth, screenHeight))1005                    rv, msg = True, None1006                else:1007                    rv, msg = True, "no 
touch device"1008            else:1009                rv, msg = subAgentCommand("root", "tizen", cmd)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
