How to use open_minicap_stream method in ATX

Best Python code snippet using ATX

MinicapMin.py

Source:MinicapMin.py Github

copy

Full Screen

# ... (excerpt of MinicapMin.py -- original lines 1-43 are elided on the page)
#
# NOTE(review): the excerpt starts mid-class and mid-method.  The class header
# and the first `def` line below are inferred: the subclasses further down
# inherit from `MinicapStreamMixin`, and a comment in the original
# `open_minicap_stream` refers to `self.__install_minicap()`.  Confirm both
# against the full file.
class MinicapStreamMixin(object):
    """Stream device screenshots through minicap.

    Expects the host class to provide ``raw_cmd(*args, **kwargs)`` returning a
    ``subprocess.Popen`` for an adb invocation (see ``DummyDevice``).
    """

    def __install_minicap(self):
        # ... the beginning of this method is elided in the excerpt ...
        self.raw_cmd('shell', 'chmod', '777', '/data/local/tmp/minicap',
                     stdout=subprocess.PIPE).communicate()
        self.raw_cmd('push', os.getcwd() + '/lib/armeabi-v7a/minitouch', '/data/local/tmp')
        self.raw_cmd('shell', 'chmod', '755', '/data/local/tmp/minitouch')

    def open_minicap_stream(self, port=1313, serial=""):
        """Start minicap on the device and keep the latest frame in memory.

        Kills any previous minicap (local handle and on-device process),
        queries the display size with ``minicap -i``, starts the streaming
        minicap, forwards ``tcp:<port>`` to ``localabstract:minicap`` and
        spawns two daemon threads: one reads frames from the socket into a
        queue, the other decodes them with ``str2img`` and stores the result
        for ``screenshot_cv2``.

        Args:
            port: local TCP port forwarded to the device's minicap socket.
            serial: unused by this method; kept so existing callers that pass
                it keep working.

        Raises:
            RuntimeError: if the output of ``minicap -i`` cannot be parsed.
        """
        if self.__minicap_process is not None:
            self.__minicap_process.kill()

        # If minicap is already running on the device, kill it first.
        pid = _find_remote_minicap_pid(self)
        if pid is not None:
            print('minicap is running, killing %s' % pid)
            self.raw_cmd('shell', 'kill', '-9', pid).wait()

        # Ask minicap for the display geometry.
        out = self.raw_cmd('shell', 'LD_LIBRARY_PATH=/data/local/tmp', '/data/local/tmp/minicap', '-i',
                           stdout=subprocess.PIPE).communicate()[0]
        m = re.search(r'"width": (\d+).*"height": (\d+).*"rotation": (\d+)', out, re.S)
        if m is None:
            raise RuntimeError('cannot parse minicap display info: %r' % (out,))
        w, h, r = map(int, m.groups())
        w, h = min(w, h), max(w, h)

        # Start the streaming minicap.  The rotation passed to -P is fixed at 0.
        params = '{x}x{y}@{x1}x{y1}/{r}'.format(x=w, y=h, x1=w, y1=h, r=0)
        cmd = 'shell LD_LIBRARY_PATH=/data/local/tmp /data/local/tmp/minicap -P %s' % params + ' -S -Q 80'
        # Every argv element must be a plain string; the original passed
        # nested lists to Popen, which Popen rejects.
        p = self.raw_cmd(*cmd.split(), stdout=None)
        self.__minicap_process = p
        time.sleep(0.5)

        # Forward the device sockets to local TCP ports.
        self.raw_cmd('forward', 'tcp:%s' % port, 'localabstract:minicap').wait()
        self.raw_cmd('forward', 'tcp:1111', 'localabstract:minitouch').wait()
        # NOTE(review): .wait() blocks until the adb shell running minitouch
        # exits; if minitouch keeps running, the threads below never start --
        # confirm this is intended.
        self.raw_cmd('shell', '/data/local/tmp/minitouch').wait()

        frames = Queue.Queue()

        def _pull():
            """Read length-prefixed frames from the forwarded socket."""
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                if p.poll() is not None:
                    raise RuntimeError('minicap exited before streaming started')
                s.connect(('127.0.0.1', port))
                banner = s.recv(24)
                print('minicap connected %s' % (struct.unpack('<2B5I2B', banner),))
                while True:
                    frame_size = struct.unpack("<I", s.recv(4))[0]
                    chunks = []
                    received = 0
                    while received < frame_size:
                        chunk = s.recv(min(8192, frame_size - received))
                        if not chunk:
                            # An empty read means the peer closed the socket;
                            # without this check the loop would spin forever.
                            raise EOFError('minicap closed the connection mid-frame')
                        chunks.append(chunk)
                        received += len(chunk)
                    frames.put(b''.join(chunks))
            except Exception:
                # Best effort: any failure ends the stream.
                if p.poll() is not None:
                    if p.stdout is not None:
                        print(p.stdout.read())
                else:
                    print('stoping minicap ...')
                    p.kill()
            finally:
                s.close()
                self.raw_cmd('forward', '--remove', 'tcp:%s' % port).wait()

        puller = threading.Thread(target=_pull)
        puller.daemon = True
        puller.start()

        out = self.raw_cmd('shell', 'getprop', 'ro.build.version.sdk',
                           stdout=subprocess.PIPE).communicate()[0]
        sdk = int(out.strip())
        orientation = r // 90

        def _listen():
            """Decode queued frames and publish the newest one."""
            while True:
                try:
                    time.sleep(0.01)
                    frame = frames.get_nowait()
                    if sdk <= 16:
                        img = str2img(frame, orientation)
                    else:
                        img = str2img(frame)
                    self.__screen = img
                except Queue.Empty:
                    if p.poll() is not None:
                        break  # minicap died and nothing is left to decode
                    continue
                except Exception:
                    # Best effort: a frame that fails to decode is dropped.
                    pass

        listener = threading.Thread(target=_listen)
        listener.daemon = True
        listener.start()

    def screenshot_cv2(self):
        """Return the most recently decoded frame.

        NOTE(review): the default value of ``__screen`` before the first frame
        arrives is not visible in this excerpt -- confirm it is defined in the
        elided part of the class.
        """
        return self.__screen


def _find_remote_minicap_pid(device):
    """Return the pid of the first ``ps | grep minicap`` row, or None.

    A first line of 11 characters or fewer is treated as "no process found"
    (same threshold as the original inline checks).
    """
    out = device.raw_cmd('shell', 'ps', '|grep', 'minicap',
                         stdout=subprocess.PIPE).communicate()[0]
    first_line = out.strip().split('\n')[0]
    if len(first_line) > 11:
        return first_line.split()[1]
    return None


class DummyDevice(object):
    """Minimal device: runs adb against ``self.serial``."""

    def raw_cmd(self, *args, **kwargs):
        """Spawn ``adb -s <serial> <args...>`` and return the Popen object."""
        cmds = ['adb', '-s', self.serial] + list(args)
        return subprocess.Popen(cmds, **kwargs)


class TestDevice(MinicapStreamMixin, DummyDevice):
    """Device used to measure app start time from captured frames."""

    def __init__(self, *args, **kwargs):
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed.
        # NOTE(review): if no base class defines __init__, forwarding the
        # keyword arguments reaches object.__init__ and raises TypeError --
        # confirm against the elided part of MinicapStreamMixin.
        super(TestDevice, self).__init__(*args, **kwargs)
        self.serial = kwargs.get("serial")
        self.open_minicap_stream()
        self.shootflag = True

    def mathc_img(self, sourceimage, targetimage, value=0.9):
        """Template-match ``targetimage`` inside ``sourceimage``.

        Args:
            sourceimage: path of the image to search in.
            targetimage: path of the template image (read as grayscale).
            value: minimum TM_CCOEFF_NORMED score for a match.

        Returns:
            A list of ``[x, y]`` centre points, one per match.
        """
        img_rgb = cv2.imread(sourceimage)
        img_gray = cv2.cvtColor(img_rgb, cv2.COLOR_BGR2GRAY)
        template = cv2.imread(targetimage, 0)
        w, h = template.shape[::-1]
        res = cv2.matchTemplate(img_gray, template, cv2.TM_CCOEFF_NORMED)
        loc = np.where(res >= value)
        # np.where yields (rows, cols); reverse to iterate (x, y) pairs.
        return [[x + w // 2, y + h // 2] for x, y in zip(*loc[::-1])]

    def findpng(self, cp, appname):
        """Return the timestamp of the earliest frame containing template ``cp``.

        Frames are the ``<seconds>_<microseconds>.png`` files written by
        ``getLauncher`` into ``./pic``; the template is ``./cpm/<cp>``.
        Frames are checked in chronological order because ``os.listdir``
        returns names in arbitrary order.  Files that do not follow the
        naming pattern are skipped.

        Args:
            cp: file name of the template inside ``./cpm``.
            appname: unused; kept for backward compatibility.

        Returns:
            ``"<seconds>.<microseconds>"`` of the first matching frame, or
            ``"null"`` when no frame matches.
        """
        pic_dir = os.getcwd() + "/pic"
        template = os.getcwd() + "/cpm/" + str(cp)
        candidates = []
        for name in os.listdir(pic_dir):
            parts = os.path.splitext(name)[0].split("_")
            if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit():
                candidates.append((int(parts[0]), int(parts[1]), name, parts))
        candidates.sort(key=lambda c: c[:3])
        for _, _, name, parts in candidates:
            if self.mathc_img(pic_dir + "/" + name, template, 0.95):
                return parts[0] + "." + parts[1]
        return "null"

    def tap(self, xy):
        """Tap the screen at ``xy``.

        Args:
            xy: coordinates as ``"<x>x<y>"``, ``"<x>X<y>"`` or ``"<x>,<y>"``.

        Raises:
            ValueError: if ``xy`` contains none of the supported separators.
        """
        for sep in ("x", "X", ","):
            if sep in xy:
                pieces = xy.split(sep)
                x, y = pieces[0].strip(), pieces[1].strip()
                break
        else:
            raise ValueError('cannot parse tap coordinates: %r' % (xy,))
        self.raw_cmd('shell', 'input', 'tap', x, y,
                     stdout=subprocess.PIPE).communicate()

    def getLauncher(self, xy):
        """Tap ``xy`` and dump frames to ``./pic`` until ``shootflag`` is cleared.

        Each frame is saved as ``<seconds>_<microseconds>.png``, where the
        time is measured from the first frame obtained after the tap.
        """
        self.removeFileInFirstDir(os.getcwd() + "/pic")
        self.tap(xy)
        starttime = None
        try:
            while self.shootflag:
                img = self.screenshot_cv2()
                if img is not None:
                    now = datetime.now()
                    if starttime is None:
                        starttime = now
                    elapsed = now - starttime
                    imgpath = (os.getcwd() + "/pic/" + str(elapsed.seconds) + "_"
                               + '%06d' % elapsed.microseconds + ".png")
                    cv2.imwrite(imgpath, img)
                time.sleep(0.01)
                cv2.waitKey(20)
        except Exception:
            traceback.print_exc()

    def removeFileInFirstDir(self, targetDir):
        """Delete the regular files directly inside ``targetDir`` (not recursive)."""
        for entry in os.listdir(targetDir):
            target = os.path.join(targetDir, entry)
            if os.path.isfile(target):
                os.remove(target)

    def killMinicap(self):
        """Kill minicap on the device if it is running, then wait 2 seconds."""
        pid = _find_remote_minicap_pid(self)
        if pid is not None:
            self.raw_cmd('shell', 'kill', '-9', pid).wait()
        # NOTE(review): the collapsed source does not show whether this sleep
        # was inside the `if`; it is kept unconditional here -- confirm.
        time.sleep(2)

    def testAppStartTime(self, cuttime, xy):
        """Tap ``xy``, record frames for ``cuttime`` seconds, then stop minicap."""
        try:
            worker = threading.Thread(target=self.getLauncher, args=(xy,))
            worker.daemon = True
            worker.start()
            time.sleep(int(cuttime))
            self.shootflag = False
            time.sleep(1)  # let the recorder notice the flag
        except Exception as e:
            print(e)
        finally:
            self.killMinicap()


class screen_with_controls(MinicapStreamMixin, DummyDevice):
    """Device wrapper holding the image state for a Tk canvas view."""

    def __init__(self, *args, **kwargs):
        # See TestDevice.__init__ for why the class is named explicitly.
        super(screen_with_controls, self).__init__(*args, **kwargs)
        self.serial = kwargs.get("serial")
        self.root = kwargs.get("tk")
        self.cav = kwargs.get("cav")
        self.image = None
        self.image_copy = None
        self.img = None
        self.tkimage = None
        self.canvas_image = None
        self.open_minicap_stream()

    def killMinicap(self):
        """Kill minicap on the device if it is running, then wait 2 seconds."""
        pid = _find_remote_minicap_pid(self)
        if pid is not None:
            self.raw_cmd('shell', 'kill', '-9', pid).wait()
        # NOTE(review): sleep placement is ambiguous in the collapsed source.
        time.sleep(2)

    def screen_simple(self):
        if len(self.serial) > 0:
            self.img = self.screenshot_cv2()
            # ... the remainder of this method is elided in the excerpt ...

Full Screen

Full Screen

mixins.py

Source:mixins.py Github

copy

Full Screen

# ... (excerpt of mixins.py -- original lines 1-92 are elided on the page)
#
# NOTE(review): the excerpt starts inside a class body.  The header below is
# inferred from `class TestDevice(MinitouchStreamMixin, MinicapStreamMixin, ...)`
# further down -- confirm against the full file.
class MinicapStreamMixin(object):
    """Stream device screenshots through minicap.

    Expects the host class to provide ``raw_cmd(*args, **kwargs)`` returning a
    ``subprocess.Popen`` for an adb invocation (see ``DummyDevice``).
    """

    # NOTE(review): `screenshot_cv2` reads `self.__screen`; its class-level
    # default is presumably declared in the elided lines -- confirm.
    __minicap_process = None

    def __install_minicap(self):
        # install minicap & minitouch
        os.system('python -m atx minicap')

    def open_minicap_stream(self, port=1313):
        """Start minicap on the device and keep the latest frame in memory.

        Installs minicap when it is missing, kills any previous instance,
        starts a streaming minicap sized from ``minicap -i``, forwards
        ``tcp:<port>`` to ``localabstract:minicap`` and spawns two daemon
        threads: one reads frames from the socket into a queue, the other
        decodes them with ``str2img`` for ``screenshot_cv2``.

        Args:
            port: local TCP port forwarded to the device's minicap socket.

        Raises:
            RuntimeError: if the output of ``minicap -i`` cannot be parsed.
        """
        # Ensure minicap is installed.
        out = self.raw_cmd('shell', 'ls', '"/data/local/tmp/minicap"',
                           stdout=subprocess.PIPE).communicate()[0]
        if 'No such file or directory' in out:
            self.__install_minicap()

        if self.__minicap_process is not None:
            self.__minicap_process.kill()

        # If minicap is already running on the device, kill it first.
        out = self.raw_cmd('shell', 'ps', '-C', '/data/local/tmp/minicap',
                           stdout=subprocess.PIPE).communicate()[0]
        rows = out.strip().split('\n')
        if len(rows) > 1:
            # Locate the PID column from the header row.
            pid_column = rows[0].split().index('PID')
            pid = rows[1].split()[pid_column]
            self.raw_cmd('shell', 'kill', '-9', pid).wait()

        # Ask minicap for the display geometry.
        out = self.raw_cmd('shell', 'LD_LIBRARY_PATH=/data/local/tmp', '/data/local/tmp/minicap', '-i',
                           stdout=subprocess.PIPE).communicate()[0]
        m = re.search(r'"width": (\d+).*"height": (\d+).*"rotation": (\d+)', out, re.S)
        if m is None:
            raise RuntimeError('cannot parse minicap display info: %r' % (out,))
        w, h, r = map(int, m.groups())
        w, h = min(w, h), max(w, h)
        params = '{x}x{y}@{x}x{y}/{r}'.format(x=w, y=h, r=r)

        # Start the streaming minicap.
        p = self.raw_cmd('shell',
                         'LD_LIBRARY_PATH=/data/local/tmp',
                         '/data/local/tmp/minicap',
                         '-P %s' % params,
                         '-S',
                         stdout=subprocess.PIPE)
        self.__minicap_process = p
        time.sleep(0.5)

        # Forward the device socket to a local TCP port.
        self.raw_cmd('forward', 'tcp:%s' % port, 'localabstract:minicap').wait()

        frames = Queue.Queue()

        def _pull():
            """Read length-prefixed frames from the forwarded socket."""
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                if p.poll() is not None:
                    raise RuntimeError('minicap exited before streaming started')
                s.connect(('127.0.0.1', port))
                s.recv(24)  # banner, not used
                while True:
                    frame_size = struct.unpack("<I", s.recv(4))[0]
                    chunks = []
                    received = 0
                    while received < frame_size:
                        chunk = s.recv(min(8192, frame_size - received))
                        if not chunk:
                            # An empty read means the peer closed the socket;
                            # without this check the loop would spin forever.
                            raise EOFError('minicap closed the connection mid-frame')
                        chunks.append(chunk)
                        received += len(chunk)
                    frames.put(b''.join(chunks))
            except Exception as e:
                # A short read / closed socket is the normal way the stream
                # ends; anything else is worth a traceback.
                if not isinstance(e, (struct.error, EOFError)):
                    traceback.print_exc()
                if p.poll() is None:
                    p.kill()
            finally:
                s.close()
                self.raw_cmd('forward', '--remove', 'tcp:%s' % port).wait()

        puller = threading.Thread(target=_pull)
        puller.daemon = True
        puller.start()

        out = self.raw_cmd('shell', 'getprop', 'ro.build.version.sdk',
                           stdout=subprocess.PIPE).communicate()[0]
        sdk = int(out.strip())
        orientation = r // 90

        def _listen():
            """Decode queued frames and publish the newest one."""
            while True:
                try:
                    time.sleep(0.005)
                    frame = frames.get_nowait()
                    if sdk <= 16:
                        img = str2img(frame, orientation)
                    else:
                        img = str2img(frame)
                    self.__screen = img
                except Queue.Empty:
                    if p.poll() is not None:
                        break  # minicap died and nothing is left to decode
                    continue
                except Exception:
                    traceback.print_exc()

        listener = threading.Thread(target=_listen)
        listener.daemon = True
        listener.start()

    def screenshot_cv2(self):
        """Return the most recently decoded frame."""
        return self.__screen


class MinitouchStreamMixin(object):
    """Send touch events to the device through minitouch.

    Expects the host class to provide ``raw_cmd`` (see ``DummyDevice``).
    ``open_minitouch_stream`` must be called before ``click`` / ``swipe``,
    because it creates the command queue they write to.
    """

    __touch_queue = None
    __minitouch_process = None

    def __install_minitouch(self):
        # install minicap & minitouch
        os.system('python -m atx minicap')

    def open_minitouch_stream(self, port=1111):
        """Start minitouch if needed and spawn the command sender thread.

        Args:
            port: local TCP port forwarded to ``localabstract:minitouch``.
        """
        if self.__touch_queue is None:
            self.__touch_queue = Queue.Queue()

        # Ensure minitouch is installed.
        out = self.raw_cmd('shell', 'ls', '"/data/local/tmp/minitouch"',
                           stdout=subprocess.PIPE).communicate()[0]
        if 'No such file or directory' in out:
            self.__install_minitouch()

        if self.__minitouch_process is not None:
            self.__minitouch_process.kill()

        out = self.raw_cmd('shell', 'ps', '-C', '/data/local/tmp/minitouch',
                           stdout=subprocess.PIPE).communicate()[0]
        rows = out.strip().split('\n')
        if len(rows) > 1:
            # Already running on the device: reuse it, no local handle.
            p = None
        else:
            p = self.raw_cmd('shell', '/data/local/tmp/minitouch')
            time.sleep(1)
            if p.poll() is not None:
                # minitouch exited right away: starting it failed.
                return
        self.__minitouch_process = p
        self.raw_cmd('forward', 'tcp:%s' % port, 'localabstract:minitouch').wait()

        def send():
            """Forward queued commands to the minitouch socket."""
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                s.connect(('127.0.0.1', port))
                while True:
                    cmd = self.__touch_queue.get()  # blocks until a command arrives
                    if not cmd:
                        continue
                    if cmd[-1] != '\n':
                        cmd += '\n'
                    # sendall: plain send() may transmit only part of the data.
                    s.sendall(cmd)
            except Exception:
                traceback.print_exc()
            finally:
                s.close()
                self.raw_cmd('forward', '--remove', 'tcp:%s' % port).wait()

        sender = threading.Thread(target=send)
        sender.daemon = True
        sender.start()

    def click(self, x, y):
        """Queue a touch-down followed by a touch-up at ``(x, y)``."""
        cmd = 'd 0 %d %d 30\nc\nu 0\nc\n' % (int(x), int(y))
        self.__touch_queue.put(cmd)

    def swipe(self, sx, sy, ex, ey, steps=20):
        """Queue a swipe from ``(sx, sy)`` to ``(ex, ey)``.

        Args:
            sx, sy: start point.
            ex, ey: end point.
            steps: number of segments; values below 1 are treated as 1.
        """
        x1, y1, x2, y2 = map(int, (sx, sy, ex, ey))
        steps = max(int(steps), 1)
        # The original referenced a non-existent `self.touchqueue`.
        send = self.__touch_queue.put
        send('d 0 %d %d 30\nc\n' % (x1, y1))
        for i in range(1, steps):
            # Interpolate from the end points each time, so short swipes are
            # not flattened by a per-step delta that truncates to 0.
            x = x1 + (x2 - x1) * i // steps
            y = y1 + (y2 - y1) * i // steps
            send('m 0 %d %d 30\nc\n' % (x, y))
        # NOTE(review): the first command here is 'u 0 <x> <y> 30'; check it
        # against the minitouch protocol ('m' may have been intended).
        send('u 0 %d %d 30\nc\nu 0\nc\n' % (x2, y2))

    def pinchin(self, x1, y1, x2, y2, steps=10):
        """Not implemented."""
        pass

    def pinchout(self, x1, y1, x2, y2, steps=10):
        """Not implemented."""
        pass


class OpenSTFServiceMixin(object):
    pass


# -------------- examples ----------------#
class DummyDevice(object):
    """Minimal device: runs adb against the default device."""

    def raw_cmd(self, *args, **kwargs):
        """Spawn ``adb <args...>`` and return the Popen object."""
        cmds = ['adb'] + list(args)
        return subprocess.Popen(cmds, **kwargs)


# Mixins should come in front to override functions in Base
class TestDevice(MinitouchStreamMixin, MinicapStreamMixin, RotationWatcherMixin, DummyDevice):
    def __init__(self, *args, **kwargs):
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed.
        super(TestDevice, self).__init__(*args, **kwargs)
        # Restart the minicap stream whenever the device rotates.
        self.open_rotation_watcher(on_rotation_change=lambda v: self.open_minicap_stream())
        self.open_minitouch_stream()


if __name__ == '__main__':
    import cv2
    dev = TestDevice()
    while True:
        img = dev.screenshot_cv2()
        if img is not None:
            cv2.imshow('screen', img)
            # ... the remainder of the example is elided in the excerpt ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful