How to use subAgentCommand method in fMBT

Best Python code snippet using fMBT_python

fmbttizen-agent.py

Source:fmbttizen-agent.py Github

copy

Full Screen

...571def takeScreenshotOnWeston():572 if iAmRoot:573 rv, status = westonTakeScreenshotRoot()574 else:575 rv, status = subAgentCommand("root", "tizen", "ss weston-root")576 if rv == False:577 return rv, status578 return True, file("/tmp/screenshot.png").read()579def fuser(filename, usualSuspects=None):580 """Returns the pid of a user of given file, or None"""581 filepath = os.path.realpath(filename)582 if not os.access(filepath, os.R_OK):583 raise ValueError('No such file: "%s"' % (filename,))584 if usualSuspects == None:585 procFds = glob.glob("/proc/[1-9][0-9][0-9]*/fd/*")586 else:587 procFds = []588 for pid in usualSuspects:589 procFds.extend(glob.glob("/proc/%s/fd/*" % (pid,)))590 for symlink in procFds:591 try:592 if os.path.realpath(symlink) == filepath:593 return int(symlink.split('/')[2])594 except OSError:595 pass596def findWestonScreenshotFilenameRoot():597 # find weston cwd598 for exe in glob.glob("/proc/[1-9][0-9][0-9]*/exe"):599 try:600 if os.path.realpath(exe) == "/usr/bin/weston":601 cwd = os.path.realpath(os.path.dirname(exe) + "/cwd")602 break603 except OSError:604 pass605 else:606 return False, "cannot find weston cwd"607 rv = cwd + "/wayland-screenshot.png"608 return rv609if g_Xavailable:610 takeScreenshot = takeScreenshotOnX611else:612 takeScreenshot = takeScreenshotOnWeston613def shellSOE(command, asyncStatus, asyncOut, asyncError):614 if (asyncStatus, asyncOut, asyncError) != (None, None, None):615 # prepare for decoupled asynchronous execution616 if asyncStatus == None: asyncStatus = "/dev/null"617 if asyncOut == None: asyncOut = "/dev/null"618 if asyncError == None: asyncError = "/dev/null"619 try:620 stdinFile = file("/dev/null", "r")621 stdoutFile = file(asyncOut, "a+")622 stderrFile = file(asyncError, "a+", 0)623 statusFile = file(asyncStatus, "a+")624 except IOError, e:625 return False, (None, None, e)626 try:627 if os.fork() > 0:628 # parent returns after successful fork, there no629 # direct visibility to async child process beyond this630 # point.631 stdinFile.close()632 stdoutFile.close()633 stderrFile.close()634 statusFile.close()635 return True, (0, None, None)636 except OSError, e:637 return False, (None, None, e)638 os.setsid()639 else:640 stdinFile = subprocess.PIPE641 stdoutFile = subprocess.PIPE642 stderrFile = subprocess.PIPE643 try:644 p = subprocess.Popen(command, shell=True,645 stdin=stdinFile,646 stdout=stdoutFile,647 stderr=stderrFile,648 close_fds=True)649 except Exception, e:650 return False, (None, None, e)651 if asyncStatus == None and asyncOut == None and asyncError == None:652 # synchronous execution, read stdout and stderr653 out, err = p.communicate()654 else:655 # asynchronous execution, store status to file656 statusFile.write(str(p.wait()) + "\n")657 statusFile.close()658 out, err = None, None659 return True, (p.returncode, out, err)660def waitOutput(nonblockingFd, acceptedOutputs, timeout, pollInterval=0.1):661 start = time.time()662 endTime = start + timeout663 s = ""664 try: s += nonblockingFd.read()665 except IOError: pass666 foundOutputs = [ao for ao in acceptedOutputs if ao in s]667 while len(foundOutputs) == 0 and time.time() < endTime:668 time.sleep(pollInterval)669 try: s += nonblockingFd.read()670 except IOError: pass671 foundOutputs = [ao for ao in acceptedOutputs if ao in s]672 return foundOutputs, s673_subAgents = {}674def openSubAgent(username, password):675 p = subprocess.Popen('''python -c 'import pty; pty.spawn(["su", "-c", "python /tmp/fmbttizen-agent.py --sub-agent", "-", "%s"])' ''' % (username,),676 shell=True,677 stdin=subprocess.PIPE,678 stdout=subprocess.PIPE,679 stderr=subprocess.PIPE)680 # Read in non-blocking mode to ensure agent starts correctly681 fl = fcntl.fcntl(p.stdout.fileno(), fcntl.F_GETFL)682 fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)683 output2 = ""684 seenPrompts, output1 = waitOutput(p.stdout, ["Password:", "FMBTAGENT"], 5.0)685 if "Password:" in seenPrompts:686 p.stdin.write(password + "\r")687 output1 = ""688 seenPrompts, output2 = waitOutput(p.stdout, ["FMBTAGENT"], 5.0)689 if not "FMBTAGENT" in seenPrompts:690 p.terminate()691 return (None, 'fMBT agent with username "%s" does not answer.' % (username,),692 output1 + output2)693 # Agent is alive, continue in blocking mode694 fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl)695 return p, "", ""696def subAgentCommand(username, password, cmd):697 if not username in _subAgents:698 process, output, error = openSubAgent(username, password)699 if process == None:700 return None, (-1, output, error)701 else:702 _subAgents[username] = process703 p = _subAgents[username]704 p.stdin.write(cmd + "\r")705 p.stdin.flush()706 answer = p.stdout.readline().rstrip()707 if answer.startswith("FMBTAGENT OK "):708 return True, _decode(answer[len("FMBTAGENT OK "):])709 else:710 return False, _decode(answer[len("FMBTAGENT ERROR "):])711def closeSubAgents():712 for username in _subAgents:713 subAgentCommand(username, None, "quit")714if __name__ == "__main__":715 try:716 origTermAttrs = termios.tcgetattr(sys.stdin.fileno())717 hasTerminal = True718 except termios.error:719 origTermAttrs = None720 hasTerminal = False721 if hasTerminal and not "--keep-echo" in sys.argv and not "--debug" in sys.argv:722 # Disable terminal echo723 newTermAttrs = origTermAttrs724 newTermAttrs[3] = origTermAttrs[3] & ~termios.ECHO725 termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, newTermAttrs)726 if "--no-x" in sys.argv:727 debug("X disabled")728 g_Xavailable = False729 platformInfo = {}730 platformInfo["input devices"] = fmbtuinput._g_deviceNames.keys()731 # Send version number, enter main loop732 write_response(True, platformInfo)733 cmd = read_cmd()734 while cmd:735 if cmd.startswith("bl "): # set display backlight time736 if iAmRoot:737 timeout = int(cmd[3:].strip())738 try:739 file("/opt/var/kdb/db/setting/lcd_backlight_normal","wb").write(struct.pack("ii",0x29,timeout))740 write_response(True, None)741 except Exception, e: write_response(False, e)742 else:743 write_response(*subAgentCommand("root", "tizen", cmd))744 elif cmd.startswith("er "): # event recorder745 if iAmRoot:746 cmd, arg = cmd.split(" ", 1)747 if arg.startswith("start "):748 filterOpts = _decode(arg.split()[1])749 if touch_device:750 filterOpts["touchScreen"] = touch_device751 fmbtuinput.startQueueingEvents(filterOpts)752 write_response(True, None)753 elif arg == "stop":754 events = fmbtuinput.stopQueueingEvents()755 write_response(True, None)756 elif arg == "fetch":757 events = fmbtuinput.fetchQueuedEvents()758 write_response(True, events)759 else:760 write_response(*subAgentCommand("root", "tizen", cmd))761 elif cmd.startswith("gd"): # get display status762 try:763 p = subprocess.Popen(['/usr/bin/xset', 'q'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)764 out, err = p.communicate()765 if "Monitor is Off" in out: write_response(True, "Off")766 elif "Monitor is On" in out: write_response(True, "On")767 else: write_response(False, err)768 except Exception, e: write_response(False, e)769 elif cmd.startswith("tm "): # touch move(x, y)770 xs, ys = cmd[3:].strip().split()771 if g_Xavailable:772 libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)773 libX11.XFlush(display)774 else:775 if iAmRoot: rv, msg = sendHwMove(int(xs), int(ys))776 else: rv, msg = subAgentCommand("root", "tizen", cmd)777 write_response(True, None)778 elif cmd.startswith("tt "): # touch tap(x, y, button)779 x, y, button = [int(i) for i in cmd[3:].strip().split()]780 if g_Xavailable:781 libXtst.XTestFakeMotionEvent(display, current_screen, x, y, X_CurrentTime)782 libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)783 libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)784 libX11.XFlush(display)785 rv, msg = True, None786 else:787 if iAmRoot: rv, msg = sendHwTap(x, y, button-1)788 else: rv, msg = subAgentCommand("root", "tizen", cmd)789 write_response(rv, msg)790 elif cmd.startswith("td "): # touch down(x, y, button)791 xs, ys, button = cmd[3:].strip().split()792 button = int(button)793 if g_Xavailable:794 libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)795 libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)796 libX11.XFlush(display)797 else:798 if iAmRoot: rv, msg = sendHwFingerDown(int(xs), int(ys), button-1)799 else: rv, msg = subAgentCommand("root", "tizen", cmd)800 write_response(True, None)801 elif cmd.startswith("tu "): # touch up(x, y, button)802 xs, ys, button = cmd[3:].strip().split()803 button = int(button)804 if g_Xavailable:805 libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)806 libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)807 libX11.XFlush(display)808 else:809 if iAmRoot: rv, msg = sendHwFingerUp(int(xs), int(ys), button-1)810 else: rv, msg = subAgentCommand("root", "tizen", cmd)811 write_response(True, None)812 elif cmd.startswith("kd "): # hw key down813 if iAmRoot: rv, msg = sendHwKey(cmd[3:], 0, -1)814 else: rv, msg = subAgentCommand("root", "tizen", cmd)815 write_response(rv, msg)816 elif cmd.startswith("kp "): # hw key press817 if iAmRoot: rv, msg = sendHwKey(cmd[3:], 0, 0)818 else: rv, msg = subAgentCommand("root", "tizen", cmd)819 write_response(rv, msg)820 elif cmd.startswith("ku "): # hw key up821 if iAmRoot: rv, msg = sendHwKey(cmd[3:], -1, 0)822 else: rv, msg = subAgentCommand("root", "tizen", cmd)823 write_response(rv, msg)824 elif cmd.startswith("kt "): # send x events825 if g_Xavailable:826 rv, skippedSymbols = typeSequence(_decode(cmd[3:]))827 libX11.XFlush(display)828 elif iAmRoot:829 rv, skippedSymbols = typeSequence(_decode(cmd[3:]),830 delayBetweenChars=0.05)831 else:832 rv, skippedSymbols = subAgentCommand("root", "tizen", cmd)833 write_response(rv, skippedSymbols)834 elif cmd.startswith("ml "): # send multitouch linear gesture835 if iAmRoot:836 rv, _ = mtLinearGesture(*_decode(cmd[3:]))837 else:838 rv, _ = subAgentCommand("root", "tizen", cmd)839 write_response(rv, _)840 elif cmd.startswith("ss"): # save screenshot841 if "R" in cmd.split() and g_Xavailable:842 resetXConnection()843 if "weston-root" in cmd.split(): # do Weston root part only844 write_response(*westonTakeScreenshotRoot())845 else:846 rv, compressedImage = takeScreenshot()847 write_response(rv, compressedImage)848 elif cmd.startswith("sd "): # set screen dimensions (width and height)849 _sw, _sh = cmd[3:].split()850 screenWidth, screenHeight = int(_sw), int(_sh)851 if iAmRoot:852 if touch_device:853 touch_device.setScreenSize((screenWidth, screenHeight))854 rv, msg = True, None855 else:856 rv, msg = True, "no touch device"857 else:858 rv, msg = subAgentCommand("root", "tizen", cmd)859 write_response(rv, msg)860 elif cmd.startswith("sa "): # set screenshot rotation angle (degrees)861 if iAmRoot:862 if touch_device:863 _sa = int(cmd[3:])864 # compensate it with opposite rotation865 touch_device.setScreenAngle(-_sa)866 rv, msg = True, None867 else:868 rv, msg = True, "no touch device"869 else:870 rv, msg = subAgentCommand("root", "tizen", cmd)871 write_response(rv, msg)872 elif cmd.startswith("es "): # execute shell873 shellCmd, username, password, asyncStatus, asyncOut, asyncError = _decode(cmd[3:])874 if username == "":875 rv, soe = shellSOE(shellCmd, asyncStatus, asyncOut, asyncError)876 else:877 rv, soe = subAgentCommand(username, password,878 "es " + _encode((shellCmd, "", "", asyncStatus, asyncOut, asyncError)))879 write_response(rv, soe)880 elif cmd.startswith("quit"): # quit881 write_response(rv, True)882 break883 else:884 write_response(False, 'Unknown command: "%s"' % (cmd,))885 cmd = read_cmd()886 closeSubAgents()887 if g_Xavailable:888 libX11.XCloseDisplay(display)889 if hasTerminal:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful