How to use westonTakeScreenshotRoot method in fMBT

Best Python code snippet using fMBT_python

fmbttizen-agent.py

Source:fmbttizen-agent.py Github

copy

Full Screen

# NOTE: this excerpt begins in the middle of a function whose header is
# outside the visible snippet. Its last four statements are kept verbatim
# below, as a comment, because they cannot stand alone as code:
#
#     rawfmbt_data = ctypes.string_at(image.data, image.height * image.bytes_per_line)
#     compressed_image = rawfmbt_header + zlib.compress(rawfmbt_data, 3)
#     libX11.XDestroyImage(image_p)
#     return True, compressed_image


def westonTakeScreenshotRoot(retry=2):
    """Make Weston write a screenshot and move it to /tmp/screenshot.png.

    Presses KEY_LEFTMETA + "s" on keyboard_device, waits for one of the
    candidate screenshot files to appear and for its writer process to
    close it, then moves the file to /tmp/screenshot.png (mode 0666).

    Parameters:
      retry  number of additional attempts made if no screenshot file
             shows up after the key press.

    Returns (True, None) on success, (False, errorMessage) otherwise.
    """
    if westonTakeScreenshotRoot.ssFilenames is None:
        # candidate paths are looked up once and cached on the function
        westonTakeScreenshotRoot.ssFilenames = findWestonScreenshotFilenames()
    if not westonTakeScreenshotRoot.ssFilenames:
        return False, "cannot find weston screenshot directory"
    try:
        # remove old screenshots so that a new file can be detected
        for ssFilename in westonTakeScreenshotRoot.ssFilenames:
            if os.access(ssFilename, os.R_OK):
                os.remove(ssFilename)
        keyboard_device.press("KEY_LEFTMETA")
        try:
            keyboard_device.tap("s")
        finally:
            # never leave the modifier key pressed, even if tap() fails
            keyboard_device.release("KEY_LEFTMETA")
        time.sleep(0.5)
        # find which screenshot file got created?
        for ssFilename in westonTakeScreenshotRoot.ssFilenames:
            if os.access(ssFilename, os.R_OK):
                break
        else:
            if retry > 0:
                return westonTakeScreenshotRoot(retry-1)
            else:
                return False, "weston did not create any of files %s" % (
                    westonTakeScreenshotRoot.ssFilenames,)
        # wait for the screenshot writer to finish
        writerPid = fuser(ssFilename)
        if writerPid is not None:
            time.sleep(0.1)
            while fuser(ssFilename, [writerPid]) is not None:
                time.sleep(0.1)
        shutil.move(ssFilename, "/tmp/screenshot.png")
        os.chmod("/tmp/screenshot.png", 0o666)
    except Exception as e:
        return False, str(e)
    return True, None
westonTakeScreenshotRoot.ssFilenames = None


def takeScreenshotOnWeston():
    """Take a screenshot on Weston and return its file contents.

    The screenshot itself is taken by westonTakeScreenshotRoot(), either
    directly (when iAmRoot) or through the "root" sub-agent.

    Returns (True, contents of /tmp/screenshot.png) on success,
    (False, status) if taking the screenshot failed.
    """
    if iAmRoot:
        rv, status = westonTakeScreenshotRoot()
    else:
        rv, status = subAgentCommand("root", "tizen", "ss weston-root")
    # subAgentCommand() returns None (not False) as the first element when
    # the sub-agent cannot be started; treat every non-true value as a
    # failure so that a stale /tmp/screenshot.png is never returned.
    if not rv:
        return False, status
    with open("/tmp/screenshot.png", "rb") as screenshotFile:
        return True, screenshotFile.read()


def fuser(filename, usualSuspects=None):
    """Returns the pid of a user of given file, or None

    Parameters:
      filename       path of the file whose user is searched for.
      usualSuspects  optional list of pids; if given, only the file
                     descriptors of these processes are inspected.

    Raises ValueError if the file is not readable.
    """
    filepath = os.path.realpath(filename)
    if not os.access(filepath, os.R_OK):
        raise ValueError('No such file: "%s"' % (filename,))
    if usualSuspects is None:
        # this pattern matches only pids that have at least three characters
        procFds = glob.glob("/proc/[1-9][0-9][0-9]*/fd/*")
    else:
        procFds = []
        for pid in usualSuspects:
            procFds.extend(glob.glob("/proc/%s/fd/*" % (pid,)))
    for symlink in procFds:
        try:
            if os.path.realpath(symlink) == filepath:
                # symlink is "/proc/<pid>/fd/<n>"
                return int(symlink.split('/')[2])
        except OSError:
            # resolving the link failed; skip this descriptor
            pass
    return None


def findWestonScreenshotFilenames():
    """Return candidate screenshot paths, one per weston working directory.

    For every /proc entry whose exe resolves to /usr/bin/weston, the
    process' cwd is resolved and "/wayland-screenshot.png" is appended.
    The result is a sorted list without duplicates; it is empty if no
    such process is found.
    """
    # find weston cwd
    dirs = set()
    for exe in glob.glob("/proc/[1-9][0-9][0-9]*/exe"):
        try:
            if os.path.realpath(exe) == "/usr/bin/weston":
                dirs.add(os.path.realpath(os.path.dirname(exe) + "/cwd"))
        except OSError:
            pass
    return [d + "/wayland-screenshot.png" for d in sorted(dirs)]


if g_Xavailable:
    takeScreenshot = takeScreenshotOnX
else:
    takeScreenshot = takeScreenshotOnWeston


def shellSOE(command, asyncStatus, asyncOut, asyncError, usePty):
    """Run a shell command, either synchronously or detached.

    Parameters:
      command      shell command line (string).
      asyncStatus, asyncOut, asyncError
                   if all three are None, the command is run
                   synchronously. Otherwise the process forks; the child
                   runs the command with stdout/stderr appended to
                   asyncOut/asyncError and appends the exit status to
                   asyncStatus (each defaults to /dev/null when None).
      usePty       if true, the command is run through pty.spawn().

    Returns:
      synchronous: (True, (returncode, stdout, stderr))
      asynchronous (parent): (True, (0, None, None)) right after the fork
      on error: (False, (None, None, exception))
    """
    if usePty:
        # The original had a trailing comma here that turned command into
        # a 1-tuple; Popen(shell=True) happened to accept it. Pass a
        # plain string instead.
        command = '''python -c "import pty; pty.spawn(%s)" ''' % (repr(shlex.split(command)),)
    runAsync = (asyncStatus, asyncOut, asyncError) != (None, None, None)
    if runAsync:
        # prepare for decoupled asynchronous execution
        if asyncStatus is None: asyncStatus = "/dev/null"
        if asyncOut is None: asyncOut = "/dev/null"
        if asyncError is None: asyncError = "/dev/null"
        try:
            stdinFile = open("/dev/null", "r")
            stdoutFile = open(asyncOut, "a+")
            stderrFile = open(asyncError, "a+", 0)
            statusFile = open(asyncStatus, "a+")
        except IOError as e:
            return False, (None, None, e)
        try:
            if os.fork() > 0:
                # parent returns after successful fork, there no
                # direct visibility to async child process beyond this
                # point.
                stdinFile.close()
                stdoutFile.close()
                stderrFile.close()
                statusFile.close()
                return True, (0, None, None) # async parent finishes here
        except OSError as e:
            return False, (None, None, e)
        os.setsid()
    else:
        stdinFile = subprocess.PIPE
        stdoutFile = subprocess.PIPE
        stderrFile = subprocess.PIPE
    try:
        p = subprocess.Popen(command, shell=True,
                             stdin=stdinFile,
                             stdout=stdoutFile,
                             stderr=stderrFile,
                             close_fds=True)
    except Exception as e:
        # NOTE(review): in the async case this return happens in the
        # forked child, which then continues in the caller's code --
        # confirm whether the child should exit here instead.
        return False, (None, None, e)
    if not runAsync:
        # synchronous execution, read stdout and stderr
        out, err = p.communicate()
    else:
        # asynchronous execution, store status to file
        statusFile.write(str(p.wait()) + "\n")
        statusFile.close()
        out, err = None, None
        sys.exit(0) # async child finishes here
    return True, (p.returncode, out, err)


def waitOutput(nonblockingFd, acceptedOutputs, timeout, pollInterval=0.1):
    """Read from a non-blocking file until an accepted output is seen.

    Parameters:
      nonblockingFd    file object in non-blocking mode; IOError from
                       read() is taken as "nothing to read right now".
      acceptedOutputs  list of strings to look for in the data read.
      timeout          maximum time to wait, in seconds.
      pollInterval     delay between read attempts, in seconds.

    Returns (foundOutputs, s) where foundOutputs lists the accepted
    outputs found in s (empty on timeout), and s is everything read.
    """
    endTime = time.time() + timeout
    s = ""
    while True:
        try:
            s += nonblockingFd.read()
        except IOError:
            pass
        foundOutputs = [ao for ao in acceptedOutputs if ao in s]
        if foundOutputs or time.time() >= endTime:
            return foundOutputs, s
        time.sleep(pollInterval)


# username -> subprocess.Popen object of a running sub-agent
_subAgents = {}


def openSubAgent(username, password):
    """Start this agent as another user through su.

    Launches "su - <username>" inside a pty, answers the password prompt
    if one appears, and waits for the sub-agent's "FMBTAGENT" greeting.

    Returns (process, "", "") on success, or
    (None, errorMessage, outputReadSoFar) if the agent does not answer.
    """
    p = subprocess.Popen('''python -c 'import pty; pty.spawn(["su", "-c", "python /tmp/fmbttizen-agent.py --sub-agent", "-", "%s"])' ''' % (username,),
                         shell=True,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # Read in non-blocking mode to ensure agent starts correctly
    fl = fcntl.fcntl(p.stdout.fileno(), fcntl.F_GETFL)
    fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
    output2 = ""
    seenPrompts, output1 = waitOutput(p.stdout, ["Password:", "FMBTAGENT"], 5.0)
    if "Password:" in seenPrompts:
        p.stdin.write(password + "\r")
        # flush like subAgentCommand() does, so the password is not left
        # waiting in a buffer while the greeting is awaited
        p.stdin.flush()
        output1 = ""
        seenPrompts, output2 = waitOutput(p.stdout, ["FMBTAGENT"], 5.0)
    if "FMBTAGENT" not in seenPrompts:
        p.terminate()
        return (None, 'fMBT agent with username "%s" does not answer.' % (username,),
                output1 + output2)
    # Agent is alive, continue in blocking mode
    fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, fl)
    return p, "", ""


def subAgentCommand(username, password, cmd):
    """Send a command to the sub-agent of a user and return its answer.

    The sub-agent is started with openSubAgent() on first use and then
    reused from _subAgents.

    Returns:
      (True, decodedValue)   if the answer starts with "FMBTAGENT OK "
      (False, decodedValue)  for any other answer
      (None, (-1, output, error))  if the sub-agent could not be started
    """
    if username not in _subAgents:
        process, output, error = openSubAgent(username, password)
        if process is None:
            return None, (-1, output, error)
        else:
            _subAgents[username] = process
    p = _subAgents[username]
    p.stdin.write(cmd + "\r")
    p.stdin.flush()
    answer = p.stdout.readline().rstrip()
    if answer.startswith("FMBTAGENT OK "):
        return True, _decode(answer[len("FMBTAGENT OK "):])
    else:
        # any other answer is handled as an "FMBTAGENT ERROR " answer
        return False, _decode(answer[len("FMBTAGENT ERROR "):])


def closeSubAgents():
    """Send "quit" to every sub-agent that has been opened."""
    for username in _subAgents:
        subAgentCommand(username, None, "quit")


if __name__ == "__main__":
    try:
        origTermAttrs = termios.tcgetattr(sys.stdin.fileno())
        hasTerminal = True
    except termios.error:
        origTermAttrs = None
        hasTerminal = False
    if hasTerminal and not "--keep-echo" in sys.argv and not "--debug" in sys.argv:
        # Disable terminal echo. Work on a copy: assigning the list itself
        # would alias it, and clearing ECHO would then also modify the
        # saved original attributes.
        newTermAttrs = list(origTermAttrs)
        newTermAttrs[3] = origTermAttrs[3] & ~termios.ECHO
        termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, newTermAttrs)
    if "--no-x" in sys.argv:
        debug("X disabled")
        g_Xavailable = False
    platformInfo = {}
    platformInfo["input devices"] = fmbtuinput._g_deviceNames.keys()
    # Send version number, enter main loop
    write_response(True, platformInfo)
    cmd = read_cmd()
    while cmd:
        if cmd.startswith("bl "): # set display backlight time
            if iAmRoot:
                timeout = int(cmd[3:].strip())
                try:
                    with open("/opt/var/kdb/db/setting/lcd_backlight_normal","wb") as backlightFile:
                        backlightFile.write(struct.pack("ii",0x29,timeout))
                    write_response(True, None)
                except Exception as e: write_response(False, e)
            else:
                write_response(*subAgentCommand("root", "tizen", cmd))
        elif cmd.startswith("er "): # event recorder
            if iAmRoot:
                cmd, arg = cmd.split(" ", 1)
                if arg.startswith("start "):
                    filterOpts = _decode(arg.split()[1])
                    if touch_device:
                        filterOpts["touchScreen"] = touch_device
                    fmbtuinput.startQueueingEvents(filterOpts)
                    write_response(True, None)
                elif arg == "stop":
                    events = fmbtuinput.stopQueueingEvents()
                    write_response(True, None)
                elif arg == "fetch":
                    events = fmbtuinput.fetchQueuedEvents()
                    write_response(True, events)
            else:
                write_response(*subAgentCommand("root", "tizen", cmd))
        elif cmd.startswith("gd"): # get display status
            try:
                p = subprocess.Popen(['/usr/bin/xset', 'q'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                out, err = p.communicate()
                if "Monitor is Off" in out: write_response(True, "Off")
                elif "Monitor is On" in out: write_response(True, "On")
                else: write_response(False, err)
            except Exception as e: write_response(False, e)
        elif cmd.startswith("tm "): # touch move(x, y)
            xs, ys = cmd[3:].strip().split()
            if g_Xavailable:
                libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)
                libX11.XFlush(display)
            else:
                if iAmRoot: rv, msg = sendHwMove(int(xs), int(ys))
                else: rv, msg = subAgentCommand("root", "tizen", cmd)
            # NOTE(review): rv/msg are not reported here, unlike in "tt"
            write_response(True, None)
        elif cmd.startswith("tr "): # relative move(x, y)
            xd, yd = cmd[3:].strip().split()
            if iAmRoot: rv, msg = sendRelMove(int(xd), int(yd))
            else: rv, msg = subAgentCommand("root", "tizen", cmd)
            write_response(True, None)
        elif cmd.startswith("tt "): # touch tap(x, y, button)
            x, y, button = [int(i) for i in cmd[3:].strip().split()]
            if g_Xavailable:
                libXtst.XTestFakeMotionEvent(display, current_screen, x, y, X_CurrentTime)
                libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)
                libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)
                libX11.XFlush(display)
                rv, msg = True, None
            else:
                if iAmRoot: rv, msg = sendHwTap(x, y, button-1)
                else: rv, msg = subAgentCommand("root", "tizen", cmd)
            write_response(rv, msg)
        elif cmd.startswith("td "): # touch down(x, y, button)
            xs, ys, button = cmd[3:].strip().split()
            button = int(button)
            if g_Xavailable:
                libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)
                libXtst.XTestFakeButtonEvent(display, button, X_True, X_CurrentTime)
                libX11.XFlush(display)
            else:
                if iAmRoot: rv, msg = sendHwFingerDown(int(xs), int(ys), button-1)
                else: rv, msg = subAgentCommand("root", "tizen", cmd)
            write_response(True, None)
        elif cmd.startswith("tu "): # touch up(x, y, button)
            xs, ys, button = cmd[3:].strip().split()
            button = int(button)
            if g_Xavailable:
                libXtst.XTestFakeMotionEvent(display, current_screen, int(xs), int(ys), X_CurrentTime)
                libXtst.XTestFakeButtonEvent(display, button, X_False, X_CurrentTime)
                libX11.XFlush(display)
            else:
                if iAmRoot: rv, msg = sendHwFingerUp(int(xs), int(ys), button-1)
                else: rv, msg = subAgentCommand("root", "tizen", cmd)
            write_response(True, None)
        elif cmd.startswith("kd "): # hw key down
            if iAmRoot: rv, msg = sendHwKey(cmd[3:], 0, -1)
            else: rv, msg = subAgentCommand("root", "tizen", cmd)
            write_response(rv, msg)
        elif cmd.startswith("kn"): # list input key names
            if "hwKeyDevice" in globals():
                hk = hwKeyDevice.keys()
            else:
                hk = []
            if "keyboard_device" in globals():
                ik = InputKeys
            else:
                ik = []
            write_response(True, sorted(ik + hk))
        elif cmd.startswith("kp "): # hw key press
            if iAmRoot: rv, msg = sendHwKey(cmd[3:], 0, 0)
            else: rv, msg = subAgentCommand("root", "tizen", cmd)
            write_response(rv, msg)
        elif cmd.startswith("ku "): # hw key up
            if iAmRoot: rv, msg = sendHwKey(cmd[3:], -1, 0)
            else: rv, msg = subAgentCommand("root", "tizen", cmd)
            write_response(rv, msg)
        elif cmd.startswith("kt "): # send x events
            if g_Xavailable:
                rv, skippedSymbols = typeSequence(_decode(cmd[3:]))
                libX11.XFlush(display)
            elif iAmRoot:
                rv, skippedSymbols = typeSequence(_decode(cmd[3:]),
                                                  delayBetweenChars=0.05)
            else:
                rv, skippedSymbols = subAgentCommand("root", "tizen", cmd)
            write_response(rv, skippedSymbols)
        elif cmd.startswith("ml "): # send multitouch linear gesture
            if iAmRoot:
                rv, _ = mtLinearGesture(*_decode(cmd[3:]))
            else:
                rv, _ = subAgentCommand("root", "tizen", cmd)
            write_response(rv, _)
        elif cmd.startswith("ss"): # save screenshot
            if "R" in cmd.split() and g_Xavailable:
                resetXConnection()
            if "weston-root" in cmd.split(): # do Weston root part only
                write_response(*westonTakeScreenshotRoot())
            else:
                rv, compressedImage = takeScreenshot()
                write_response(rv, compressedImage)
        elif cmd.startswith("sd "): # set screen dimensions (width and height)
            _sw, _sh = cmd[3:].split()
            screenWidth, screenHeight = int(_sw), int(_sh)
            if iAmRoot:
                if touch_device:
                    touch_device.setScreenSize((screenWidth, screenHeight))
                    rv, msg = True, None
                else:
                    rv, msg = True, "no touch device"
            else:
                rv, msg = subAgentCommand("root", "tizen", cmd)
            # ... (the excerpt ends here; the rest of this branch and of
            # the command loop is outside the visible snippet)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful