How to use open_rotation_watcher method in ATX

Best Python code snippet using ATX

mixins.py

Source:mixins.py Github

copy

Full Screen

...22__dir__ = os.path.dirname(os.path.abspath(__file__))23class RotationWatcherMixin(object):24 __rotation = 025 __watcher_process = None26 def open_rotation_watcher(self, on_rotation_change=None):27 package_name = 'jp.co.cyberagent.stf.rotationwatcher'28 out = self.raw_cmd('shell', 'pm', 'list', 'packages', stdout=subprocess.PIPE).communicate()[0]29 if package_name not in out:30 apkpath = os.path.join(__dir__, '..', 'vendor', 'RotationWatcher.apk')31 # print 'install rotationwatcher...', apkpath32 if 0 != self.raw_cmd('install', '-r', '-t', apkpath).wait():33 # print 'install rotationwatcher failed.'34 return35 if self.__watcher_process is not None:36 self.__watcher_process.kill()37 out = self.raw_cmd('shell', 'pm', 'path', package_name, stdout=subprocess.PIPE).communicate()[0]38 path = out.strip().split(':')[-1]39 p = self.raw_cmd('shell',40 'CLASSPATH="%s"' % path,41 'app_process',42 '/system/bin',43 'jp.co.cyberagent.stf.rotationwatcher.RotationWatcher',44 stdout=subprocess.PIPE)45 self.__watcher_process = p46 queue = Queue.Queue()47 def _pull():48 while True:49 line = p.stdout.readline().strip()50 if not line:51 if p.poll() is not None:52 # print 'rotationwatcher stopped'53 break54 continue55 queue.put(line)56 t = threading.Thread(target=_pull)57 t.setDaemon(True)58 t.start()59 def listener(value):60 try:61 self.__rotation = int(value) / 9062 except:63 return64 if callable(on_rotation_change):65 on_rotation_change(self.__rotation)66 def _listen():67 while True:68 try:69 time.sleep(0.005)70 line = queue.get_nowait()71 listener(line)72 except Queue.Empty:73 if p.poll() is not None:74 break75 continue76 except:77 traceback.print_exc()78 t = threading.Thread(target=_listen)79 t.setDaemon(True)80 t.start()81def str2img(jpgstr, orientation=None):82 import numpy as np83 import cv284 arr = np.fromstring(jpgstr, np.uint8)85 img = cv2.imdecode(arr, cv2.IMREAD_COLOR)86 if orientation == 1:87 return cv2.flip(cv2.transpose(img), 0) # counter-clockwise88 if orientation == 3:89 return cv2.flip(cv2.transpose(img), 1) # clockwise90 return img91class MinicapStreamMixin(object):92 __screen = None93 __minicap_process = None94 def __install_minicap(self):95 # install minicap & minitouch96 os.system('python -m atx minicap')97 def open_minicap_stream(self, port=1313):98 # ensure minicap installed99 out = self.raw_cmd('shell', 'ls', '"/data/local/tmp/minicap"', stdout=subprocess.PIPE).communicate()[0]100 if 'No such file or directory' in out:101 self.__install_minicap()102 if self.__minicap_process is not None:103 self.__minicap_process.kill()104 # if minicap is already started, kill it first.105 out = self.raw_cmd('shell', 'ps', '-C', '/data/local/tmp/minicap', stdout=subprocess.PIPE).communicate()[0]106 out = out.strip().split('\n')107 if len(out) > 1:108 idx = out[0].split().index('PID')109 pid = out[1].split()[idx]110 # print 'minicap is running, killing', pid111 self.raw_cmd('shell', 'kill', '-9', pid).wait()112 # start minicap113 out = self.raw_cmd('shell', 'LD_LIBRARY_PATH=/data/local/tmp', '/data/local/tmp/minicap', '-i',114 stdout=subprocess.PIPE).communicate()[0]115 m = re.search('"width": (\d+).*"height": (\d+).*"rotation": (\d+)', out, re.S)116 w, h, r = map(int, m.groups())117 w, h = min(w, h), max(w, h)118 params = '{x}x{y}@{x}x{y}/{r}'.format(x=w, y=h, r=r)119 # print 'starting minicap', params120 p = self.raw_cmd('shell',121 'LD_LIBRARY_PATH=/data/local/tmp',122 '/data/local/tmp/minicap',123 '-P %s' % params,124 '-S',125 stdout=subprocess.PIPE)126 self.__minicap_process = p127 time.sleep(0.5)128 # forward to tcp port 129 self.raw_cmd('forward', 'tcp:%s' % port, 'localabstract:minicap').wait()130 queue = Queue.Queue()131 # pull data from socket132 def _pull():133 # print 'start pull', p.pid, p.poll()134 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)135 try:136 assert p.poll() is None137 s.connect(('127.0.0.1', port))138 t = s.recv(24)139 # print 'minicap connected', struct.unpack('<2B5I2B', t)140 while True:141 frame_size = struct.unpack("<I", s.recv(4))[0]142 trunks = []143 recvd_size = 0144 while recvd_size < frame_size:145 trunk_size = min(8192, frame_size - recvd_size)146 d = s.recv(trunk_size)147 trunks.append(d)148 recvd_size += len(d)149 queue.put(''.join(trunks))150 except Exception as e:151 if not isinstance(e, struct.error):152 traceback.print_exc()153 if p.poll() is not None:154 pass155 # print 'Process died.'156 # print p.stdout.read()157 else:158 # print 'stoping minicap ...'159 p.kill()160 finally:161 s.close()162 self.raw_cmd('forward', '--remove', 'tcp:%s' % port).wait()163 t = threading.Thread(target=_pull)164 t.setDaemon(True)165 t.start()166 out = self.raw_cmd('shell', 'getprop', 'ro.build.version.sdk', stdout=subprocess.PIPE).communicate()[0]167 sdk = int(out.strip())168 orientation = r / 90169 def _listen():170 while True:171 try:172 time.sleep(0.005)173 frame = queue.get_nowait()174 if sdk <= 16:175 img = str2img(frame, orientation)176 else:177 img = str2img(frame)178 self.__screen = img179 except Queue.Empty:180 if p.poll() is not None:181 # print 'minicap died'182 # print p.stdout.read()183 break184 continue185 except:186 traceback.print_exc()187 t = threading.Thread(target=_listen)188 t.setDaemon(True)189 t.start()190 def screenshot_cv2(self):191 return self.__screen192class MinitouchStreamMixin(object):193 __touch_queue = None194 __minitouch_process = None195 def __install_minitouch(self):196 # install minicap & minitouch197 os.system('python -m atx minicap')198 def open_minitouch_stream(self, port=1111):199 if self.__touch_queue is None:200 self.__touch_queue = Queue.Queue()201 # ensure minicap installed202 out = self.raw_cmd('shell', 'ls', '"/data/local/tmp/minitouch"', stdout=subprocess.PIPE).communicate()[0]203 if 'No such file or directory' in out:204 self.__install_minitouch()205 if self.__minitouch_process is not None:206 self.__minitouch_process.kill()207 out = self.raw_cmd('shell', 'ps', '-C', '/data/local/tmp/minitouch', stdout=subprocess.PIPE).communicate()[0]208 out = out.strip().split('\n')209 if len(out) > 1:210 p = None211 else:212 p = self.raw_cmd('shell', '/data/local/tmp/minitouch')213 time.sleep(1)214 if p.poll() is not None:215 # print 'start minitouch failed.'216 return217 self.__minitouch_process = p218 self.raw_cmd('forward', 'tcp:%s' % port, 'localabstract:minitouch').wait()219 def send():220 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)221 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)222 try:223 s.connect(('127.0.0.1', port))224 while True:225 cmd = self.__touch_queue.get() # wait here226 if not cmd:227 continue228 elif cmd[-1] != '\n':229 cmd += '\n'230 s.send(cmd)231 except:232 traceback.print_exc()233 finally:234 s.close()235 self.raw_cmd('forward', '--remove', 'tcp:%s' % port).wait()236 t = threading.Thread(target=send)237 t.setDaemon(True)238 t.start()239 def click(self, x, y):240 cmd = 'd 0 %d %d 30\nc\nu 0\nc\n' % (int(x), int(y))241 self.__touch_queue.put(cmd)242 def swipe(self, sx, sy, ex, ey, steps=20):243 x1, y1, x2, y2 = map(int, (sx, sy, ex, ey))244 dx = (x2 - x1) / steps245 dy = (y2 - y1) / steps246 send = self.touchqueue.put247 send('d 0 %d %d 30\nc\n' % (x1, y1))248 for i in range(steps - 1):249 x, y = x1 + (i + 1) * dx, y1 + (i + 1) * dy250 send('m 0 %d %d 30\nc\n' % (x, y))251 send('u 0 %d %d 30\nc\nu 0\nc\n' % (x2, y2))252 def pinchin(self, x1, y1, x2, y2, steps=10):253 pass254 def pinchout(self, x1, y1, x2, y2, steps=10):255 pass256class OpenSTFServiceMixin(object):257 pass258# -------------- examples ----------------#259class DummyDevice(object):260 def raw_cmd(self, *args, **kwargs):261 cmds = ['adb'] + list(args)262 # print cmds263 return subprocess.Popen(cmds, **kwargs)264# Mixins should come in front to override functions in Base265class TestDevice(MinitouchStreamMixin, MinicapStreamMixin, RotationWatcherMixin, DummyDevice):266 def __init__(self, *args, **kwargs):267 super(self.__class__, self).__init__(*args, **kwargs)268 self.open_rotation_watcher(on_rotation_change=lambda v: self.open_minicap_stream())269 self.open_minitouch_stream()270if __name__ == '__main__':271 import cv2272 dev = TestDevice()273 while True:274 img = dev.screenshot_cv2()275 if img is not None:276 cv2.imshow('screen', img)...

Full Screen

Full Screen

ScreenMinicap.py

Source:ScreenMinicap.py Github

copy

Full Screen

...11class ScreenMinicap(MinicapStream, RotationWatcher):12 def __init__(self, device, isShow=False):13 super(self.__class__, self).__init__(device, isShow)14 def Initialize(self, capHeight = 720, capWidth = 1280):15 self.open_rotation_watcher(on_rotation_change=lambda v: self.OpenMinicapStream(capHeight, capWidth))16 # self.OpenMinicapStream(capHeight, capWidth)17 return True18if __name__ == '__main__':19 import logging.config20 LOG_CFG_FILE = '../cfg/log.ini'21 logging.config.fileConfig(LOG_CFG_FILE)22 screen = ScreenMinicap(None)23 screen.Initialize()24 frameCountTimestamp = time.time()25 while True:26 now = time.time()27 difftime = now - frameCountTimestamp28 img = screen.GetScreen()29 if img is not None:...

Full Screen

Full Screen

TouchMinitouch.py

Source:TouchMinitouch.py Github

copy

Full Screen

...13class TouchMinitouch(MinitouchStream, RotationWatcher):14 def __init__(self, device):15 super(self.__class__, self).__init__(device)16 self.OpenMinitouchStream()17 # self.open_rotation_watcher(on_rotation_change=lambda v: self.ChangeRotation(v))18import time19from adbkit.ADBClient import ADBClient20if __name__ == '__main__':21 adb = ADBClient()22 dev = adb.device()23 testTouch = TouchMinitouch(dev)24 count = 10025 while count > 0:26 testTouch.Click(200, 200)27 # testTouch.Move(500, 0)28 # testTouch.Up()29 time.sleep(1)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful