How to use adb_cmd method in ATX

Best Python code snippet using ATX

test_adb.py

Source:test_adb.py Github

copy

Full Screen

1#!/usr/bin/env python22"""Simple conformance test for adb.3This script will use the available adb in path and run simple4tests that attempt to touch all accessible attached devices.5"""6import hashlib7import os8import pipes9import random10import re11import shlex12import subprocess13import sys14import tempfile15import unittest16def trace(cmd):17 """Print debug message if tracing enabled."""18 if False:19 print >> sys.stderr, cmd20def call(cmd_str):21 """Run process and return output tuple (stdout, stderr, ret code)."""22 trace(cmd_str)23 process = subprocess.Popen(shlex.split(cmd_str),24 stdout=subprocess.PIPE,25 stderr=subprocess.PIPE)26 stdout, stderr = process.communicate()27 return stdout, stderr, process.returncode28def call_combined(cmd_str):29 """Run process and return output tuple (stdout+stderr, ret code)."""30 trace(cmd_str)31 process = subprocess.Popen(shlex.split(cmd_str),32 stdout=subprocess.PIPE,33 stderr=subprocess.STDOUT)34 stdout, _ = process.communicate()35 return stdout, process.returncode36def call_checked(cmd_str):37 """Run process and get stdout+stderr, raise an exception on trouble."""38 trace(cmd_str)39 return subprocess.check_output(shlex.split(cmd_str),40 stderr=subprocess.STDOUT)41def call_checked_list(cmd_str):42 return call_checked(cmd_str).split('\n')43def call_checked_list_skip(cmd_str):44 out_list = call_checked_list(cmd_str)45 def is_init_line(line):46 if (len(line) >= 3) and (line[0] == "*") and (line[-2] == "*"):47 return True48 else:49 return False50 return [line for line in out_list if not is_init_line(line)]51def get_device_list():52 output = call_checked_list_skip("adb devices")53 dev_list = []54 for line in output[1:]:55 if line.strip() == "":56 continue57 device, _ = line.split()58 dev_list.append(device)59 return dev_list60def get_attached_device_count():61 return len(get_device_list())62def compute_md5(string):63 hsh = hashlib.md5()64 hsh.update(string)65 return hsh.hexdigest()66class HostFile(object):67 def __init__(self, handle, md5):68 self.handle = handle69 self.md5 = md570 self.full_path = handle.name71 self.base_name = os.path.basename(self.full_path)72class DeviceFile(object):73 def __init__(self, md5, full_path):74 self.md5 = md575 self.full_path = full_path76 self.base_name = os.path.basename(self.full_path)77def make_random_host_files(in_dir, num_files, rand_size=True):78 files = {}79 min_size = 1 * (1 << 10)80 max_size = 16 * (1 << 10)81 fixed_size = min_size82 for _ in range(num_files):83 file_handle = tempfile.NamedTemporaryFile(dir=in_dir)84 if rand_size:85 size = random.randrange(min_size, max_size, 1024)86 else:87 size = fixed_size88 rand_str = os.urandom(size)89 file_handle.write(rand_str)90 file_handle.flush()91 md5 = compute_md5(rand_str)92 files[file_handle.name] = HostFile(file_handle, md5)93 return files94def make_random_device_files(adb, in_dir, num_files, rand_size=True):95 files = {}96 min_size = 1 * (1 << 10)97 max_size = 16 * (1 << 10)98 fixed_size = min_size99 for i in range(num_files):100 if rand_size:101 size = random.randrange(min_size, max_size, 1024)102 else:103 size = fixed_size104 base_name = "device_tmpfile" + str(i)105 full_path = in_dir + "/" + base_name106 adb.shell("dd if=/dev/urandom of={} bs={} count=1".format(full_path,107 size))108 dev_md5, _ = adb.shell("md5sum {}".format(full_path)).split()109 files[full_path] = DeviceFile(dev_md5, full_path)110 return files111class AdbWrapper(object):112 """Convenience wrapper object for the adb command."""113 def __init__(self, device=None, out_dir=None):114 self.device = device115 self.out_dir = out_dir116 self.adb_cmd = "adb "117 if self.device:118 self.adb_cmd += "-s {} ".format(device)119 if self.out_dir:120 self.adb_cmd += "-p {} ".format(out_dir)121 def shell(self, cmd):122 return call_checked(self.adb_cmd + "shell " + cmd)123 def shell_nocheck(self, cmd):124 return call_combined(self.adb_cmd + "shell " + cmd)125 def install(self, filename):126 return call_checked(self.adb_cmd + "install {}".format(pipes.quote(filename)))127 def push(self, local, remote):128 return call_checked(self.adb_cmd + "push {} {}".format(local, remote))129 def pull(self, remote, local):130 return call_checked(self.adb_cmd + "pull {} {}".format(remote, local))131 def sync(self, directory=""):132 return call_checked(self.adb_cmd + "sync {}".format(directory))133 def forward(self, local, remote):134 return call_checked(self.adb_cmd + "forward {} {}".format(local,135 remote))136 def tcpip(self, port):137 return call_checked(self.adb_cmd + "tcpip {}".format(port))138 def usb(self):139 return call_checked(self.adb_cmd + "usb")140 def root(self):141 return call_checked(self.adb_cmd + "root")142 def unroot(self):143 return call_checked(self.adb_cmd + "unroot")144 def forward_remove(self, local):145 return call_checked(self.adb_cmd + "forward --remove {}".format(local))146 def forward_remove_all(self):147 return call_checked(self.adb_cmd + "forward --remove-all")148 def connect(self, host):149 return call_checked(self.adb_cmd + "connect {}".format(host))150 def disconnect(self, host):151 return call_checked(self.adb_cmd + "disconnect {}".format(host))152 def reverse(self, remote, local):153 return call_checked(self.adb_cmd + "reverse {} {}".format(remote,154 local))155 def reverse_remove_all(self):156 return call_checked(self.adb_cmd + "reverse --remove-all")157 def reverse_remove(self, remote):158 return call_checked(159 self.adb_cmd + "reverse --remove {}".format(remote))160 def wait(self):161 return call_checked(self.adb_cmd + "wait-for-device")162class AdbBasic(unittest.TestCase):163 def test_shell(self):164 """Check that we can at least cat a file."""165 adb = AdbWrapper()166 out = adb.shell("cat /proc/uptime")167 self.assertEqual(len(out.split()), 2)168 self.assertGreater(float(out.split()[0]), 0.0)169 self.assertGreater(float(out.split()[1]), 0.0)170 def test_help(self):171 """Make sure we get _something_ out of help."""172 out = call_checked("adb help")173 self.assertTrue(len(out) > 0)174 def test_version(self):175 """Get a version number out of the output of adb."""176 out = call_checked("adb version").split()177 version_num = False178 for item in out:179 if re.match(r"[\d+\.]*\d", item):180 version_num = True181 self.assertTrue(version_num)182 def _test_root(self):183 adb = AdbWrapper()184 adb.root()185 adb.wait()186 self.assertEqual("root", adb.shell("id -un").strip())187 def _test_unroot(self):188 adb = AdbWrapper()189 adb.unroot()190 adb.wait()191 self.assertEqual("shell", adb.shell("id -un").strip())192 def test_root_unroot(self):193 """Make sure that adb root and adb unroot work, using id(1)."""194 adb = AdbWrapper()195 original_user = adb.shell("id -un").strip()196 try:197 if original_user == "root":198 self._test_unroot()199 self._test_root()200 elif original_user == "shell":201 self._test_root()202 self._test_unroot()203 finally:204 if original_user == "root":205 adb.root()206 else:207 adb.unroot()208 adb.wait()209 def test_argument_escaping(self):210 """Make sure that argument escaping is somewhat sane."""211 adb = AdbWrapper()212 # http://b/19734868213 # Note that this actually matches ssh(1)'s behavior --- it's214 # converted to "sh -c echo hello; echo world" which sh interprets215 # as "sh -c echo" (with an argument to that shell of "hello"),216 # and then "echo world" back in the first shell.217 result = adb.shell("sh -c 'echo hello; echo world'").splitlines()218 self.assertEqual(["", "world"], result)219 # If you really wanted "hello" and "world", here's what you'd do:220 result = adb.shell("echo hello\;echo world").splitlines()221 self.assertEqual(["hello", "world"], result)222 # http://b/15479704223 self.assertEqual('t', adb.shell("'true && echo t'").strip())224 self.assertEqual('t', adb.shell("sh -c 'true && echo t'").strip())225 # http://b/20564385226 self.assertEqual('t', adb.shell("FOO=a BAR=b echo t").strip())227 self.assertEqual('123Linux', adb.shell("echo -n 123\;uname").strip())228 def test_install_argument_escaping(self):229 """Make sure that install argument escaping works."""230 adb = AdbWrapper()231 # http://b/20323053232 tf = tempfile.NamedTemporaryFile("w", suffix="-text;ls;1.apk")233 self.assertIn("-text;ls;1.apk", adb.install(tf.name))234 # http://b/3090932235 tf = tempfile.NamedTemporaryFile("w", suffix="-Live Hold'em.apk")236 self.assertIn("-Live Hold'em.apk", adb.install(tf.name))237class AdbFile(unittest.TestCase):238 SCRATCH_DIR = "/data/local/tmp"239 DEVICE_TEMP_FILE = SCRATCH_DIR + "/adb_test_file"240 DEVICE_TEMP_DIR = SCRATCH_DIR + "/adb_test_dir"241 def test_push(self):242 """Push a randomly generated file to specified device."""243 kbytes = 512244 adb = AdbWrapper()245 with tempfile.NamedTemporaryFile(mode="w") as tmp:246 rand_str = os.urandom(1024 * kbytes)247 tmp.write(rand_str)248 tmp.flush()249 host_md5 = compute_md5(rand_str)250 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE))251 try:252 adb.push(local=tmp.name, remote=AdbFile.DEVICE_TEMP_FILE)253 dev_md5, _ = adb.shell(254 "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split()255 self.assertEqual(host_md5, dev_md5)256 finally:257 adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE))258 # TODO: write push directory test.259 def test_pull(self):260 """Pull a randomly generated file from specified device."""261 kbytes = 512262 adb = AdbWrapper()263 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE))264 try:265 adb.shell("dd if=/dev/urandom of={} bs=1024 count={}".format(266 AdbFile.DEVICE_TEMP_FILE, kbytes))267 dev_md5, _ = adb.shell(268 "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split()269 with tempfile.NamedTemporaryFile(mode="w") as tmp_write:270 adb.pull(remote=AdbFile.DEVICE_TEMP_FILE, local=tmp_write.name)271 with open(tmp_write.name) as tmp_read:272 host_contents = tmp_read.read()273 host_md5 = compute_md5(host_contents)274 self.assertEqual(dev_md5, host_md5)275 finally:276 adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE))277 def test_pull_dir(self):278 """Pull a randomly generated directory of files from the device."""279 adb = AdbWrapper()280 temp_files = {}281 host_dir = None282 try:283 # create temporary host directory284 host_dir = tempfile.mkdtemp()285 # create temporary dir on device286 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))287 adb.shell("mkdir -p {}".format(AdbFile.DEVICE_TEMP_DIR))288 # populate device dir with random files289 temp_files = make_random_device_files(290 adb, in_dir=AdbFile.DEVICE_TEMP_DIR, num_files=32)291 adb.pull(remote=AdbFile.DEVICE_TEMP_DIR, local=host_dir)292 for device_full_path in temp_files:293 host_path = os.path.join(294 host_dir, temp_files[device_full_path].base_name)295 with open(host_path) as host_file:296 host_md5 = compute_md5(host_file.read())297 self.assertEqual(host_md5,298 temp_files[device_full_path].md5)299 finally:300 for dev_file in temp_files.values():301 host_path = os.path.join(host_dir, dev_file.base_name)302 os.remove(host_path)303 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))304 if host_dir:305 os.removedirs(host_dir)306 def test_sync(self):307 """Sync a randomly generated directory of files to specified device."""308 try:309 adb = AdbWrapper()310 temp_files = {}311 # create temporary host directory312 base_dir = tempfile.mkdtemp()313 # create mirror device directory hierarchy within base_dir314 full_dir_path = base_dir + AdbFile.DEVICE_TEMP_DIR315 os.makedirs(full_dir_path)316 # create 32 random files within the host mirror317 temp_files = make_random_host_files(in_dir=full_dir_path,318 num_files=32)319 # clean up any trash on the device320 adb = AdbWrapper(out_dir=base_dir)321 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))322 # issue the sync323 adb.sync("data")324 # confirm that every file on the device mirrors that on the host325 for host_full_path in temp_files.keys():326 device_full_path = os.path.join(327 AdbFile.DEVICE_TEMP_DIR,328 temp_files[host_full_path].base_name)329 dev_md5, _ = adb.shell(330 "md5sum {}".format(device_full_path)).split()331 self.assertEqual(temp_files[host_full_path].md5, dev_md5)332 finally:333 adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR))334 if temp_files:335 for tf in temp_files.values():336 tf.handle.close()337 if base_dir:338 os.removedirs(base_dir + AdbFile.DEVICE_TEMP_DIR)339if __name__ == '__main__':340 random.seed(0)341 dev_count = get_attached_device_count()342 if dev_count:343 suite = unittest.TestLoader().loadTestsFromName(__name__)344 unittest.TextTestRunner(verbosity=3).run(suite)345 else:...

Full Screen

Full Screen

run_reliability_tests.py

Source:run_reliability_tests.py Github

copy

Full Screen

1#!/usr/bin/python2.42"""Run reliability tests using Android instrumentation.3 A test file consists of list web sites to test is needed as a parameter4 Usage:5 run_reliability_tests.py path/to/url/list6"""7import logging8import optparse9import os10import subprocess11import sys12import time13from Numeric import *14TEST_LIST_FILE = "/sdcard/android/reliability_tests_list.txt"15TEST_STATUS_FILE = "/sdcard/android/reliability_running_test.txt"16TEST_TIMEOUT_FILE = "/sdcard/android/reliability_timeout_test.txt"17TEST_LOAD_TIME_FILE = "/sdcard/android/reliability_load_time.txt"18HTTP_URL_FILE = "urllist_http"19HTTPS_URL_FILE = "urllist_https"20NUM_URLS = 2521def DumpRenderTreeFinished(adb_cmd):22 """Check if DumpRenderTree finished running.23 Args:24 adb_cmd: adb command string25 Returns:26 True if DumpRenderTree has finished, False otherwise27 """28 # pull test status file and look for "#DONE"29 shell_cmd_str = adb_cmd + " shell cat " + TEST_STATUS_FILE30 adb_output = subprocess.Popen(shell_cmd_str,31 shell=True, stdout=subprocess.PIPE,32 stderr=subprocess.PIPE).communicate()[0]33 return adb_output.strip() == "#DONE"34def RemoveDeviceFile(adb_cmd, file_name):35 shell_cmd_str = adb_cmd + " shell rm " + file_name36 subprocess.Popen(shell_cmd_str,37 shell=True, stdout=subprocess.PIPE,38 stderr=subprocess.PIPE).communicate()39def Bugreport(url, bugreport_dir, adb_cmd):40 """Pull a bugreport from the device."""41 bugreport_filename = "%s/reliability_bugreport_%d.txt" % (bugreport_dir,42 int(time.time()))43 # prepend the report with url44 handle = open(bugreport_filename, "w")45 handle.writelines("Bugreport for crash in url - %s\n\n" % url)46 handle.close()47 cmd = "%s bugreport >> %s" % (adb_cmd, bugreport_filename)48 os.system(cmd)49def ProcessPageLoadTime(raw_log):50 """Processes the raw page load time logged by test app."""51 log_handle = open(raw_log, "r")52 load_times = {}53 for line in log_handle:54 line = line.strip()55 pair = line.split("|")56 if len(pair) != 2:57 logging.info("Line has more than one '|': " + line)58 continue59 if pair[0] not in load_times:60 load_times[pair[0]] = []61 try:62 pair[1] = int(pair[1])63 except ValueError:64 logging.info("Lins has non-numeric load time: " + line)65 continue66 load_times[pair[0]].append(pair[1])67 log_handle.close()68 # rewrite the average time to file69 log_handle = open(raw_log, "w")70 for url, times in load_times.iteritems():71 # calculate std72 arr = array(times)73 avg = average(arr)74 d = arr - avg75 std = sqrt(sum(d * d) / len(arr))76 output = ("%-70s%-10d%-10d%-12.2f%-12.2f%s\n" %77 (url, min(arr), max(arr), avg, std,78 array2string(arr)))79 log_handle.write(output)80 log_handle.close()81def main(options, args):82 """Send the url list to device and start testing, restart if crashed."""83 # Set up logging format.84 log_level = logging.INFO85 if options.verbose:86 log_level = logging.DEBUG87 logging.basicConfig(level=log_level,88 format="%(message)s")89 # Include all tests if none are specified.90 if not args:91 print "Missing URL list file"92 sys.exit(1)93 else:94 path = args[0]95 if not options.crash_file:96 print "Missing crash file name, use --crash-file to specify"97 sys.exit(1)98 else:99 crashed_file = options.crash_file100 if not options.timeout_file:101 print "Missing timeout file, use --timeout-file to specify"102 sys.exit(1)103 else:104 timedout_file = options.timeout_file105 if not options.delay:106 manual_delay = 0107 else:108 manual_delay = options.delay109 if not options.bugreport:110 bugreport_dir = "."111 else:112 bugreport_dir = options.bugreport113 if not os.path.exists(bugreport_dir):114 os.makedirs(bugreport_dir)115 if not os.path.isdir(bugreport_dir):116 logging.error("Cannot create results dir: " + bugreport_dir)117 sys.exit(1)118 adb_cmd = "adb "119 if options.adb_options:120 adb_cmd += options.adb_options + " "121 # push url list to device122 test_cmd = adb_cmd + " push \"" + path + "\" \"" + TEST_LIST_FILE + "\""123 proc = subprocess.Popen(test_cmd, shell=True,124 stdout=subprocess.PIPE,125 stderr=subprocess.PIPE)126 (adb_output, adb_error) = proc.communicate()127 if proc.returncode != 0:128 logging.error("failed to push url list to device.")129 logging.error(adb_output)130 logging.error(adb_error)131 sys.exit(1)132 # clean up previous results133 RemoveDeviceFile(adb_cmd, TEST_STATUS_FILE)134 RemoveDeviceFile(adb_cmd, TEST_TIMEOUT_FILE)135 RemoveDeviceFile(adb_cmd, TEST_LOAD_TIME_FILE)136 logging.info("Running the test ...")137 # Count crashed tests.138 crashed_tests = []139 if options.time_out_ms:140 timeout_ms = options.time_out_ms141 # Run test until it's done142 test_cmd_prefix = adb_cmd + " shell am instrument"143 test_cmd_postfix = " -w com.android.dumprendertree/.LayoutTestsAutoRunner"144 # Call ReliabilityTestsAutoTest#startReliabilityTests145 test_cmd = (test_cmd_prefix + " -e class "146 "com.android.dumprendertree.ReliabilityTest#"147 "runReliabilityTest -e timeout %s -e delay %s" %148 (str(timeout_ms), str(manual_delay)))149 if options.logtime:150 test_cmd += " -e logtime true"151 test_cmd += test_cmd_postfix152 adb_output = subprocess.Popen(test_cmd, shell=True,153 stdout=subprocess.PIPE,154 stderr=subprocess.PIPE).communicate()[0]155 while not DumpRenderTreeFinished(adb_cmd):156 logging.error("DumpRenderTree exited before all URLs are visited.")157 shell_cmd_str = adb_cmd + " shell cat " + TEST_STATUS_FILE158 crashed_test = ""159 while not crashed_test:160 (crashed_test, err) = subprocess.Popen(161 shell_cmd_str, shell=True, stdout=subprocess.PIPE,162 stderr=subprocess.PIPE).communicate()163 crashed_test = crashed_test.strip()164 if not crashed_test:165 logging.error('Cannot get crashed test name, device offline?')166 logging.error('stderr: ' + err)167 logging.error('retrying in 10s...')168 time.sleep(10)169 logging.info(crashed_test + " CRASHED")170 crashed_tests.append(crashed_test)171 Bugreport(crashed_test, bugreport_dir, adb_cmd)172 logging.info("Resuming reliability test runner...")173 adb_output = subprocess.Popen(test_cmd, shell=True, stdout=subprocess.PIPE,174 stderr=subprocess.PIPE).communicate()[0]175 if (adb_output.find("INSTRUMENTATION_FAILED") != -1 or176 adb_output.find("Process crashed.") != -1):177 logging.error("Error happened : " + adb_output)178 sys.exit(1)179 logging.info(adb_output)180 logging.info("Done\n")181 if crashed_tests:182 file_handle = open(crashed_file, "w")183 file_handle.writelines("\n".join(crashed_tests))184 logging.info("Crashed URL list stored in: " + crashed_file)185 file_handle.close()186 else:187 logging.info("No crash found.")188 # get timeout file from sdcard189 test_cmd = (adb_cmd + "pull \"" + TEST_TIMEOUT_FILE + "\" \""190 + timedout_file + "\"")191 subprocess.Popen(test_cmd, shell=True, stdout=subprocess.PIPE,192 stderr=subprocess.PIPE).communicate()193 if options.logtime:194 # get logged page load times from sdcard195 test_cmd = (adb_cmd + "pull \"" + TEST_LOAD_TIME_FILE + "\" \""196 + options.logtime + "\"")197 subprocess.Popen(test_cmd, shell=True, stdout=subprocess.PIPE,198 stderr=subprocess.PIPE).communicate()199 ProcessPageLoadTime(options.logtime)200if "__main__" == __name__:201 option_parser = optparse.OptionParser()202 option_parser.add_option("-t", "--time-out-ms",203 default=60000,204 help="set the timeout for each test")205 option_parser.add_option("-v", "--verbose", action="store_true",206 default=False,207 help="include debug-level logging")208 option_parser.add_option("-a", "--adb-options",209 default=None,210 help="pass options to adb, such as -d -e, etc")211 option_parser.add_option("-c", "--crash-file",212 default="reliability_crashed_sites.txt",213 help="the list of sites that cause browser to crash")214 option_parser.add_option("-f", "--timeout-file",215 default="reliability_timedout_sites.txt",216 help="the list of sites that timedout during test")217 option_parser.add_option("-d", "--delay",218 default=0,219 help="add a manual delay between pages (in ms)")220 option_parser.add_option("-b", "--bugreport",221 default=".",222 help="the directory to store bugreport for crashes")223 option_parser.add_option("-l", "--logtime",224 default=None,225 help="Logs page load time for each url to the file")226 opts, arguments = option_parser.parse_args()...

Full Screen

Full Screen

adb_logcat_monitor.py

Source:adb_logcat_monitor.py Github

copy

Full Screen

1#!/usr/bin/env python2#3# Copyright (c) 2012 The Chromium Authors. All rights reserved.4# Use of this source code is governed by a BSD-style license that can be5# found in the LICENSE file.6"""Saves logcats from all connected devices.7Usage: adb_logcat_monitor.py <base_dir> [<adb_binary_path>]8This script will repeatedly poll adb for new devices and save logcats9inside the <base_dir> directory, which it attempts to create. The10script will run until killed by an external signal. To test, run the11script in a shell and <Ctrl>-C it after a while. It should be12resilient across phone disconnects and reconnects and start the logcat13early enough to not miss anything.14"""15import logging16import os17import re18import shutil19import signal20import subprocess21import sys22import time23# Map from device_id -> (process, logcat_num)24devices = {}25class TimeoutException(Exception):26 """Exception used to signal a timeout."""27 pass28class SigtermError(Exception):29 """Exception used to catch a sigterm."""30 pass31def StartLogcatIfNecessary(device_id, adb_cmd, base_dir):32 """Spawns a adb logcat process if one is not currently running."""33 process, logcat_num = devices[device_id]34 if process:35 if process.poll() is None:36 # Logcat process is still happily running37 return38 else:39 logging.info('Logcat for device %s has died', device_id)40 error_filter = re.compile('- waiting for device -')41 for line in process.stderr:42 if not error_filter.match(line):43 logging.error(device_id + ': ' + line)44 logging.info('Starting logcat %d for device %s', logcat_num,45 device_id)46 logcat_filename = 'logcat_%s_%03d' % (device_id, logcat_num)47 logcat_file = open(os.path.join(base_dir, logcat_filename), 'w')48 process = subprocess.Popen([adb_cmd, '-s', device_id,49 'logcat', '-v', 'threadtime'],50 stdout=logcat_file,51 stderr=subprocess.PIPE)52 devices[device_id] = (process, logcat_num + 1)53def GetAttachedDevices(adb_cmd):54 """Gets the device list from adb.55 We use an alarm in this function to avoid deadlocking from an external56 dependency.57 Args:58 adb_cmd: binary to run adb59 Returns:60 list of devices or an empty list on timeout61 """62 signal.alarm(2)63 try:64 out, err = subprocess.Popen([adb_cmd, 'devices'],65 stdout=subprocess.PIPE,66 stderr=subprocess.PIPE).communicate()67 if err:68 logging.warning('adb device error %s', err.strip())69 return re.findall('^(\\S+)\tdevice$', out, re.MULTILINE)70 except TimeoutException:71 logging.warning('"adb devices" command timed out')72 return []73 except (IOError, OSError):74 logging.exception('Exception from "adb devices"')75 return []76 finally:77 signal.alarm(0)78def main(base_dir, adb_cmd='adb'):79 """Monitor adb forever. Expects a SIGINT (Ctrl-C) to kill."""80 # We create the directory to ensure 'run once' semantics81 if os.path.exists(base_dir):82 print 'adb_logcat_monitor: %s already exists? Cleaning' % base_dir83 shutil.rmtree(base_dir, ignore_errors=True)84 os.makedirs(base_dir)85 logging.basicConfig(filename=os.path.join(base_dir, 'eventlog'),86 level=logging.INFO,87 format='%(asctime)-2s %(levelname)-8s %(message)s')88 # Set up the alarm for calling 'adb devices'. This is to ensure89 # our script doesn't get stuck waiting for a process response90 def TimeoutHandler(_signum, _unused_frame):91 raise TimeoutException()92 signal.signal(signal.SIGALRM, TimeoutHandler)93 # Handle SIGTERMs to ensure clean shutdown94 def SigtermHandler(_signum, _unused_frame):95 raise SigtermError()96 signal.signal(signal.SIGTERM, SigtermHandler)97 logging.info('Started with pid %d', os.getpid())98 pid_file_path = os.path.join(base_dir, 'LOGCAT_MONITOR_PID')99 try:100 with open(pid_file_path, 'w') as f:101 f.write(str(os.getpid()))102 while True:103 for device_id in GetAttachedDevices(adb_cmd):104 if not device_id in devices:105 subprocess.call([adb_cmd, '-s', device_id, 'logcat', '-c'])106 devices[device_id] = (None, 0)107 for device in devices:108 # This will spawn logcat watchers for any device ever detected109 StartLogcatIfNecessary(device, adb_cmd, base_dir)110 time.sleep(5)111 except SigtermError:112 logging.info('Received SIGTERM, shutting down')113 except: # pylint: disable=bare-except114 logging.exception('Unexpected exception in main.')115 finally:116 for process, _ in devices.itervalues():117 if process:118 try:119 process.terminate()120 except OSError:121 pass122 os.remove(pid_file_path)123if __name__ == '__main__':124 if 2 <= len(sys.argv) <= 3:125 print 'adb_logcat_monitor: Initializing'126 sys.exit(main(*sys.argv[1:3]))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful