Best Python code snippet using fMBT_python
fmbttizen-agent.py
Source:fmbttizen-agent.py  
...571def takeScreenshotOnWeston():572    if iAmRoot:573        rv, status = westonTakeScreenshotRoot()574    else:575        rv, status = subAgentCommand("root", "tizen", "ss weston-root")576    if rv == False:577        return rv, status578    return True, file("/tmp/screenshot.png").read()579def fuser(filename, usualSuspects=None):580    """Returns the pid of a user of given file, or None"""581    filepath = os.path.realpath(filename)582    if not os.access(filepath, os.R_OK):583        raise ValueError('No such file: "%s"' % (filename,))584    if usualSuspects == None:585        procFds = glob.glob("/proc/[1-9][0-9][0-9]*/fd/*")586    else:587        procFds = []588        for pid in usualSuspects:589            procFds.extend(glob.glob("/proc/%s/fd/*" % (pid,)))590    for symlink in procFds:591        try:592            if os.path.realpath(symlink) == filepath:593                return int(symlink.split('/')[2])594        except OSError:595            pass596def findWestonScreenshotFilenameRoot():597    # find weston cwd598    for exe in glob.glob("/proc/[1-9][0-9][0-9]*/exe"):599        try:600            if os.path.realpath(exe) == "/usr/bin/weston":601                cwd = os.path.realpath(os.path.dirname(exe) + "/cwd")602                break603        except OSError:604            pass605    else:606        return False, "cannot find weston cwd"607    rv = cwd + "/wayland-screenshot.png"608    return rv609if g_Xavailable:610    takeScreenshot = takeScreenshotOnX611else:612    takeScreenshot = takeScreenshotOnWeston613def shellSOE(command, asyncStatus, asyncOut, asyncError):614    if (asyncStatus, asyncOut, asyncError) != (None, None, None):615        # prepare for decoupled asynchronous execution616        if asyncStatus == None: asyncStatus = "/dev/null"617        if asyncOut == None: asyncOut = "/dev/null"618        if asyncError == None: asyncError = "/dev/null"619        try:620            stdinFile = file("/dev/null", "r")621            stdoutFile = file(asyncOut, "a+")622            stderrFile = file(asyncError, "a+", 0)623            statusFile = file(asyncStatus, "a+")624        except IOError, e:625            return False, (None, None, e)626        try:627            if os.fork() > 0:628                # parent returns after successful fork, there no629                # direct visibility to async child process beyond this630                # point.631                stdinFile.close()632                stdoutFile.close()633                stderrFile.close()634                statusFile.close()635                return True, (0, None, None)636        except OSError, e:637            return False, (None, None, e)638        os.setsid()639    else:640        stdinFile = subprocess.PIPE641        stdoutFile = subprocess.PIPE642        stderrFile = subprocess.PIPE643    try:644        p = subprocess.Popen(command, shell=True,645                             stdin=stdinFile,646                             stdout=stdoutFile,647                             stderr=stderrFile,648                             close_fds=True)649    except Exception, e:650        return False, (None, None, e)651    if asyncStatus == None and asyncOut == None and asyncError == None:652        # synchronous execution, read stdout and stderr653        out, err = p.communicate()654    else:655        # asynchronous execution, store status to file656        statusFile.write(str(p.wait()) + "\n")657        statusFile.close()658        out, err = None, None659    return True, (p.returncode, out, err)660def waitOutput(nonblockingFd, acceptedOutputs, timeout, pollInterval=0.1):661    start = time.time()662    endTime = start + timeout663    s = ""664    try: s += nonblockingFd.read()665    except IOError: pass666    foundOutputs = [ao for ao in acceptedOutputs if ao in s]667    while len(foundOutputs) == 0 and time.time() < endTime:668        time.sleep(pollInterval)669        try: s += nonblockingFd.read()670        except IOError: pass671        foundOutputs = [ao for ao in acceptedOutputs if ao in s]672    return foundOutputs, s673_subAgents = {}674def openSubAgent(username, password):675    p = subprocess.Popen('''python -c 'import pty; pty.spawn(["su", "-c", "python /tmp/fmbttizen-agent.py --sub-agent", "-", "%s"])' ''' % (username,),676            shell=True,677            stdin=subprocess.PIPE,678            stdout=subprocess.PIPE,679            stderr=subprocess.PIPE)680    # Read in non-blocking mode to ensure agent starts correctly681    fl = fcntl.fcntl(p.stdout.fileno(), fcntl.F_GETFL)682    fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)683    output2 = ""684    seenPrompts, output1 = waitOutput(p.stdout, ["Password:", "FMBTAGENT"], 5.0)685    if "Password:" in seenPrompts:686        p.stdin.write(password + "\r")687        output1 = ""688        seenPrompts, output2 = waitOutput(p.stdout, ["FMBTAGENT"], 5.0)689    if not "FMBTAGENT" in seenPrompts:690        p.terminate()691        return (None, 'fMBT agent with username "%s" does not answer.' % (username,),692                output1 + output2)693    # Agent is alive, continue in blocking mode694    fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl)695    return p, "", ""696def subAgentCommand(username, password, cmd):697    if not username in _subAgents:698        process, output, error = openSubAgent(username, password)699        if process == None:700            return None, (-1, output, error)701        else:702            _subAgents[username] = process703    p = _subAgents[username]704    p.stdin.write(cmd + "\r")705    p.stdin.flush()706    answer = p.stdout.readline().rstrip()707    if answer.startswith("FMBTAGENT OK "):708        return True, _decode(answer[len("FMBTAGENT OK "):])709    else:710        return False, _decode(answer[len("FMBTAGENT ERROR "):])711def closeSubAgents():712    for username in _subAgents:713        subAgentCommand(username, None, "quit")714if __name__ == "__main__":715    try:716        origTermAttrs = termios.tcgetattr(sys.stdin.fileno())717        hasTerminal = True718    except termios.error:719        origTermAttrs = None720        hasTerminal = False721    if hasTerminal and not "--keep-echo" in sys.argv and not "--debug" in sys.argv:722        # Disable terminal echo723        newTermAttrs = origTermAttrs724        newTermAttrs[3] = origTermAttrs[3] &  ~termios.ECHO725        termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, newTermAttrs)726    if "--no-x" in sys.argv:727        debug("X disabled")728        g_Xavailable = False729    platformInfo = {}730    platformInfo["input devices"] = fmbtuinput._g_deviceNames.keys()731    # Send version number, enter main loop732    write_response(True, platformInfo)733    cmd = read_cmd()734    while cmd:735        if cmd.startswith("bl "): # set display backlight time736            if iAmRoot:737                timeout = int(cmd[3:].strip())738                try:739                    file("/opt/var/kdb/db/setting/lcd_backlight_normal","wb").write(struct.pack("ii",0x29,timeout))740                    write_response(True, None)741                except Exception, e: write_response(False, e)742            else:743                write_response(*subAgentCommand("root", "tizen", cmd))744        elif cmd.startswith("er "): # event recorder745            if iAmRoot:746                cmd, arg = cmd.split(" ", 1)747                if arg.startswith("start "):748                    filterOpts = _decode(arg.split()[1])749                    if touch_device:750                        filterOpts["touchScreen"] = touch_device751                    fmbtuinput.startQueueingEvents(filterOpts)752                    write_response(True, None)753                elif arg == "stop":754                    events = fmbtuinput.stopQueueingEvents()755                    write_response(True, None)756                elif arg == "fetch":757                    events = fmbtuinput.fetchQueuedEvents()758                    write_response(True, events)759            else:760                write_response(*subAgentCommand("root", "tizen", cmd))761        elif cmd.startswith("gd"):   # get display status762            try:763                p = subprocess.Popen(['/usr/bin/xset', 'q'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)764                out, err = p.communicate()765                if "Monitor is Off" in out: write_response(True, "Off")766                elif "Monitor is On" in out: write_response(True, "On")767                else: write_response(False, err)768            except Exception, e: write_response(False, e)769        elif cmd.startswith("tm "):   # touch move(x, y)770            xs, ys = cmd[3:].strip().split()771            if g_Xavailable:772                libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)773                libX11.XFlush(display)774            else:775                if iAmRoot: rv, msg = sendHwMove(int(xs), int(ys))776                else: rv, msg = subAgentCommand("root", "tizen", cmd)777            write_response(True, None)778        elif cmd.startswith("tt "): # touch tap(x, y, button)779            x, y, button = [int(i) for i in cmd[3:].strip().split()]780            if g_Xavailable:781                libXtst.XTestFakeMotionEvent(display, current_screen, x, y, X_CurrentTime)782                libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)783                libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)784                libX11.XFlush(display)785                rv, msg = True, None786            else:787                if iAmRoot: rv, msg = sendHwTap(x, y, button-1)788                else: rv, msg = subAgentCommand("root", "tizen", cmd)789            write_response(rv, msg)790        elif cmd.startswith("td "): # touch down(x, y, button)791            xs, ys, button = cmd[3:].strip().split()792            button = int(button)793            if g_Xavailable:794                libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)795                libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)796                libX11.XFlush(display)797            else:798                if iAmRoot: rv, msg = sendHwFingerDown(int(xs), int(ys), button-1)799                else: rv, msg = subAgentCommand("root", "tizen", cmd)800            write_response(True, None)801        elif cmd.startswith("tu "): # touch up(x, y, button)802            xs, ys, button = cmd[3:].strip().split()803            button = int(button)804            if g_Xavailable:805                libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)806                libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)807                libX11.XFlush(display)808            else:809                if iAmRoot: rv, msg = sendHwFingerUp(int(xs), int(ys), button-1)810                else: rv, msg = subAgentCommand("root", "tizen", cmd)811            write_response(True, None)812        elif cmd.startswith("kd "): # hw key down813            if iAmRoot: rv, msg = sendHwKey(cmd[3:], 0, -1)814            else: rv, msg = subAgentCommand("root", "tizen", cmd)815            write_response(rv, msg)816        elif cmd.startswith("kp "): # hw key press817            if iAmRoot: rv, msg = sendHwKey(cmd[3:], 0, 0)818            else: rv, msg = subAgentCommand("root", "tizen", cmd)819            write_response(rv, msg)820        elif cmd.startswith("ku "): # hw key up821            if iAmRoot: rv, msg = sendHwKey(cmd[3:], -1, 0)822            else: rv, msg = subAgentCommand("root", "tizen", cmd)823            write_response(rv, msg)824        elif cmd.startswith("kt "): # send x events825            if g_Xavailable:826                rv, skippedSymbols = typeSequence(_decode(cmd[3:]))827                libX11.XFlush(display)828            elif iAmRoot:829                rv, skippedSymbols = typeSequence(_decode(cmd[3:]),830                                                  delayBetweenChars=0.05)831            else:832                rv, skippedSymbols = subAgentCommand("root", "tizen", cmd)833            write_response(rv, skippedSymbols)834        elif cmd.startswith("ml "): # send multitouch linear gesture835            if iAmRoot:836                rv, _ = mtLinearGesture(*_decode(cmd[3:]))837            else:838                rv, _ = subAgentCommand("root", "tizen", cmd)839            write_response(rv, _)840        elif cmd.startswith("ss"): # save screenshot841            if "R" in cmd.split() and g_Xavailable:842                resetXConnection()843            if "weston-root" in cmd.split(): # do Weston root part only844                write_response(*westonTakeScreenshotRoot())845            else:846                rv, compressedImage = takeScreenshot()847                write_response(rv, compressedImage)848        elif cmd.startswith("sd "): # set screen dimensions (width and height)849            _sw, _sh = cmd[3:].split()850            screenWidth, screenHeight = int(_sw), int(_sh)851            if iAmRoot:852                if touch_device:853                    touch_device.setScreenSize((screenWidth, screenHeight))854                    rv, msg = True, None855                else:856                    rv, msg = True, "no touch device"857            else:858                rv, msg = subAgentCommand("root", "tizen", cmd)859            write_response(rv, msg)860        elif cmd.startswith("sa "): # set screenshot rotation angle (degrees)861            if iAmRoot:862                if touch_device:863                    _sa = int(cmd[3:])864                    # compensate it with opposite rotation865                    touch_device.setScreenAngle(-_sa)866                    rv, msg = True, None867                else:868                    rv, msg = True, "no touch device"869            else:870                rv, msg = subAgentCommand("root", "tizen", cmd)871            write_response(rv, msg)872        elif cmd.startswith("es "): # execute shell873            shellCmd, username, password, asyncStatus, asyncOut, asyncError = _decode(cmd[3:])874            if username == "":875                rv, soe = shellSOE(shellCmd, asyncStatus, asyncOut, asyncError)876            else:877                rv, soe = subAgentCommand(username, password,878                    "es " + _encode((shellCmd, "", "", asyncStatus, asyncOut, asyncError)))879            write_response(rv, soe)880        elif cmd.startswith("quit"): # quit881            write_response(rv, True)882            break883        else:884            write_response(False, 'Unknown command: "%s"' % (cmd,))885        cmd = read_cmd()886    closeSubAgents()887    if g_Xavailable:888        libX11.XCloseDisplay(display)889    if hasTerminal:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
