Best Python code snippet using ATX
mixins.py
Source:mixins.py  
...22__dir__ = os.path.dirname(os.path.abspath(__file__))23class RotationWatcherMixin(object):24    __rotation = 025    __watcher_process = None26    def open_rotation_watcher(self, on_rotation_change=None):27        package_name = 'jp.co.cyberagent.stf.rotationwatcher'28        out = self.raw_cmd('shell', 'pm', 'list', 'packages', stdout=subprocess.PIPE).communicate()[0]29        if package_name not in out:30            apkpath = os.path.join(__dir__, '..', 'vendor', 'RotationWatcher.apk')31            # print 'install rotationwatcher...', apkpath32            if 0 != self.raw_cmd('install', '-r', '-t', apkpath).wait():33                # print 'install rotationwatcher failed.'34                return35        if self.__watcher_process is not None:36            self.__watcher_process.kill()37        out = self.raw_cmd('shell', 'pm', 'path', package_name, stdout=subprocess.PIPE).communicate()[0]38        path = out.strip().split(':')[-1]39        p = self.raw_cmd('shell',40                         'CLASSPATH="%s"' % path,41                         'app_process',42                         '/system/bin',43                         'jp.co.cyberagent.stf.rotationwatcher.RotationWatcher',44                         stdout=subprocess.PIPE)45        self.__watcher_process = p46        queue = Queue.Queue()47        def _pull():48            while True:49                line = p.stdout.readline().strip()50                if not line:51                    if p.poll() is not None:52                        # print 'rotationwatcher stopped'53                        break54                    continue55                queue.put(line)56        t = threading.Thread(target=_pull)57        t.setDaemon(True)58        t.start()59        def listener(value):60            try:61                self.__rotation = int(value) / 9062            except:63                return64            if callable(on_rotation_change):65                on_rotation_change(self.__rotation)66        def _listen():67            while True:68                try:69                    time.sleep(0.005)70                    line = queue.get_nowait()71                    listener(line)72                except Queue.Empty:73                    if p.poll() is not None:74                        break75                    continue76                except:77                    traceback.print_exc()78        t = threading.Thread(target=_listen)79        t.setDaemon(True)80        t.start()81def str2img(jpgstr, orientation=None):82    import numpy as np83    import cv284    arr = np.fromstring(jpgstr, np.uint8)85    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)86    if orientation == 1:87        return cv2.flip(cv2.transpose(img), 0)  # counter-clockwise88    if orientation == 3:89        return cv2.flip(cv2.transpose(img), 1)  # clockwise90    return img91class MinicapStreamMixin(object):92    __screen = None93    __minicap_process = None94    def __install_minicap(self):95        # install minicap & minitouch96        os.system('python -m atx minicap')97    def open_minicap_stream(self, port=1313):98        # ensure minicap installed99        out = self.raw_cmd('shell', 'ls', '"/data/local/tmp/minicap"', stdout=subprocess.PIPE).communicate()[0]100        if 'No such file or directory' in out:101            self.__install_minicap()102        if self.__minicap_process is not None:103            self.__minicap_process.kill()104        # if minicap is already started, kill it first.105        out = self.raw_cmd('shell', 'ps', '-C', '/data/local/tmp/minicap', stdout=subprocess.PIPE).communicate()[0]106        out = out.strip().split('\n')107        if len(out) > 1:108            idx = out[0].split().index('PID')109            pid = out[1].split()[idx]110            # print 'minicap is running, killing', pid111            self.raw_cmd('shell', 'kill', '-9', pid).wait()112        # start minicap113        out = self.raw_cmd('shell', 'LD_LIBRARY_PATH=/data/local/tmp', '/data/local/tmp/minicap', '-i',114                           stdout=subprocess.PIPE).communicate()[0]115        m = re.search('"width": (\d+).*"height": (\d+).*"rotation": (\d+)', out, re.S)116        w, h, r = map(int, m.groups())117        w, h = min(w, h), max(w, h)118        params = '{x}x{y}@{x}x{y}/{r}'.format(x=w, y=h, r=r)119        # print 'starting minicap', params120        p = self.raw_cmd('shell',121                         'LD_LIBRARY_PATH=/data/local/tmp',122                         '/data/local/tmp/minicap',123                         '-P %s' % params,124                         '-S',125                         stdout=subprocess.PIPE)126        self.__minicap_process = p127        time.sleep(0.5)128        # forward to tcp port 129        self.raw_cmd('forward', 'tcp:%s' % port, 'localabstract:minicap').wait()130        queue = Queue.Queue()131        # pull data from socket132        def _pull():133            # print 'start pull', p.pid, p.poll()134            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)135            try:136                assert p.poll() is None137                s.connect(('127.0.0.1', port))138                t = s.recv(24)139                # print 'minicap connected', struct.unpack('<2B5I2B', t)140                while True:141                    frame_size = struct.unpack("<I", s.recv(4))[0]142                    trunks = []143                    recvd_size = 0144                    while recvd_size < frame_size:145                        trunk_size = min(8192, frame_size - recvd_size)146                        d = s.recv(trunk_size)147                        trunks.append(d)148                        recvd_size += len(d)149                    queue.put(''.join(trunks))150            except Exception as e:151                if not isinstance(e, struct.error):152                    traceback.print_exc()153                if p.poll() is not None:154                    pass155                # print 'Process died.'156                # print p.stdout.read()157                else:158                    # print 'stoping minicap ...'159                    p.kill()160            finally:161                s.close()162                self.raw_cmd('forward', '--remove', 'tcp:%s' % port).wait()163        t = threading.Thread(target=_pull)164        t.setDaemon(True)165        t.start()166        out = self.raw_cmd('shell', 'getprop', 'ro.build.version.sdk', stdout=subprocess.PIPE).communicate()[0]167        sdk = int(out.strip())168        orientation = r / 90169        def _listen():170            while True:171                try:172                    time.sleep(0.005)173                    frame = queue.get_nowait()174                    if sdk <= 16:175                        img = str2img(frame, orientation)176                    else:177                        img = str2img(frame)178                    self.__screen = img179                except Queue.Empty:180                    if p.poll() is not None:181                        # print 'minicap died'182                        # print p.stdout.read()183                        break184                    continue185                except:186                    traceback.print_exc()187        t = threading.Thread(target=_listen)188        t.setDaemon(True)189        t.start()190    def screenshot_cv2(self):191        return self.__screen192class MinitouchStreamMixin(object):193    __touch_queue = None194    __minitouch_process = None195    def __install_minitouch(self):196        # install minicap & minitouch197        os.system('python -m atx minicap')198    def open_minitouch_stream(self, port=1111):199        if self.__touch_queue is None:200            self.__touch_queue = Queue.Queue()201        # ensure minicap installed202        out = self.raw_cmd('shell', 'ls', '"/data/local/tmp/minitouch"', stdout=subprocess.PIPE).communicate()[0]203        if 'No such file or directory' in out:204            self.__install_minitouch()205        if self.__minitouch_process is not None:206            self.__minitouch_process.kill()207        out = self.raw_cmd('shell', 'ps', '-C', '/data/local/tmp/minitouch', stdout=subprocess.PIPE).communicate()[0]208        out = out.strip().split('\n')209        if len(out) > 1:210            p = None211        else:212            p = self.raw_cmd('shell', '/data/local/tmp/minitouch')213            time.sleep(1)214            if p.poll() is not None:215                # print 'start minitouch failed.'216                return217        self.__minitouch_process = p218        self.raw_cmd('forward', 'tcp:%s' % port, 'localabstract:minitouch').wait()219        def send():220            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)221            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)222            try:223                s.connect(('127.0.0.1', port))224                while True:225                    cmd = self.__touch_queue.get()  # wait here226                    if not cmd:227                        continue228                    elif cmd[-1] != '\n':229                        cmd += '\n'230                    s.send(cmd)231            except:232                traceback.print_exc()233            finally:234                s.close()235                self.raw_cmd('forward', '--remove', 'tcp:%s' % port).wait()236        t = threading.Thread(target=send)237        t.setDaemon(True)238        t.start()239    def click(self, x, y):240        cmd = 'd 0 %d %d 30\nc\nu 0\nc\n' % (int(x), int(y))241        self.__touch_queue.put(cmd)242    def swipe(self, sx, sy, ex, ey, steps=20):243        x1, y1, x2, y2 = map(int, (sx, sy, ex, ey))244        dx = (x2 - x1) / steps245        dy = (y2 - y1) / steps246        send = self.touchqueue.put247        send('d 0 %d %d 30\nc\n' % (x1, y1))248        for i in range(steps - 1):249            x, y = x1 + (i + 1) * dx, y1 + (i + 1) * dy250            send('m 0 %d %d 30\nc\n' % (x, y))251        send('u 0 %d %d 30\nc\nu 0\nc\n' % (x2, y2))252    def pinchin(self, x1, y1, x2, y2, steps=10):253        pass254    def pinchout(self, x1, y1, x2, y2, steps=10):255        pass256class OpenSTFServiceMixin(object):257    pass258# -------------- examples ----------------#259class DummyDevice(object):260    def raw_cmd(self, *args, **kwargs):261        cmds = ['adb'] + list(args)262        # print cmds263        return subprocess.Popen(cmds, **kwargs)264# Mixins should come in front to override functions in Base265class TestDevice(MinitouchStreamMixin, MinicapStreamMixin, RotationWatcherMixin, DummyDevice):266    def __init__(self, *args, **kwargs):267        super(self.__class__, self).__init__(*args, **kwargs)268        self.open_rotation_watcher(on_rotation_change=lambda v: self.open_minicap_stream())269        self.open_minitouch_stream()270if __name__ == '__main__':271    import cv2272    dev = TestDevice()273    while True:274        img = dev.screenshot_cv2()275        if img is not None:276            cv2.imshow('screen', img)...ScreenMinicap.py
Source:ScreenMinicap.py  
...11class ScreenMinicap(MinicapStream, RotationWatcher):12    def __init__(self, device, isShow=False):13        super(self.__class__, self).__init__(device, isShow)14    def Initialize(self, capHeight = 720, capWidth = 1280):15        self.open_rotation_watcher(on_rotation_change=lambda v: self.OpenMinicapStream(capHeight, capWidth))16        # self.OpenMinicapStream(capHeight, capWidth)17        return True18if __name__ == '__main__':19    import logging.config20    LOG_CFG_FILE = '../cfg/log.ini'21    logging.config.fileConfig(LOG_CFG_FILE)22    screen = ScreenMinicap(None)23    screen.Initialize()24    frameCountTimestamp = time.time()25    while True:26        now = time.time()27        difftime = now - frameCountTimestamp28        img = screen.GetScreen()29        if img is not None:...TouchMinitouch.py
Source:TouchMinitouch.py  
...13class TouchMinitouch(MinitouchStream, RotationWatcher):14    def __init__(self, device):15        super(self.__class__, self).__init__(device)16        self.OpenMinitouchStream()17        # self.open_rotation_watcher(on_rotation_change=lambda v: self.ChangeRotation(v))18import time19from adbkit.ADBClient import ADBClient20if __name__ == '__main__':21    adb = ADBClient()22    dev = adb.device()23    testTouch = TouchMinitouch(dev)24    count = 10025    while count > 0:26        testTouch.Click(200, 200)27        # testTouch.Move(500, 0)28        # testTouch.Up()29        time.sleep(1)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
